//#define FL_COUNT
#define GLOBAL_COUNT
#define FL_MULTI

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using glue;
using glue.Collections.LockFreeDictionary;
using glue.Debugging;

// 0420: volatile field passed by ref (intentional: Interlocked on m_buckets)
// 0414: field assigned but never used (padding fields)
// 0649: field never assigned (padding fields)
#pragma warning disable 0420, 0414, 0649

namespace agree
{
	///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	/// <summary>
	/// A Tray whose per-feature constraint pools are lock-free hash tables (LockFreeConstraintPool),
	/// permitting concurrent access without locks.
	/// </summary>
	///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	sealed public class LockFreeTray : Tray
	{
		public LockFreeTray(int tix, TypeMgr tm, int next_mark, int next_id)
			: base(tix, tm, next_mark, next_id)
		{
			// one lock-free pool per feature index
			for (int i = 0; i < Pools.Length; i++)
				Pools[i] = new LockFreeConstraintPool(this, i);
		}

		///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
		/// initial bucket-table capacity requested for each pool; rounded up to a
		/// hash-friendly prime by BucketList.HashFriendly
		///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
		const int DefaultCapacity = 35;

		///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
		/// <summary>
		/// A lock-free mark -> Edge hash table. Entries live in append-only block storage and are
		/// chained into per-bucket lists; bucket heads are single Int64 values updated with CAS.
		/// </summary>
		///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
		[StructLayout(LayoutKind.Auto, Pack = 64)]
		sealed class LockFreeConstraintPool : ConstraintPool
		{
			/// <summary>
			/// Result of a chain walk (TryFindKeyInfo): where the key was found, plus the
			/// last entry of the chain. Indices of -1 mean "not found"/"not applicable".
			/// </summary>
			struct KeyInfo
			{
				internal int gx_found;		// global index of the entry holding the key, or -1
				internal int i_found;		// zero-based position of the key within the chain, or -1
				internal int gx_last;		// global index of the last entry in the chain
				internal bool Found { get { return gx_found != -1; } }
			};

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// A single storage slot: a chain link (gx_next), the key (mark) and its Edge value.
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			struct Entry
			{
				internal int gx_next;	// global index of next entry in chain; freelist link when on the freelist
				internal int mark;		// the key; 0 (Freelist.FreelistHash) marks a free slot
				internal Edge edge;		// the stored value
			};

			public LockFreeConstraintPool(Tray tr, int i_feat)
				: base(tr, i_feat)
			{
				// reference the padding fields so they are not optimized/flagged away
				Nop.X(fsp1, fsp2, fsp3, fsp4, fsp6);
#if GLOBAL_COUNT
				Nop.X(fsp5);
#endif
				EntriesPerBlockPower = 12;
				EntriesPerBlock = 1 << EntriesPerBlockPower;	// 4096 entries per block
				GlobalIndexMask = EntriesPerBlock - 1;
				entry_blocks = new Entry[0x10][];
				entry_blocks[0] = new Entry[EntriesPerBlock];
#if FL_MULTI
				uint free_list_count;
				//free_list_count = 4;
				//free_list_count = BucketList.HashFriendly((uint)Environment.ProcessorCount * 2);
				//free_list_count = BucketList.HashFriendly((uint)Environment.ProcessorCount / 2);
				free_list_count = (uint)(Environment.ProcessorCount / 2) | 1;	// odd count; at least 1
				//free_list_count = (uint)Environment.ProcessorCount | 1;
				Console.Write("{0} ", free_list_count);
				free_lists = new Freelist[free_list_count];
				for (int i = 0; i < free_lists.Length; i++)
				{
					//var o = new FalseSharing.Padding2048();
					free_lists[i] = new Freelist(this);
					//Nop.X(o);
				}
#else
				free_list = new Freelist(this);
				Console.Write("1 ");
#endif
				m_buckets = new BucketListPrimary(this, DefaultCapacity);
			}

			/// the pool currently installed in the owning Tray for this feature index
			LockFreeConstraintPool TrayPool { get { return (LockFreeConstraintPool)tr.Pools[i_feat]; } }

			int EntriesPerBlockPower;
			int EntriesPerBlock;
			int GlobalIndexMask;

			/// each EntryBlock stores a fixed number of entries
			FalseSharing.Padding120 fsp1;
			Entry[][] entry_blocks;

			/// entries have a global index across all EntryBlocks
			FalseSharing.Padding120 fsp2;
			int m_gx_next;

			/// a pair of bucket arrays that can be atomically swapped
			FalseSharing.Padding120 fsp3;
			volatile BucketList m_buckets;

			FalseSharing.Padding120 fsp4;
			static int next_list = 0;	// only referenced by commented-out freelist-selection experiments

#if GLOBAL_COUNT
			FalseSharing.Padding120 fsp5;
			internal int m_count = 0;
#endif
			FalseSharing.Padding120 fsp6;

#if FL_MULTI
			/// a small number of free lists
			Freelist[] free_lists;
#else
			Freelist free_list;
#endif

			/// Tri-state-plus outcome for the optimistic chain-rotation helpers
			internal enum TryResult { None = 0, Expired, Impossible, Ok };

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			internal bool IsCurrent { get { return this == this.TrayPool; } }

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// Return the Edge stored for mark 'm', retrying until a consistent snapshot is read.
			/// If the mark is not present, the default Edge is returned (f_found is discarded).
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			public override Edge GetEdge(Int32 m)
			{
				/// using mark as hash, and hash zero also indicates a freelist entry so ensure that it's not used
				Debug.Assert(m != 0);
				int mtid = -1;
				// if a resize is in progress, lend a hand before reading
				LockFreeConstraintPool.BucketListResize blr = m_buckets as LockFreeConstraintPool.BucketListResize;
				if (blr != null)
					blr._check_assist(ref mtid);
				LockFreeConstraintPool.BucketList.BucketHead bh;
				Edge c;
				bool f_found;
				// retry until both the bucket head and the chain walk are valid
				while (!m_buckets.GetBucketHead(ref mtid, m, out bh) || !bh.TryGetValueForKey(m, out f_found, out c))
					;
				return c;
			}

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// Try to fetch the Edge stored for mark 'm'. Returns false (with default 'e') if absent.
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			public override bool TryGetEdge(int m, out Edge e)
			{
				Debug.Assert(m != 0);
				int mtid = -1;
				LockFreeConstraintPool.BucketListResize blr = m_buckets as LockFreeConstraintPool.BucketListResize;
				if (blr != null)
					blr._check_assist(ref mtid);
				LockFreeConstraintPool.BucketList.BucketHead bh;
				bool f_found;
				while (!m_buckets.GetBucketHead(ref mtid, m, out bh) || !bh.TryGetValueForKey(m, out f_found, out e))
					;
				return f_found;
			}

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// Store the constraint for the specified mark.
			/// Handle Edge value semantics properly.
			/// Allows detached mark to be set for bare or atomic types
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			public override void SetEdge(Int32 m, Edge e)
			{
				Debug.Assert(m != 0 && m >= tr._protect_mark && e.FlagsId != 0);
				int mtid = -1;
				LockFreeConstraintPool.BucketListResize blr = m_buckets as LockFreeConstraintPool.BucketListResize;
				if (blr != null)
					blr._check_assist(ref mtid);
				while (true)
				{
					LockFreeConstraintPool.BucketList.BucketHead bh;
					if (!m_buckets.GetBucketHead(ref mtid, m, out bh))
						continue;
					int c = bh.CapturedEntryCount;
					if (c == 0)
						goto add_item;
					Edge cur_val;
					KeyInfo ki;
					if (!bh.TryFindKeyInfo(m, out ki, out cur_val))
						continue;
					if (!ki.Found)
						goto add_item;
					//LockFreeConstraintPool cfg = bh.Config;
					//if ((cfg.d.m_options & Options.CheckForVacuousValueUpdates) > 0 && Entry.CompareValues(cur_val, value))
					//	return;

					/// key exists: replace its value. The entry must first be maneuvered into an
					/// atomically-actionable position (first or last in the chain).
					TryResult tz = TryResult.Impossible;
					int gx_next;
					if (c == 1)
					{
						gx_next = -1;
						/// continues below...
					}
					else if (ki.i_found == 0 || (tz = LockFreeConstraintPool.BucketList.BucketHead.IfDirectRotateToFront(ref bh, ki.gx_last, ki.gx_found)) != TryResult.Impossible)
					{
						if (tz == TryResult.Expired)
							continue;
						/// This will be a replace-first operation on the bucket
						int offs;
						gx_next = GetBlock(bh.CapturedFirstEntryIndex, out offs)[offs].gx_next;
						/// no acquire fence required on this read: whatever value is read into e_update.gx_next is not published until
						/// after the full fence in TryReplaceFirstOrShift->TryUpdate->CompareExchange, and its correctness depends only
						/// upon the success of the CMPEXCHG
						/// continues below...
					}
					else if (ki.i_found == c - 1 || LockFreeConstraintPool.BucketList.BucketHead.TryRotateToEnd(ref mtid, ref bh, ki.gx_last, ki.i_found))
					{
						/// This will be a shift operation on the bucket
						gx_next = bh.CapturedFirstEntryIndex;
						/// continues below...
					}
					else
						continue;

					/// ...replace first or shift. In either case, the count remains the same.
					if (bh.TryReplaceFirstOrShift(ref mtid, gx_next, m, e))
					{
						ReleaseIndex(ref mtid, ki.gx_found);
						return;
					}
					continue;

				add_item:
					/// try to add the item
					if (bh.TryInsertFirstItem(ref mtid, m, e))
					{
#if GLOBAL_COUNT || FL_COUNT
						IncrementCountAndCheck(ref mtid);
#endif
						return;
					}
				}
			}

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// Remove the entry for the specified mark, if present; the removed value is discarded.
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			public override void RemoveEdge(Int32 in_mark)
			{
				Edge _dont_care;
				TryRemoveEdge(in_mark, out _dont_care);
			}

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// Remove the entry for mark 'm', returning its value in 'e' if it was present.
			/// Mirrors SetEdge: the target entry is rotated into an atomically-actionable position
			/// (first or last in its chain) before being unlinked with a single head CAS.
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			public override bool TryRemoveEdge(int m, out Edge e)
			{
				int mtid = -1;
				LockFreeConstraintPool.BucketListResize blr = m_buckets as LockFreeConstraintPool.BucketListResize;
				if (blr != null)
					blr._check_assist(ref mtid);
				while (true)
				{
					LockFreeConstraintPool.BucketList.BucketHead bh;
					if (!m_buckets.GetBucketHead(ref mtid, m, out bh))
						continue;
					int c = bh.CapturedEntryCount;
					if (c == 0)
						break;
					KeyInfo ki;
					if (!bh.TryFindKeyInfo(m, out ki, out e))
						continue;
					if (!ki.Found)
						break;
					TryResult tz = TryResult.Impossible;
					if (c == 1)
					{
						/// remove the only one
						ki.gx_last = -1;
						/// continues below...
					}
					else if (ki.i_found == 0 || (tz = LockFreeConstraintPool.BucketList.BucketHead.IfDirectRotateToFront(ref bh, ki.gx_last, ki.gx_found)) != TryResult.Impossible)
					{
						if (tz == TryResult.Expired)
							continue;
						/// remove the first one
						int offs;
						ki.gx_last = GetBlock(ki.gx_found, out offs)[offs].gx_next;
						/// TryUpdate incorporates data dependence on gx, no need for acquire fence
						/// continues below...
					}
					else if (ki.i_found == c - 1 || LockFreeConstraintPool.BucketList.BucketHead.TryRotateToEnd(ref mtid, ref bh, ki.gx_last, ki.i_found))
					{
						/// remove the last one by decrementing the count
						ki.gx_last = bh.CapturedFirstEntryIndex;
						/// continues below...
					}
					else
						continue;

					/// ...final update
					if (bh.TryUpdateHead(ki.gx_last, c - 1))
					{
						ReleaseIndex(ref mtid, ki.gx_found);
#if GLOBAL_COUNT
						Interlocked.Decrement(ref m_count);
#elif FL_COUNT && FL_MULTI
						Interlocked.Decrement(ref free_lists[mtid].m_count);
#elif FL_COUNT
						Interlocked.Decrement(ref free_list.m_count);
#endif
						return true;
					}
				}
				/// nothing to remove
				e = default(Edge);
				return false;
			}

			/// <summary>
			/// True if the table currently contains an entry for mark 'm'.
			/// </summary>
			public override bool ContainsInMark(int m)
			{
				int mtid = -1;
				LockFreeConstraintPool.BucketListResize blr = m_buckets as LockFreeConstraintPool.BucketListResize;
				if (blr != null)
					blr._check_assist(ref mtid);
				LockFreeConstraintPool.BucketList.BucketHead bh;
				bool f;
				while (!m_buckets.GetBucketHead(ref mtid, m, out bh) || !bh.TryContainsKey(m, out f))
					;
				return f;
			}

			/// <summary>
			/// Number of entries, depending on the counting mode compiled in; -1 when no
			/// counting symbol (GLOBAL_COUNT/FL_COUNT) is defined. Per-freelist counts may
			/// be individually negative; only their sum is meaningful.
			/// </summary>
			public override int Count
			{
				get
				{
					int mtid = -1;
					LockFreeConstraintPool.BucketListResize blr = m_buckets as LockFreeConstraintPool.BucketListResize;
					if (blr != null)
						blr._check_assist(ref mtid);
#if GLOBAL_COUNT
					return m_count;
#elif FL_COUNT && FL_MULTI
					return (int)free_lists.Sum(fl => fl.m_count);
#elif FL_COUNT
					return (int)free_list.m_count;
#else
					return -1;
#endif
				}
			}

			/// deferred (lazily-enumerated) views over the table; -1 = count not known up front
			public override IEnumerable<int> Marks { get { return new glue.Collections.DeferredCollection<int>(-1, _keys()); } }
			public override IEnumerable<Edge> Edges { get { return new glue.Collections.DeferredCollection<Edge>(-1, _values()); } }
			public override IEnumerable<KeyValuePair<int, Edge>> PoolEdges { get { return new glue.Collections.DeferredCollection<KeyValuePair<int, Edge>>(-1, _pool_edges()); } }

			/// <summary>
			/// Atomically install 'bl_new' in place of 'bl_old'. Returns the bucket list that is
			/// current after the attempt: 'bl_new' on success, or the list that beat us to it.
			/// </summary>
			BucketList ChangeBuckets(BucketList bl_old, BucketList bl_new)
			{
				BucketList bl_cur = Interlocked.CompareExchange(ref m_buckets, bl_new, bl_old);
				return bl_cur == bl_old ?
					bl_new : bl_cur;
			}

			/// <summary>
			/// Ask the current (primary) bucket list to begin a resize, if it is not already resizing.
			/// </summary>
			void RequestBucketResize(ref int mtid)
			{
				LockFreeConstraintPool.BucketListPrimary blp = m_buckets as LockFreeConstraintPool.BucketListPrimary;
				if (blp != null)
					blp.InitiateBucketResize(ref mtid);
			}

			/// <summary>
			/// Lazily bind this thread to one of the freelists (FL_MULTI only); -1 means not yet bound.
			/// </summary>
			void GetFreelistIndex(ref int mtid)
			{
#if FL_MULTI
				if (mtid == -1)
				{
					mtid = Thread.CurrentThread.ManagedThreadId % free_lists.Length;
					//mtid = next_list++ % free_lists.Length;
					//mtid = Thread.CurrentThread.GetHashCode() % free_lists.Length;
				}
#endif
			}

#if GLOBAL_COUNT || FL_COUNT
			/// <summary>
			/// Bump the entry count (global or per-freelist) and trigger a bucket-table resize
			/// once the count exceeds the number of buckets.
			/// </summary>
			void IncrementCountAndCheck(ref int mtid)
			{
#if GLOBAL_COUNT
				int c = Interlocked.Increment(ref m_count);
#elif FL_COUNT && FL_MULTI
				GetFreelistIndex(ref mtid);
				Int64 c = Interlocked.Increment(ref free_lists[mtid].m_count) /* * free_lists.Length*/;
#elif FL_COUNT
				Int64 c = Interlocked.Increment(ref free_list.m_count);
#endif
				if (c > m_buckets.BucketCount)
					RequestBucketResize(ref mtid);
			}
#endif

			/// <summary>
			/// Walk the raw entry storage, yielding each slot that is not on the freelist and is
			/// still reachable from its bucket. Blocks until tr.mre_truncate_ok is signaled.
			/// </summary>
			IEnumerator<KeyValuePair<int, Edge>> EnumerateKeyValuePairs()
			{
				tr.mre_truncate_ok.WaitOne();
				int mtid = -1;
				for (int offs, i = 0; i < m_gx_next; i++)
				{
					Entry e = GetBlock(i, out offs)[offs];
					/// todo: this is subject to torn reads
					// Thread.MemoryBarrier();
					if (e.mark != Freelist.FreelistHash)
					{
						bool f;
						BucketList.BucketHead bh;
						do
							m_buckets.GetBucketHead(ref mtid, e.mark, out bh); /// ignore config change
						while (!bh.TryContainsKey(e.mark, out f));
						if (f)
							yield return new KeyValuePair<int, Edge>(e.mark, e.edge);
					}
				}
			}

			IEnumerable<int> _keys()
			{
				IEnumerator<KeyValuePair<int, Edge>> e = EnumerateKeyValuePairs();
				while (e.MoveNext())
					yield return e.Current.Key;
			}

			IEnumerable<Edge> _values()
			{
				IEnumerator<KeyValuePair<int, Edge>> e = EnumerateKeyValuePairs();
				while (e.MoveNext())
					yield return e.Current.Value;
			}

			IEnumerable<KeyValuePair<int, Edge>> _pool_edges()
			{
				IEnumerator<KeyValuePair<int, Edge>> e = EnumerateKeyValuePairs();
				while (e.MoveNext())
					yield return e.Current;
			}

			/// <summary>
			/// Get an unused entry index, either from one of the freelists, or by allocating a new one
			///
</summary> int GetUnusedIndex(ref int mtid) { int gx; //GetFreelistIndex(ref mtid); //int try_mtid = mtid; //while (!free_lists[try_mtid].TryGetFreeIndex(out gx)) //{ // try_mtid = (try_mtid + 1) % free_lists.Length; // if ((uint)m_gx_next >= 0xFFFFFFF0) // throw new OverflowException("The dictionary already contains the maximum number of elements."); // gx = Interlocked.Increment(ref m_gx_next) - 1; //} #if FL_MULTI GetFreelistIndex(ref mtid); if (!free_lists[mtid].TryGetFreeIndex(out gx)) { if ((uint)m_gx_next >= 0xFFFFFFF0) throw new OverflowException("The dictionary already contains the maximum number of elements."); gx = Interlocked.Increment(ref m_gx_next) - 1; } #else if (!free_list.TryGetFreeIndex(out gx)) { if ((uint)m_gx_next >= 0xFFFFFFF0) throw new OverflowException("The dictionary already contains the amximum number of elements."); gx = Interlocked.Increment(ref m_gx_next) - 1; } #endif return gx; } /// <summary> /// Add an entry index to one of the the freelists /// </summary> void ReleaseIndex(ref int mtid, int gx) { #if FL_MULTI GetFreelistIndex(ref mtid); free_lists[mtid].AddIndexToFreeList(gx); #else free_list.AddIndexToFreeList(gx); #endif } Entry[] GetBlock(int gx, out int offs) { int i_block = (gx >> EntriesPerBlockPower); offs = gx & GlobalIndexMask; Entry[] eb = entry_blocks[i_block]; if (eb == null) { Entry[][] rgeb_loc = entry_blocks; Thread.MemoryBarrier(); if (i_block == rgeb_loc.Length - 1) { Entry[][] rgeb_new = new Entry[rgeb_loc.Length + 0x10][]; rgeb_loc.CopyTo(rgeb_new, 0); var t1 = Interlocked.CompareExchange(ref entry_blocks, rgeb_new, rgeb_loc); rgeb_loc = (t1 == rgeb_loc) ? rgeb_new : t1; } eb = rgeb_loc[i_block]; if (eb == null) { Entry[] eb_new = new Entry[EntriesPerBlock]; eb = Interlocked.CompareExchange(ref entry_blocks[i_block], eb_new, null) ?? 
							eb_new;
					}
				}
#if false
				else if (offs >= ResizeTrigger)
				{
					GetFreelistIndex(ref mtid);
				}
#endif
				return eb;
			}

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// Lock-free LIFO stack of recycled entry indices. The head packs a 32-bit sequence
			/// number with the top-of-stack index to guard against ABA.
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			class Freelist
			{
				public Freelist(LockFreeConstraintPool cfg)
				{
					// reference padding fields so they are not flagged as unused
					Nop.X(fsp1, fsp3);
#if FL_COUNT
					Nop.X(fsp2);
#endif
					this.cfg = cfg;
					// start empty: head index is the terminator sentinel
					this.m_head = unchecked((UInt32)FreeListTerminator);
				}

				/// mark value that identifies a slot as being on the freelist
				internal const Int32 FreelistHash = 0;
				/// sentinel index terminating the freelist chain
				internal const Int32 FreeListTerminator = unchecked((Int32)0xFFFFFFFE);
				const Int64 SequenceMask = unchecked((Int64)0xFFFFFFFF00000000);
				const Int64 SequenceIncrement = 0x0000000100000000;

				LockFreeConstraintPool cfg;
				FalseSharing.Padding120 fsp1;
				/// packed head: high 32 bits = ABA sequence, low 32 bits = top-of-stack global index
				Int64 m_head;
#if FL_COUNT
				/// The number of entries in the dictionary is maintained with interlocked operations. By keeping these in the
				/// freelist, we get several count instances which helps minimize contention when the count is maintained
				/// during reading and writing. Note that therefore some counts may go negative, and imbalances due to the
				/// affinity of threads to dictionary operations may cause imbalances. Currently, the only concession to this
				/// potential problem is to use store the counts as 64-bit integers.
				FalseSharing.Padding120 fsp2;
				internal Int64 m_count;
#endif
				FalseSharing.Padding120 fsp3;

				/// <summary>
				/// Add an item to the free list
				/// </summary>
				public void AddIndexToFreeList(int gx_add)
				{
					Int64 new_head, head = Interlocked.CompareExchange(ref m_head, 0, 0);
					int offs;
					Entry[] rg = cfg.GetBlock(gx_add, out offs);
					rg[offs] = default(Entry); // permit GC release of reference types, set Entry.FreelistHash
					while (true)
					{
						rg[offs].gx_next = (Int32)head;
						new_head = ((head + SequenceIncrement) & SequenceMask) | (UInt32)gx_add;
						if ((new_head = Interlocked.CompareExchange(ref m_head, new_head, head)) == head)
							return;
						head = new_head;	// lost the race; retry against the head we just observed
					}
				}

				/// <summary>
				/// Pick the first item from the free list.
				/// </summary>
				/// <returns><value>true</value> if a free index is returned in <paramref name="gx_out"/>,
				/// <value>false</value> otherwise.</returns>
				public bool TryGetFreeIndex(out int gx_out)
				{
					Int64 same_head, new_head, head = Interlocked.CompareExchange(ref m_head, 0, 0);
					while (true)
					{
						int offs, gx_next;
						while (true)
						{
							gx_out = (Int32)head;
							if (gx_out == FreeListTerminator)
							{
								gx_out = -1;
								return false;
							}
							gx_next = cfg.GetBlock(gx_out, out offs)[offs].gx_next;
							/// acquire fence: read of gx_next must occur prior to the re-read of m_head below.
							Thread.MemoryBarrier();
							if (head == (same_head = Interlocked.CompareExchange(ref m_head, 0, 0)))
								break;
							head = same_head;
						}
						new_head = ((head + SequenceIncrement) & SequenceMask) | (UInt32)gx_next;
						if ((new_head = Interlocked.CompareExchange(ref m_head, new_head, head)) == head)
							return true;
						head = new_head;
					}
				}
			};

			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			/// <summary>
			/// BucketList (abstract)
			/// an array of Int64s. These Int64s are intended to be managed via the BucketHead value type, which contains a
			/// snapshot of an Int64 value plus information about where it came from.
			/// </summary>
			///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
			abstract class BucketList
			{
				protected Int64[] rg;					// one packed BucketHead value per bucket
				protected LockFreeConstraintPool cfg;	// owning pool

				internal BucketList(LockFreeConstraintPool cfg, Int64[] rg)
				{
					this.cfg = cfg;
					this.rg = rg;
				}

				/// allocate a bucket array of hash-friendly (prime-ish) size
				internal BucketList(LockFreeConstraintPool cfg, int c_entries)
					: this(cfg, new Int64[HashFriendly(c_entries)])
				{
				}

				internal bool IsCurrentForConfig { get { return cfg.m_buckets == this; } }

				internal int BucketCount { get { return rg.Length; } }

				/// <summary>
				/// Read the head for the bucket that 'hash' maps to. Returns false when this bucket
				/// list has been superseded as the pool's current list, signaling the caller to retry.
				/// </summary>
				internal virtual bool GetBucketHead(ref int mtid, int hash, out BucketHead bh)
				{
					bh = new BucketHead(this, hash % rg.Length);
					return cfg.m_buckets == this;
				}

				/// <summary>
				/// Round 'c' up to a hash-friendly size: the next prime from the table, or--beyond
				/// the table--an odd number with no factors of 3, 5 or 7.
				/// </summary>
				static internal int HashFriendly(int c)
				{
					int ix = Array.BinarySearch<int>(some_primes, c);
					if (ix < 0)
						ix = ~ix;
					if (ix < some_primes.Length)
						return some_primes[ix];
					/// requested size is bigger than the largest prime in the table
					c |= 1;
					while (c % 3 == 0 || c % 5 == 0 || c % 7 == 0)
						c += 2;
					return c;
				}

				static int[] some_primes = 
				{
					0x000007, 0x00000B, 0x00000D, 0x000011, 0x000017, 0x00001D, 0x000025, 0x00002F,
					0x00003B, 0x000043, 0x00004F, 0x000061, 0x00007F, 0x000089, 0x0000A3, 0x0000C5,
					0x000101, 0x000115, 0x000133, 0x00014B, 0x00018D, 0x000209, 0x00022D, 0x000269,
					0x0002A1, 0x00031D, 0x000419, 0x00045D, 0x0004D5, 0x000551, 0x00063D, 0x000833,
					0x0008BD, 0x0009AD, 0x000AA9, 0x000C83, 0x001069, 0x001181, 0x00135D, 0x00155F,
					0x001915, 0x0020E3, 0x002303, 0x0026C3, 0x002AC5, 0x003235, 0x0041CB, 0x004609,
					0x004D8D, 0x005597, 0x006475, 0x0083A7, 0x008C17, 0x009B1D, 0x00AB4D, 0x00C8ED,
					0x010751, 0x01183D, 0x01363F, 0x0156A7, 0x0191DD, 0x020EB5, 0x02307B, 0x026C81,
					0x02AD57, 0x0323BF, 0x041D73, 0x0460FD, 0x04D905, 0x055AB3, 0x064787, 0x083AFD,
					0x08C201, 0x09B215, 0x0AB57B, 0x0C8F4D, 0x107603, 0x118411, 0x136441, 0x20EC13,
					0x2DC6D1, 0x41D82F, 0x4C4B4B, 0x5B8D8B, 0x6ACFC3, 0x7A1209, 0x83B073, 0x90F575,
					0x989693
				};

				/// Layout of the Int64 BucketHead:
				///                          x
				///
				///                          |ccccccc
				///  qqqqqqqqqqqqqqqqqqqqqqqq||     |eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
				/// +-----------------------++-----+-------------------------------+
				///        4               3               2       1               0
				/// FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210
				///
				/// q: sequence number, 0 - (2^24 - 1), in order to avoid ABA failures
				///    (NOTE(review): original comment said 2^27-1, but SequenceMask covers 24 bits)
				/// x: bucket closed bit
				/// c: number of entries in the chain
				/// e: global index of the first entry in a chain of entries, 0 - (2^32 - 1)
				///
				internal struct BucketHead
				{
					/// construct from an already-known value (no memory read)
					BucketHead(BucketList bl, int bx, Int64 v)
					{
						this.bl = bl;
						this.bx = bx;
						this.v = v;
					}

					internal BucketHead(BucketList bl, int bx)
					{
						this.bl = bl;
						this.bx = bx;
						/// the bucket head is read atomically and with an acquire fence
						this.v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
					}

					/// <summary>
					/// This constructor closes the bucket. To prevent unnecessary invalidations, a special bit is reserved to
					/// indicate the closed status, i.e, if the bucket is already closed, it does not have to be invalidated
					/// again, which could cause other work that's underway to be restarted. In either case, the first entry--
					/// which is returned, if any--is guaranteed to be atomically associated with the constructed buckethead.
					/// </summary>
					internal BucketHead(LockFreeConstraintPool cfg, BucketList bl, int bx, ref Entry e)
					{
						this.bl = bl;
						this.bx = bx;
						this.v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
						while (CapturedEntryCount > 0)
						{
							/// we require the same buckethead to be read before and after the unsecured Entry to ensure that
							/// the Entry belongs to this bucket
							int offs;
							e = cfg.GetBlock(CapturedFirstEntryIndex, out offs)[offs];
							/// acquire fence: the read of this entry must occur before the read of bl.rg[bx]
							/// the fence is provided by CompareExchange
							Int64 new_v;
							if ((v & Closed) > 0)
							{
								// already closed: just re-read and verify nothing changed
								new_v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
								if (this.v == new_v)
									return;
							}
							else
							{
								/// the bucket head is read atomically and with an acquire fence
								Int64 old_v = v;
								v |= Closed;
								new_v = Interlocked.CompareExchange(ref bl.rg[bx], v, old_v);
								if (new_v == old_v)
									return;
							}
							this.v = new_v;		// superseded: adopt the newer value and retry
						}
					}

					/// <summary>
					/// bucket list that the target bucket resides in and to which it belongs
					/// </summary>
					BucketList bl;

					/// <summary>
					/// index in the target BucketList from which the target was originally obtained and will be (possibly)
					/// re-stored
					/// </summary>
					int bx;

					/// <summary>
					/// originally witnessed version of the target bucket. This BucketHead structure only allows the target
					/// bucket to be modified if the proposed new value can be atomically exchanged with this value.
					/// </summary>
					Int64 v;

					internal const Int64 EntryCountMask = 0x0000007F00000000;
					internal const Int64 Closed = 0x0000008000000000;
					internal const Int64 SequenceIncrement = 0x0000010000000000;
					internal const Int64 SequenceMask = unchecked((Int64)0xFFFFFF0000000000);
					internal const Int64 FirstIndexMask = 0x00000000FFFFFFFF;
					internal const Int32 EntriesPerBucketMax = 0x80;
					internal const Int32 EntriesPerBucketMask = EntriesPerBucketMax - 1;

					/// <summary>
					/// Global index of the first entry in the chain headed by this BucketHead instance, as
					/// originally observed
					/// </summary>
					internal int CapturedFirstEntryIndex { get { return (int)v; } }

					/// <summary>
					/// Number of entries in the chain headed by this BucketHead instance, as originally observed.
					/// </summary>
					internal int CapturedEntryCount { get { return (int)(v >> 32) & EntriesPerBucketMask; } }

					/// <summary>
					/// Returns 'true' if attempting to commit the buckethead will fail because the
					/// originally observed value has been superseded. However, a 'false' result can
					/// only be considered advisory.
					/// </summary>
					internal bool AcquireIsExpired { get { return v != Interlocked.CompareExchange(ref bl.rg[bx], 0, 0); } }

					/// <summary>
					/// Attempt to atomically commit the specified first entry index and entry count to this
					/// BucketHead. If the BucketHead is expired, no action is taken and the function returns
					/// 'false'. The 'closed' flag does not prevent action, but is preserved. In this way,
					/// the 'closed' flag operates through the same invalidation mechanism as the other
					/// parts of the BucketHead.
					/// </summary>
					internal bool TryUpdateHead(int gx_first, int c_ent)
					{
						return Interlocked.CompareExchange(
									ref bl.rg[bx],
									((v + SequenceIncrement) & (SequenceMask | Closed)) | ((Int64)c_ent << 32) | (UInt32)gx_first,
									v) == v;
					}

					/// <summary>
					/// Scan the chain headed by this BucketHead to see if it contains an entry with the
					/// specified key.
					/// </summary>
					/// <returns>'true' (advisory only) if the BucketHead (and chain) still appear valid;
					/// 'false' (irrevocable) if it is no longer valid and this work effort should be
					/// abandoned and restarted.</returns>
					internal bool TryContainsKey(int m, out bool f_found)
					{
						/// If we captured an entry count of zero then that was a fact at some point in time
						/// and we are permitted to assert with certainty that the chain does not contain the
						/// key.
						int c_ent = CapturedEntryCount;
						if (c_ent == 0)
						{
							f_found = false;
							return true;
						}
						/// Rather than doing an expensive re-validation of our chain after fetching each/any part of a presumed chain entry
						/// for a polite walk as this loop proceeds, knowing the number of entries allows us to use an optimistic trampling
						/// approach and only bail early in
						/// cases of detecting a freelist or list terminator. We return the validation state only upon completion.
						int gx = CapturedFirstEntryIndex;
						LockFreeConstraintPool cfg = bl.cfg;
						while (true)
						{
							int offs;
							Entry[] eb = cfg.GetBlock(gx, out offs);
							if (m == eb[offs].mark)
							{
								f_found = true;
								return !AcquireIsExpired;
							}
							if (--c_ent == 0)
							{
								f_found = false;
								return !AcquireIsExpired;
							}
							/// opportunistic non-re-validation may result in going off-course. detect easy cases only.
							if ((uint)(gx = eb[offs].gx_next) >= 0xFFFFFFFE)
							{
								/// found some kind of terminator prior to the expected number of entries. Thus, we have trampled
								/// off course and the BucketHead must no longer be valid
								f_found = default(bool);
								return false;
							}
						}
					}

					/// <summary>
					/// Same as TryContainsKey, but returns the value as well, if successful.
					/// </summary>
					internal bool TryGetValueForKey(int m, out bool f_found, out Edge v)
					{
						int c_ent = CapturedEntryCount;
						if (c_ent == 0)
						{
							f_found = false;
							v = default(Edge);
							return true;
						}
						int gx = CapturedFirstEntryIndex;
						LockFreeConstraintPool cfg = bl.cfg;
						while (true)
						{
							int offs;
							Entry[] eb = cfg.GetBlock(gx, out offs);
							if (m == eb[offs].mark)
							{
								f_found = true;
								v = eb[offs].edge;
								return !AcquireIsExpired;
							}
							if (--c_ent == 0)
							{
								f_found = false;
								v = default(Edge);
								return !AcquireIsExpired;
							}
							// terminator seen early: we trampled off course; caller must restart
							if ((uint)(gx = eb[offs].gx_next) >= 0xFFFFFFFE)
							{
								f_found = default(bool);
								v = default(Edge);
								return false;
							}
						}
					}

					/// <summary>
					/// Returns information about the whole chain, including the global index of the last entry,
					/// whether the specified key was found, and--if so--its global index and zero-based index
					/// within the chain.
					/// </summary>
					internal bool TryFindKeyInfo(int m, out KeyInfo ki, out Edge val_found)
					{
						val_found = default(Edge);
						ki.gx_found = ki.gx_last = ki.i_found = -1;
						int gx = CapturedFirstEntryIndex, c_ent = CapturedEntryCount;
						LockFreeConstraintPool cfg = bl.cfg;
						for (int i = 0; ; i++)
						{
							ki.gx_last = gx;
							int offs;
							Entry[] eb = cfg.GetBlock(gx, out offs);
							if (m == eb[offs].mark)
							{
								val_found = eb[offs].edge;
								ki.gx_found = gx;
								ki.i_found = i;
							}
							if (--c_ent == 0)
								return !AcquireIsExpired;
							/// if we detect that the opportunistic approach has gone awry, stop walking. All of the
							/// returned information shall be considered invalid.
							if ((uint)(gx = eb[offs].gx_next) >= 0xFFFFFFFE)
								return false;
						}
					}

					/// <summary>
					/// Attempt to add one element to the chain by inserting the entry into the first position.
/// </summary> internal bool TryInsertFirstItem(ref int mtid, int m, Edge value) { LockFreeConstraintPool cfg = bl.cfg; int c = CapturedEntryCount; /// If this bucket has maxed out the maximum number of entries for a bucket, resize the bucket table if (c == EntriesPerBucketMask) { cfg.RequestBucketResize(ref mtid); return false; } /// prepare the entry in a temporary slot that we can hot-patch int offs, gx = cfg.GetUnusedIndex(ref mtid); /// no release fence required: not referencing shared location (plus, all stores are release on .NET) Entry[] eb = cfg.GetBlock(gx, out offs); eb[offs].gx_next = c == 0 ? -1 : CapturedFirstEntryIndex; eb[offs].mark = m; eb[offs].edge = value; /// attempt atomic hot-patch if (TryUpdateHead(gx, c + 1)) return true; /// if the hot-patch failed because the BucketHead was expired, release the temporary slot cfg.ReleaseIndex(ref mtid, gx); return false; } /// <summary> /// Basically the same as TryInsertFirstEntry, but the entry count in the BucketHead remains the same. /// This allows one of two operations, depending on the specified gx_next value. If gx_next indicates /// the existing first entry, then the chain is shifted so that the last entry is released. If gx_next /// indicates the existing second entry, then the specified entry is patched in, such that the /// existing first entry is released. 
/// </summary>
internal bool TryReplaceFirstOrShift(ref int mtid, int gx_next, int m, Edge value)
{
    LockFreeConstraintPool cfg = bl.cfg;
    /// get a slot for private prep
    int offs, gx = cfg.GetUnusedIndex(ref mtid);
    /// no release fence required: 'e' never references shared location (plus, all stores are release on .NET)
    Entry[] eb = cfg.GetBlock(gx, out offs);
    eb[offs].gx_next = gx_next;
    eb[offs].mark = m;
    eb[offs].edge = value;
    /// note: entry count is deliberately unchanged -- this is a replace/shift, not an insert
    if (TryUpdateHead(gx, CapturedEntryCount))
        return true;
    /// release the private slot if things didn't work out
    cfg.ReleaseIndex(ref mtid, gx);
    return false;
}
/// <summary>
/// If a chain can be made circular, then it can be atomically rotated to any position with just a single
/// BucketHead operation. Unfortunately, if the gx_next field of the last entry is not either already correct
/// nor -1, then this optimization cannot be used, as there's no way to atomically correct an otherwise
/// wrong value out there. Accordingly, the return value tri-state indicates if the trick cannot be used.
/// </summary>
/// <param name="bh">indicates the chain. Updated to represent the rotated chain if successful</param>
/// <param name="gx_last">required input hint: presumed global index of the last entry in the chain</param>
/// <param name="gx_target">target to rotate to the front, if possible</param>
internal static TryResult IfDirectRotateToFront(ref BucketHead bh, int gx_last, int gx_target)
{
    BucketList bl = bh.bl;
    int offs, gx_first = bh.CapturedFirstEntryIndex;
    /// just go for the swap, protected only by compare requirement. This may affect a final entry in
    /// the wrong chain in rare cases but this is not fatal and guarantees failure of the case we're interested in,
    /// which is detected by an expired BucketHead just below.
    int n = Interlocked.CompareExchange(ref bl.cfg.GetBlock(gx_last, out offs)[offs].gx_next, gx_first, -1);
    if (n != -1 && n != gx_first)
        return TryResult.Impossible;
    /// seems ok so far. Attempt direct rotation.
    Int64 v = bh.v;
    /// bump the sequence number and swap the first-entry index to the rotation target
    Int64 new_v = ((v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_target;
    if (Interlocked.CompareExchange(ref bl.rg[bh.bx], new_v, v) != v)
        return TryResult.Expired;
    /// It worked. construct a new BucketHead to return to the caller
    bh = new BucketHead(bl, bh.bx, new_v);
    return TryResult.Ok;
}
/// <summary>
/// If the optimized rotation could not be used, we can still move the entry we're interested in to
/// an atomically actionable position (first or last) by rotating one at a time. In this case, it's
/// one less operation to move the target to the end of the chain so that's what we'll do
/// </summary>
internal static bool TryRotateToEnd(ref int mtid, ref BucketHead bh, int gx_last, int i_found)
{
    BucketList bl = bh.bl;
    LockFreeConstraintPool cfg = bl.cfg;
    int o_last;
    Entry[] eb_last = cfg.GetBlock(gx_last, out o_last);
    /// transform the index of the desired entry into the number of rotations that we must
    /// perform in order to move it into last position.
    i_found = bh.CapturedEntryCount - i_found - 1;
    while (true)
    {
        //////////////////////////////////////////////////////////////////////
        /// do one rotation: move last item to first
        //////////////////////////////////////////////////////////////////////
        /// doing so requires a temporary slot
        int gx_new = cfg.GetUnusedIndex(ref mtid);
        Entry e = eb_last[o_last];
        /// no acquire fence required: 'e' is not shared until after the full fence (CompareExchange, below)
        e.gx_next = bh.CapturedFirstEntryIndex;
        /// no release fence required: the new Entry is still not shared (plus, all stores are release on .NET)
        int o_new;
        cfg.GetBlock(gx_new, out o_new)[o_new] = e;
        /// calculate new atom value for this one rotation
        Int64 new_v = ((bh.v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_new;
        if (Interlocked.CompareExchange(ref bl.rg[bh.bx], new_v, bh.v) != bh.v)
        {
            /// if this rotation failed, release the temporary slot
            cfg.ReleaseIndex(ref mtid, gx_new);
            return false;
        }
        /// if this rotation succeeded, release the entry that used to be last.
        cfg.ReleaseIndex(ref mtid, gx_last);
        bh = new BucketHead(bl, bh.bx, new_v);
        //////////////////////////////////////////////////////////////////////
        /// see if done
        //////////////////////////////////////////////////////////////////////
        if (--i_found == 0)
            return true;
        //////////////////////////////////////////////////////////////////////
        /// get the new last item
        //////////////////////////////////////////////////////////////////////
        /// walk to the end of the list to find the global index of the (new) last item
        int c_ent = bh.CapturedEntryCount;
        gx_last = bh.CapturedFirstEntryIndex;
        /// as before, this loop will proceed without BucketHead re-validation
        while (true)
        {
            eb_last = cfg.GetBlock(gx_last, out o_last);
            if (--c_ent == 0)
                break;
            gx_last = eb_last[o_last].gx_next;
            /// acquire fences not needed (except for the last iteration) because gx_last has
            /// data dependence on itself. This also prevents loop hoisting
            if ((uint)gx_last >= 0xFFFFFFFE)
                return false;
        }
        /// acquire fence: the final read of gx_last is not demanded yet but we proceed fenceless until
        /// the CompareExchange
    }
}
};
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// <summary>
/// BucketListPrimary - a list of buckets used during normal operation
/// </summary>
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class BucketListPrimary : BucketList
{
    internal BucketListPrimary(LockFreeConstraintPool cfg, int c_entries)
        : base(cfg, c_entries)
    {
    }
    internal BucketListPrimary(LockFreeConstraintPool cfg, Int64[] rg)
        : base(cfg, rg)
    {
    }
    /// <summary>
    /// Initiate bucket array resizing and continue on to assist with that task
    /// </summary>
    internal void InitiateBucketResize(ref int mtid)
    {
        if (this.IsCurrentForConfig && this.cfg.IsCurrent)
        {
            /// grow to roughly double the bucket count, adjusted to a hash-friendly size
            BucketListResize blr = new BucketListResize(cfg, this, HashFriendly(BucketCount * 2));
            cfg.ChangeBuckets(this, blr);
            /// re-read m_buckets: another thread may have won the swap; assist whichever
            /// resize operation (if any) is actually current
            blr = cfg.m_buckets as LockFreeConstraintPool.BucketListResize;
            if (blr != null)
                blr._check_assist(ref mtid);
        }
    }
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// <summary>
/// Swapped in as the BucketList for a Config in order to force everyone to join in to a bucket
/// table resizing operation.
/// </summary>
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class BucketListResize : BucketList
{
    /// <summary>
    /// This is the old set of buckets we are in the process of replacing. The normal buckets that
    /// come with our base class is the "new" set.
    /// </summary>
    BucketListPrimary blp;
    /// number of helping parties that have joined the resizing operation
    int m_c_helpers;
    /// allows helpers to share the discovery that the task is complete
    bool f_done = false;
    internal BucketListResize(LockFreeConstraintPool c, BucketListPrimary blp, int new_size)
        : base(c, new_size)
    {
        this.blp = blp;
        this.m_c_helpers = 0;
    }
    /// <summary>
    /// Conscript all callers to assist with the bucket resizing, no matter if it's already complete or not.
    /// </summary>
    internal override bool GetBucketHead(ref int mtid, int hash, out BucketHead bh)
    {
        _check_assist(ref mtid);
        /// when complete, switch everyone to the latest config and/or new buckets
        return cfg.TrayPool.m_buckets.GetBucketHead(ref mtid, hash, out bh);
    }
    /// <summary>
    /// Allow any number of helpers to cooperatively resize the bucket array
    /// </summary>
    internal void _check_assist(ref int mtid)
    {
        if (!this.IsCurrentForConfig)
            return;
        Entry e = default(Entry);
        /// claim a unique helper ordinal so each helper starts at a different chunk offset
        int i_helper = Interlocked.Increment(ref m_c_helpers) - 1;
        int c_buckets_old = blp.BucketCount;
        int n_chunk = c_buckets_old / (i_helper + 1);
        int i_chunk = 0;
    contention:
        /// the starting point for the single-stepping loop advances by a different chunk amount for each helper
        i_chunk = (i_chunk + n_chunk) % c_buckets_old;
        /// single-stepping loop. Restarted when any kind of contention is detected.
        /// each helper must eventually confirm zero entries in all source buckets
        for (int bx = i_chunk, c = 0; !f_done && c < c_buckets_old; bx = (bx + 1) % c_buckets_old, c++)
        {
            int c_ent;
            BucketHead bh_old;
            /// loop until this bucket has zero entries, unless aborted first due to contention.
            /// construct a BucketHead for the old (source) Int64. Use a special constructor which
            /// retrieves the first entry (if any) and guarantees that it is matched to the constructed
            /// BucketHead.
            while ((c_ent = (bh_old = new BucketHead(cfg, blp, bx, ref e)).CapturedEntryCount) > 0)
            {
                /// Construct a target BucketHead for this entry's position in the new set of buckets
                BucketHead bh_new = new BucketHead(this, e.mark % BucketCount);
                bool f;
                /// For true non-blocking operation, we must use a write-before-read procedure. This ensures that
                /// work-in-progress is always visible to other helpers and available for them to take over,
                /// join, or complete, should someone stall. Accordingly, each helper independently
                /// verifies that a particular target chain contains, or does not contain, the old entry
                /// and proceeds on that basis.
                if (bh_new.TryContainsKey(e.mark, out f) && (f || bh_new.TryInsertFirstItem(ref mtid, e.mark, e.edge)))
                {
                    /// The new BucketHead remains valid and confirms either that the old entry was already
                    /// in the new chain, or that it wasn't, but this helper added it. We should immediately
                    /// test our lucky streak by attempting to delete the old entry.
                    if (bh_old.TryUpdateHead(--c_ent == 0 ? -1 : e.gx_next, c_ent))
                    {
                        /// the helper who gets a confirmed patch-over of the old entry is the one who
                        /// releases the slot back to the freelist.
                        cfg.ReleaseIndex(ref mtid, bh_old.CapturedFirstEntryIndex);
                    }
                    /// if we made it this far, then contention does not cause a restart. we'll keep trying the same bucket
                }
                else
                    goto contention;    /// contention. exit the bucket chain loop and restart the outer loop
            }
        }
        if (!f_done)    /// avoid gratuitous MemoryBarrier
        {
            /// release fence. setting 'f_done' must occur after all bucket head access
            Thread.MemoryBarrier();
            f_done = true;
        }
        /// first one finished swaps out the swap set back to a primary set. Also, be sure not to swap into the latest config
        /// because if the dictionary was emptied while we were at work, we don't want to destroy any new items
        if (this.IsCurrentForConfig)
            cfg.ChangeBuckets(this, new BucketListPrimary(cfg, this.rg));
    }
};
};
};
}