using System; using System.Diagnostics; using System.Collections.Generic; using System.Threading; using System.Linq; #pragma warning disable 0420 public partial class LockFreeDictionary<K, V> : IDictionary<K, V> where K : IEquatable<K> where V : IEquatable<V> { public partial class Config { internal abstract partial class BucketList { /// Layout of the Int64 BucketHead: /// /// y /// |ccccccc /// qqqqqqqqqqqqqqqqqqqqqqqq|| |eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee /// +-----------------------++-----+-------------------------------+ /// 3 3 2 1 0 /// FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210 /// /// q: sequence number, 0 - (2^27 - 1), in order to avoid ABA failures /// y: tail-next ok /// c: number of entries in the chain /// e: global index of the first entry in a chain of entries, 0 - (2^32 - 1) /// internal partial struct BucketHead { internal BucketHead(BucketList bl, uint bx) { this.bl = bl; this.bx = bx; this.v = AtomicRead64(ref bl.rg[bx]); } internal BucketHead(Config cfg, BucketList bl, uint bx, out EntryBlock.Entry e) : this(bl, bx) { while (CapturedSlotCount > 0) { cfg.GetEntryLoose(CapturedFirstSlotIndex, out e); Int64 new_v = AtomicRead64(ref bl.rg[bx]); if (this.v == new_v) return; this.v = new_v; } e = default(EntryBlock.Entry); } internal Config Config { get { return this.bl.cfg; } } /// <summary> /// bucket list that the target bucket resides in and to which it belongs /// </summary> readonly BucketList bl; /// <summary> /// index in the target BucketList from which the target was originally obtained and will be (possibly) /// re-stored /// </summary> readonly uint bx; /// <summary> /// originally witnessed version of the target bucket. This BucketHead structure only allows the target /// bucket to be modified if the proposed new value can be atomically exchanged with this value. /// </summary> readonly Int64 v; internal const Int64 TailNextValid = 0x0000004000000000; internal const Int64 SequenceIncrement = 0x0000010000000000; internal const Int64 SlotCountMask = 0x0000007F00000000; internal const Int64 SequenceMask = unchecked((Int64)0xFFFFFF0000000000); internal const Int32 SlotsMax = 0x80; internal const Int32 SlotsMask = SlotsMax - 1; internal const Int32 SlotsThreshold = (Int32)(SlotsMax * .75); internal int CapturedSlotCount { get { return (int)(v >> 32) & SlotsMask; } } internal static int CapturedSlotCountV(Int64 v) { return (int)(v >> 32) & SlotsMask; } internal int CapturedFirstSlotIndex { get { return (int)v; } } internal bool IsExpired { get { return AtomicRead64(ref bl.rg[bx]) != v; } } internal bool TryUpdate(int gx_first, int c_slots) { Int64 new_v = ((v + SequenceIncrement) & SequenceMask) | ((Int64)c_slots << 32) | (UInt32)gx_first; return Interlocked.CompareExchange(ref bl.rg[bx], new_v, v) == v; } internal bool TryInvalidate() { return Interlocked.CompareExchange(ref bl.rg[bx], v + SequenceIncrement, v) == v; } internal Int64 Invalidate() { /// try to effect our change, or get the current value Int64 v_try = v + SequenceIncrement; Int64 v_cur = Interlocked.CompareExchange(ref bl.rg[bx], v_try, v); if (v_cur != v) { /// we never have to try more than twice because if a change based on the new value isn't /// accepted, we know that there must have been an invalidation since the time of this function /// call which we can count for this function as well v_try = v_cur + SequenceIncrement; Int64 v_latest = Interlocked.CompareExchange(ref bl.rg[bx], v_try, v_cur); if (v_latest != v_cur) return v_latest; } return v_try; } internal bool TryChainContainsKey(uint h_in, K key, out bool f_contains) { K k; f_contains = false; int c_slots = CapturedSlotCount; for (int i = 0, gx_next, gx = CapturedFirstSlotIndex; i < c_slots; i++) { uint h = bl.cfg.GetHashAndNext(gx, out gx_next); if (h == h_in) { bl.cfg.GetKeyReadback(gx, out k); if (IsExpired) return false; if (k.Equals(key)) return f_contains = true; } else if (IsExpired) return false; gx = gx_next; } return true; } internal bool TryRemoveFirstItem(out int gx) { gx = CapturedFirstSlotIndex; int gx_next = bl.cfg.GetEntryNext(gx); if (IsExpired) return false; int c = CapturedSlotCount; return c > 0 && TryUpdate(--c == 0 ? -1 : gx_next, c); } internal bool TryInsertFirstItem(ref EntryBlock.Entry e) { Config cfg = bl.cfg; int c = CapturedSlotCount; e.gx_next = c == 0 ? -1 : CapturedFirstSlotIndex; int gx = cfg.GetUnusedIndex(); cfg.StoreEntryLoose(gx, ref e); if (TryUpdate(gx, c + 1)) return true; cfg.ReleaseIndex(gx); return false; } }; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// <summary> /// BucketList (abstract) /// </summary> /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// internal abstract partial class BucketList { readonly protected Int64[] rg; readonly protected Config cfg; internal BucketList(Config cfg, Int64[] rg) { this.cfg = cfg; this.rg = rg; } internal BucketList(Config cfg, uint c_entries) : this(cfg, new Int64[HashFriendly(c_entries)]) { } internal bool IsCurrentForConfig { get { return cfg.m_buckets == this; } } [DebuggerBrowsable(DebuggerBrowsableState.Never)] internal uint BucketCount { get { return (uint)rg.Length; } } internal virtual bool GetBucketHead(uint hash, out BucketHead bh) { bh = new BucketHead(this, hash % (uint)rg.Length); return cfg.m_buckets == this; } internal int EntryCount { get { int c = 0; for (int i = 0; i < rg.Length; i++) c += BucketHead.CapturedSlotCountV(AtomicRead64(ref rg[i])); return c; } } internal int CloseAllBuckets() { int c = 0; for (uint i = 0; i < rg.Length; i++) c += BucketHead.CapturedSlotCountV(new BucketHead(this, i).Invalidate()); return c; } static protected uint HashFriendly(uint c) { int ix = Array.BinarySearch<uint>(some_primes, c); if (ix < 0) ix = ~ix; if (ix < some_primes.Length) return some_primes[ix]; /// requested size is bigger than the largest prime in the table c |= 1; while (c % 3 == 0 || c % 5 == 0 || c % 7 == 0) c += 2; return c; } static readonly uint[] some_primes = { 0x000007, 0x00000B, 0x00000D, 0x000011, 0x000017, 0x00001D, 0x000025, 0x00002F, 0x00003B, 0x000043, 0x00004F, 0x000061, 0x00007F, 0x000089, 0x0000A3, 0x0000C5, 0x000101, 0x000115, 0x000133, 0x00014B, 0x00018D, 0x000209, 0x00022D, 0x000269, 0x0002A1, 0x00031D, 0x000419, 0x00045D, 0x0004D5, 0x000551, 0x00063D, 0x000833, 0x0008BD, 0x0009AD, 0x000AA9, 0x000C83, 0x001069, 0x001181, 0x00135D, 0x00155F, 0x001915, 0x0020E3, 0x002303, 0x0026C3, 0x002AC5, 0x003235, 0x0041CB, 0x004609, 0x004D8D, 0x005597, 0x006475, 0x0083A7, 0x008C17, 0x009B1D, 0x00AB4D, 0x00C8ED, 0x010751, 0x01183D, 0x01363F, 0x0156A7, 0x0191DD, 0x020EB5, 0x02307B, 0x026C81, 0x02AD57, 0x0323BF, 0x041D73, 0x0460FD, 0x04D905, 0x055AB3, 0x064787, 0x083AFD, 0x08C201, 0x09B215, 0x0AB57B, 0x0C8F4D, 0x107603, 0x118411, 0x136441, 0x20EC13, 0x2DC6D1, 0x41D82F, 0x4C4B4B, 0x5B8D8B, 0x6ACFC3, 0x7A1209, 0x83B073, 0x90F575, 0x989693 }; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// <summary> /// BucketListPrimary - a list of buckets used during normal operation /// </summary> /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// internal partial class BucketListPrimary : BucketList { internal BucketListPrimary(Config cfg, uint c_entries) : base(cfg, c_entries) { } internal BucketListPrimary(Config cfg, Int64[] rg) : base(cfg, rg) { } /// <summary> /// Initiate bucket array resizing and continue on to assist with that task /// </summary> internal void InitiateBucketResize() { if (!this.IsCurrentForConfig || !cfg.IsCurrent) return; BucketListResize blg = new BucketListResize(cfg, this, HashFriendly(BucketCount * 2)); blg = cfg.ChangeBuckets(this, blg) as BucketListResize; if (blg != null) blg._check_assist(); } }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// <summary> /// /// </summary> /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// internal partial class BucketListResize : BucketList { readonly BucketListPrimary blp; volatile int m_c_helpers; volatile bool target_verified = false; int m_final_count; internal BucketListResize(Config c, BucketListPrimary blp, uint new_size) : base(c, new_size) { this.blp = blp; this.m_c_helpers = 0; } /// <summary> /// Conscript all callers to assist with the bucket resizing, no matter if it's already complete or not. /// </summary> internal override bool GetBucketHead(uint hash, out BucketHead bh) { _check_assist(); return cfg.dict.m_config.m_buckets.GetBucketHead(hash, out bh); } /// <summary> /// Allow any number of helpers to cooperatively resize the bucket array /// </summary> internal void _check_assist() { if (!this.IsCurrentForConfig) return; /// wait-free: everyone sets all closed flags until the operation is confirmed and we have a frozen count if (m_c_helpers == 0) m_final_count = blp.CloseAllBuckets(); uint i_helper = (uint)Interlocked.Increment(ref m_c_helpers) - 1; uint c_buckets_old = blp.BucketCount; uint n_chunk = BucketCount / (i_helper + 3); uint i_chunk = 0; if (i_helper == 0 && Debugger.IsAttached) { Console.WriteLine("initiated resizing from {0} to {1} ({2} items)", c_buckets_old, BucketCount, cfg.m_count); Console.Out.Flush(); } contention: /// the starting point for the single-stepping loop advances by a different chunk amount for each helper i_chunk = (i_chunk + n_chunk) % c_buckets_old; /// single-stepping loop. Restarted when any kind of contention is detected /// each helper must eventually confirm zero entries in a total of 'c_buckets_old' buckets for (uint bx = i_chunk, c = 0; c < c_buckets_old; bx = (bx + 1) % c_buckets_old, c++) { /// loop until this bucket has zero entries, unless aborted first due to contention EntryBlock.Entry e; BucketHead bh_old; while ((bh_old = new BucketHead(cfg, blp, bx, out e)).CapturedSlotCount > 0) { BucketHead bh_new = new BucketHead(this, e.hash % BucketCount); bool f_contains; if (!bh_new.TryChainContainsKey(e.hash, e.key, out f_contains)) goto contention; if (f_contains) { int gx_removed; if (!bh_old.TryRemoveFirstItem(out gx_removed)) goto contention; cfg.ReleaseIndex(gx_removed); } else if (!bh_new.TryInsertFirstItem(ref e)) goto contention; } } /// if we can't account for the entry count in the target, then we need to go back to help finish the job. This /// is an example of how the wait-free paradigm sometimes causes increased contention in order to accrue its benefits if (!target_verified && !(target_verified = !(cfg.m_buckets == this && this.EntryCount < m_final_count))) goto contention; /// first one finished swaps out the swap set back to a primary set. Also, be sure not to swap into the latest config /// because if the dictionary was emptied while we were at work, we don't want to destroy any new items if (this.IsCurrentForConfig) cfg.ChangeBuckets(this, new BucketListPrimary(cfg, this.rg)); if (i_helper == 0 && Debugger.IsAttached) { Console.WriteLine("done"); Console.Out.Flush(); } } }; }; };