using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using glue.Tasks; using glue.Collections.XSpinLock; using glue; using glue.Debugging; #pragma warning disable 0649 namespace agree.Parse { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// /// S U B S C R I P T I O N C H A R T /// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// abstract public partial class SubscriptionChart : TaskCancellationChart { internal SubscriptionChart(ParserConfiguration config, ISysObj so_owner, String source_text, int c_cols) : base(config, so_owner, source_text, c_cols) { this.srcs_leftof = new LeftNotificationSource[c_cols - 1]; this.srcs_rightof = new RightNotificationSource[c_cols - 1]; for (int i = 0; i < c_cols - 1; i++) { srcs_leftof[i] = new LeftNotificationSource(this, i + 1); srcs_rightof[i] = new RightNotificationSource(this, i); } Nop.X(fsp1, fsp2); } FalseSharing.Padding60 fsp1; public int c_unif = 0; FalseSharing.Padding60 fsp2; LeftNotificationSource[] srcs_leftof; RightNotificationSource[] srcs_rightof; internal int SubscribeRightOf(int i_col, ActiveEdge ace) { return srcs_rightof[i_col].Subscribe(ace); } internal int SubscribeLeftOf(int i_col, ActiveEdge ace) { return srcs_leftof[i_col - 1].Subscribe(ace); } protected abstract void GenerateRules(IParseChartEdge pce); /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// <summary> /// Accept new edges into the chart. Atomically with insertion into the chart, stamp the edge with an interlocking /// sequence barrier in order to separate interested parties into two groups: those who registered previously--and /// who will receive the edge now, versus those who register at any point afterwards, who will receive the edge via /// the retroactive edge delivery mechanism which is part of the subscription process. /// </summary> /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #if false internal Task AddEdgeAsync(PassiveEdge pce) { return StartAttachedChildTask(AddEdgeInner, pce); } #endif internal void AddEdgeInner(IParseChartEdge pce) { if (ParseTime.TotalSeconds > config.ItemTimeoutSeconds) throw new ParseTimeoutException(); // add edge to chart with its default inert sequence stamp. this stamps it with a sequence barrier AddEdge(pce); // now that there's a sequence ID, print out some stuff ChartEdgeReport(pce); // a task for generating new rules StartAttachedChildTask(GenerateRules, pce); int cu = 0; // send the edge to interested parties: // 1. parties for which this item is to the right int i_start = pce.ChartSpan.StartIndex; if (i_start > 0) cu += srcs_rightof[i_start - 1].SendToAllAsync(pce); // 2. parties for which this item is to the left int i_end = pce.ChartSpan.EndIndex; if (i_end < ColumnCount - 1) cu += srcs_leftof[(i_end - 1) + 1].SendToAllAsync(pce); if (cu > 0) Interlocked.Add(ref c_unif, cu); } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// <summary> /// Clearing subscribers allows references to active edges which were not productive during the parse to be released /// </summary> /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// int shutdown = 0; protected void ClearSubscribers() { if (shutdown == 0 && Interlocked.CompareExchange(ref shutdown, 1, 0) == 0) { foreach (NotificationSource ns in srcs_leftof.AsEnumerable<NotificationSource>().Concat(srcs_rightof)) foreach (ActiveEdge ae in ns.EnumerateSeq(AtomicStamp.InertValue)) ae.Dispose(); srcs_leftof = null; srcs_rightof = null; } } }; }