using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

using glue.Collections.XSpinLock;
using glue.Tasks;
using glue.Extensions.Enumerable;

namespace agree.Parse
{
    public abstract partial class SubscriptionChart : TaskCancellationChart
    {
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
        ///
        /// N O T I F I C A T I O N   S O U R C E
        ///
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////

        /// <summary>
        /// A list of <see cref="ActiveEdge"/> subscribers attached to one chart column. Chart edges are offered to
        /// subscribers either as they are posted (<see cref="SendToAllAsync"/>) or, for edges that were already in
        /// the chart when a subscriber joined, retroactively (<see cref="Subscribe"/>).
        /// </summary>
        /// <remarks>
        /// The list shares the chart's <c>AtomicStamp</c> with its base class
        /// <c>AtomicSequenceList&lt;ActiveEdge&gt;</c>. The sequencing guarantees described on
        /// <see cref="Subscribe"/> are provided by that base class, which is declared outside this file.
        /// The debugger text here is deliberately direction-neutral; the concrete subclasses supply their own.
        /// </remarks>
        [DebuggerDisplay("Column {i_col}: {this.Count} subscribers")]
        internal abstract class NotificationSource : AtomicSequenceList<ActiveEdge>
        {
            /// <summary>The chart that owns this source and supplies the retroactive edges.</summary>
            protected readonly SubscriptionChart m_chart;

            /// <summary>The chart column this source is attached to, exactly as passed to the constructor.</summary>
            protected readonly int i_col;

            /// <summary>
            /// Returns the edges to replay to a subscriber that has just been added.
            /// </summary>
            /// <param name="i_seq">
            /// The new subscriber's sequence id. Presumably implementations return only edges stamped before this
            /// point; the filtering is done by the chart and is not visible here.
            /// </param>
            /// <returns>The edges to offer retroactively to the new subscriber.</returns>
            protected abstract IEnumerable<IParseChartEdge> GetRetroactiveEdges(uint i_seq);

            ///////////////////////////////////////////////////////////////////////////////////////////////////////////
            /// <summary>
            /// Creates a subscriber list for column <paramref name="i_col"/> of <paramref name="chart"/>, sharing
            /// the chart's <c>AtomicStamp</c> with the underlying sequence list.
            /// </summary>
            /// <param name="chart">
            /// The owning chart. Must not be null: it is dereferenced while the base constructor arguments are
            /// evaluated.
            /// </param>
            /// <param name="i_col">The column index this source belongs to.</param>
            ///////////////////////////////////////////////////////////////////////////////////////////////////////////
            public NotificationSource(SubscriptionChart chart, int i_col)
                : base(chart.AtomicStamp)
            {
                this.m_chart = chart;
                this.i_col = i_col;
            }

            ///////////////////////////////////////////////////////////////////////////////////////////////////////////
            /// <summary>
            /// This is the normal (non-retroactive) subscription delivery mechanism. The edge is offered to every
            /// subscriber yielded by <c>EnumerateSeq</c> for the edge's sequence id.
            /// </summary>
            /// <param name="pce">The chart edge to deliver.</param>
            /// <returns>The sum of the values returned by <c>ActiveEdge.TryMatchEdge</c> for each subscriber.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="pce"/> is null.</exception>
            /// <remarks>
            /// Despite the <c>Async</c> suffix, the loop in this method runs on the calling thread; any asynchronous
            /// work would have to be started inside <c>TryMatchEdge</c>, which is not visible here.
            /// </remarks>
            ///////////////////////////////////////////////////////////////////////////////////////////////////////////
            internal int SendToAllAsync(IParseChartEdge pce)
            {
                // Fail with a named argument rather than an anonymous NullReferenceException below.
                if (object.ReferenceEquals(pce, null))
                    throw new ArgumentNullException("pce");

                int matched = 0;
                foreach (ActiveEdge subscriber in this.EnumerateSeq(pce.SequenceId))
                {
                    matched += subscriber.TryMatchEdge(pce);
                }
                return matched;
            }

            ///////////////////////////////////////////////////////////////////////////////////////////////////////////
            /// <summary>
            /// Add the target to the list of subscribers. Because this list is interlocked with the ChartEdge
            /// stamping sequence, Add() atomically imposes an (arbitrary) partition upon all past (and future)
            /// ChartEdges such that those which arrive prior to the sequencing point are sent by the retroactive
            /// mechanism, and ChartEdges sent afterwards will be posted to the target via the subscription
            /// delivery mechanism.
            /// </summary>
            /// <param name="ace">The subscriber to add. It must still be inert with respect to the chart's stamp.</param>
            /// <returns>
            /// The sum of the values returned by <c>ActiveEdge.TryMatchEdge</c> for the retroactively delivered edges.
            /// </returns>
            /// <exception cref="ArgumentNullException"><paramref name="ace"/> is null.</exception>
            /// <exception cref="InvalidOperationException">
            /// <paramref name="ace"/> is not inert, so it cannot be activated atomically.
            /// </exception>
            ///////////////////////////////////////////////////////////////////////////////////////////////////////////
            public int Subscribe(ActiveEdge ace)
            {
                // Fail with a named argument rather than an anonymous NullReferenceException below.
                if (object.ReferenceEquals(ace, null))
                    throw new ArgumentNullException("ace");

                if (!ace.IsInert(m_chart.AtomicStamp))
                    throw new InvalidOperationException("Target is already active; cannot support atomic activation.");

                // Add and set a sequence barrier. This must happen before the retroactive scan below, because the
                // scan is keyed on the sequence id the subscriber holds after being added.
                this.Add(ace);

                // Using the sequence stamp, only send retroactive ChartEdges to the newly connected target.
                int matched = 0;
                foreach (IParseChartEdge pce in GetRetroactiveEdges(ace.SequenceId))
                {
                    matched += ace.TryMatchEdge(pce);
                }
                return matched;
            }
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// Notification source whose retroactive edges are the chart's left-aligned edges for column
        /// <c>i_col - 1</c>.
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
        [DebuggerDisplay("Left of {i_col}: {this.Count} subscribers")]
        internal class LeftNotificationSource : NotificationSource
        {
            /// <summary>Creates the source for the position to the left of <paramref name="i_left_of"/>.</summary>
            /// <param name="chart">The owning chart; must not be null.</param>
            /// <param name="i_left_of">The column index stored as <c>i_col</c>.</param>
            public LeftNotificationSource(SubscriptionChart chart, int i_left_of)
                : base(chart, i_left_of)
            {
            }

            /// <summary>
            /// Asks the chart for its left-aligned edges at column <c>i_col - 1</c>, passing the subscriber's
            /// sequence id through.
            /// </summary>
            protected override IEnumerable<IParseChartEdge> GetRetroactiveEdges(uint i_seq)
            {
                return m_chart.GetLeftAlignedEdges(i_col - 1, i_seq);
            }
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// Notification source whose retroactive edges are the chart's right-aligned edges for column
        /// <c>i_col + 1</c>.
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
        [DebuggerDisplay("Right of {i_col}: {this.Count} subscribers")]
        internal class RightNotificationSource : NotificationSource
        {
            /// <summary>Creates the source for the position to the right of <paramref name="i_right_of"/>.</summary>
            /// <param name="chart">The owning chart; must not be null.</param>
            /// <param name="i_right_of">The column index stored as <c>i_col</c>.</param>
            public RightNotificationSource(SubscriptionChart chart, int i_right_of)
                : base(chart, i_right_of)
            {
            }

            /// <summary>
            /// Asks the chart for its right-aligned edges at column <c>i_col + 1</c>, passing the subscriber's
            /// sequence id through.
            /// </summary>
            protected override IEnumerable<IParseChartEdge> GetRetroactiveEdges(uint i_seq)
            {
                return m_chart.GetRightAlignedEdges(i_col + 1, i_seq);
            }
        }
    }
}