using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Threading;
using System.Linq;

#pragma warning disable 0420

/// <summary>
/// Bucket-management portion of the lock-free dictionary: the packed 64-bit bucket head, the abstract bucket
/// list, and the two concrete bucket lists (normal operation and cooperative resizing).
/// </summary>
public partial class LockFreeDictionary<K, V> : IDictionary<K, V>
    where K : IEquatable<K>
    where V : IEquatable<V>
{
    /// <summary>
    /// Outcome of an optimistic bucket operation (see <c>BucketHead.IfDirectRotateToFront</c>).
    /// </summary>
    internal enum TryResult { None = 0, Expired, Impossible, Ok };

    public partial class Config
    {
        internal abstract partial class BucketList
        {
            /// Layout of the Int64 BucketHead:
            ///
            ///   63                    40 39    32 31                             0
            ///   +-----------------------+--------+-------------------------------+
            ///   |qqqqqqqqqqqqqqqqqqqqqqqq|cccccccc|eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee|
            ///   +-----------------------+--------+-------------------------------+
            ///
            /// q: sequence number, 0 - (2^24 - 1) (see SequenceMask), in order to avoid ABA failures
            /// c: number of entries in the chain (see EntryCountMask; only the low 7 bits are read back,
            ///    see EntriesPerBucketMask)
            /// e: global index of the first entry in a chain of entries, 0 - (2^32 - 1)
            ///
            internal partial struct BucketHead
            {
                /// <summary>
                /// Wraps an already-known bucket word without touching shared memory.
                /// </summary>
                BucketHead(BucketList bl, uint bx, Int64 v)
                {
                    this.bl = bl;
                    this.bx = bx;
                    this.v = v;
                }

                /// <summary>
                /// Captures the current value of bucket <paramref name="bx"/> of <paramref name="bl"/>.
                /// </summary>
                internal BucketHead(BucketList bl, uint bx)
                {
                    this.bl = bl;
                    this.bx = bx;

                    /// the bucket head is read atomically and with an acquire fence
                    this.v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
                }

#if false
                internal BucketHead(Config cfg, BucketList bl, uint bx, ref Entry e)
                    : this(bl, bx)
                {
                    while (CapturedEntryCount > 0)
                    {
                        /// we require the same buckethead to be read before and after the unsecured Entry to ensure that
                        /// the Entry belongs to this bucket
                        int offs;
                        e = cfg.GetBlock(CapturedFirstEntryIndex, out offs)[offs];

                        /// acquire fence: the read of this entry must occur before the read of bl.rg[bx]
                        /// the fence is provided by CompareExchange

                        /// the bucket head is read atomically and with an acquire fence
                        Int64 new_v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
                        if (this.v == new_v)
                            return;
                        this.v = new_v;
                    }
                }
#endif

                /// <summary>
                /// Captures the current value of bucket <paramref name="bx"/> and, when the bucket is not empty,
                /// also copies the hash, key and value of its first entry into <paramref name="h"/>,
                /// <paramref name="key"/> and <paramref name="val"/>. The copy is repeated until the bucket word
                /// read after the copy equals the one read before it. When the bucket is empty the three ref
                /// arguments are left untouched.
                /// </summary>
                internal BucketHead(Config cfg, BucketList bl, uint bx, ref uint h, ref K key, ref V val)
                {
                    this.bl = bl;
                    this.bx = bx;

                    /// the bucket head is read atomically and with an acquire fence
                    this.v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);

                    while (CapturedEntryCount > 0)
                    {
                        /// we require the same buckethead to be read before and after the unsecured Entry to ensure that
                        /// the Entry belongs to this bucket
                        int offs;
                        Entry[] eb = cfg.GetBlock(CapturedFirstEntryIndex, out offs);
                        h = eb[offs].hash;
                        key = eb[offs].key;
                        val = eb[offs].value;

                        /// acquire fence: the read of this entry must occur before the read of bl.rg[bx]
                        /// the fence is provided by CompareExchange

                        /// the bucket head is read atomically and with an acquire fence
                        Int64 new_v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
                        if (this.v == new_v)
                            return;
                        this.v = new_v;
                    }
                }

                /// <summary>
                /// bucket list that the target bucket resides in and to which it belongs
                /// </summary>
                BucketList bl;

                /// <summary>
                /// index in the target BucketList from which the target was originally obtained and will be (possibly)
                /// re-stored
                /// </summary>
                uint bx;

                /// <summary>
                /// originally witnessed version of the target bucket. This BucketHead structure only allows the target
                /// bucket to be modified if the proposed new value can be atomically exchanged with this value.
                /// </summary>
                Int64 v;

                internal const Int64 EntryCountMask = 0x000000FF00000000;
                internal const Int64 SequenceIncrement = 0x0000010000000000;
                internal const Int64 SequenceMask = unchecked((Int64)0xFFFFFF0000000000);
                internal const Int64 FirstIndexMask = 0x00000000FFFFFFFF;

                internal const Int32 EntriesPerBucketMax = 0x80;
                internal const Int32 EntriesPerBucketMask = EntriesPerBucketMax - 1;

                /// <summary>Low 32 bits of the captured bucket word: global index of the first entry.</summary>
                internal int CapturedFirstEntryIndex { get { return (int)v; } }

                /// <summary>Entry count stored in the captured bucket word (masked to 7 bits).</summary>
                internal int CapturedEntryCount { get { return (int)(v >> 32) & EntriesPerBucketMask; } }

                /// <summary>Configuration of the owning bucket list.</summary>
                internal Config Config { get { return this.bl.cfg; } }

                /// <summary>
                /// True when the bucket word currently stored in the bucket list differs from the captured one.
                /// The current word is read through CompareExchange.
                /// </summary>
                internal bool AcquireIsExpired
                {
                    get { return v != Interlocked.CompareExchange(ref bl.rg[bx], 0, 0); }
                }

                /// <summary>
                /// Attempts to replace the captured bucket word with one that has the next sequence number, entry
                /// count <paramref name="c_ent"/> and first entry index <paramref name="gx_first"/>.
                /// </summary>
                /// <returns>true if the bucket still held the captured word and was exchanged</returns>
                internal bool TryUpdateHead(int gx_first, int c_ent)
                {
                    return Interlocked.CompareExchange(ref bl.rg[bx],
                                    ((v + SequenceIncrement) & SequenceMask) | ((Int64)c_ent << 32) | (UInt32)gx_first,
                                    v) == v;
                }

                /// <summary>
                /// Walks the captured chain looking for <paramref name="key"/>.
                /// </summary>
                /// <param name="f_found">whether the key was seen; only meaningful when the method returns true</param>
                /// <returns>
                /// false if the bucket changed while it was being read or the chain ended before the captured
                /// count was exhausted; the caller is expected to retry in that case
                /// </returns>
                internal bool TryContainsKey(K key, out bool f_found)
                {
                    int c_ent = CapturedEntryCount;
                    if (c_ent == 0)
                    {
                        f_found = false;
                        return true;
                    }

                    int gx = CapturedFirstEntryIndex;
                    Config cfg = bl.cfg;
                    while (true)
                    {
                        int offs;
                        Entry[] eb = cfg.GetBlock(gx, out offs);
                        if (key.Equals(eb[offs].key))
                        {
                            f_found = true;
                            return !AcquireIsExpired;
                        }
                        if (--c_ent == 0)
                        {
                            f_found = false;
                            return !AcquireIsExpired;
                        }
                        /// a next-index of -1 or -2 before the count runs out means the chain was modified under us
                        if ((uint)(gx = eb[offs].gx_next) >= 0xFFFFFFFE)
                        {
                            f_found = default(bool);
                            return false;
                        }
                    }
                }

                /// <summary>
                /// Walks the captured chain looking for <paramref name="key"/> and copies out its value.
                /// </summary>
                /// <param name="f_found">whether the key was seen; only meaningful when the method returns true</param>
                /// <param name="v">value stored with the key, or default(V) when it was not seen</param>
                /// <returns>
                /// false if the bucket changed while it was being read or the chain ended before the captured
                /// count was exhausted
                /// </returns>
                internal bool TryGetValueForKey(K key, out bool f_found, out V v)
                {
                    int c_ent = CapturedEntryCount;
                    if (c_ent == 0)
                    {
                        f_found = false;
                        v = default(V);
                        return true;
                    }

                    int gx = CapturedFirstEntryIndex;
                    Config cfg = bl.cfg;
                    while (true)
                    {
                        int offs;
                        Entry[] eb = cfg.GetBlock(gx, out offs);
                        if (key.Equals(eb[offs].key))
                        {
                            f_found = true;
                            v = eb[offs].value;
                            return !AcquireIsExpired;
                        }
                        if (--c_ent == 0)
                        {
                            f_found = false;
                            v = default(V);
                            return !AcquireIsExpired;
                        }
                        if ((uint)(gx = eb[offs].gx_next) >= 0xFFFFFFFE)
                        {
                            f_found = default(bool);
                            v = default(V);
                            return false;
                        }
                    }
                }

                /// <summary>
                /// Walks the whole captured chain, recording in <paramref name="ki"/> the global index and chain
                /// position of the entry whose key equals <paramref name="key"/> (or -1 for both when there is
                /// none) and the global index of the last entry visited.
                /// </summary>
                /// <param name="h_in">not read by this method</param>
                /// <param name="val_found">value of the matching entry, or default(V)</param>
                /// <returns>
                /// false if the bucket changed while it was being read or the chain ended before the captured
                /// count was exhausted
                /// </returns>
                internal bool TryFindKeyInfo(uint h_in, K key, out KeyInfo ki, out V val_found)
                {
                    val_found = default(V);
                    ki.gx_found = ki.gx_last = ki.i_found = -1;

                    int gx = CapturedFirstEntryIndex, c_ent = CapturedEntryCount;

                    /// an empty bucket has no chain to walk. Without this guard the count would be decremented
                    /// past zero and the walk would start from an index that is not part of this bucket. Same
                    /// convention as the other lookup methods: report 'not found' on the captured snapshot.
                    if (c_ent == 0)
                        return true;

                    Config cfg = bl.cfg;
                    for (int i = 0; ; i++)
                    {
                        ki.gx_last = gx;

                        int offs;
                        Entry[] eb = cfg.GetBlock(gx, out offs);
                        if (key.Equals(eb[offs].key))
                        {
                            val_found = eb[offs].value;
                            ki.gx_found = gx;
                            ki.i_found = i;
                        }
                        if (--c_ent == 0)
                            return !AcquireIsExpired;
                        if ((uint)(gx = eb[offs].gx_next) >= 0xFFFFFFFE)
                            return false;
                    }
                }

                /// <summary>
                /// Walks the captured chain looking for <paramref name="key"/> and copies out its value.
                /// </summary>
                /// <returns>
                /// false (with <paramref name="v"/> set to default(V)) if the bucket changed while it was being
                /// read or the chain ended early
                /// </returns>
                /// <exception cref="KeyNotFoundException">
                /// the bucket is empty, or the key is absent from a chain that was confirmed unchanged
                /// </exception>
                internal bool TryFindKeyThrow(K key, out V v)
                {
                    int c_ent = CapturedEntryCount;
                    if (c_ent == 0)
                        throw new KeyNotFoundException();

                    int gx = CapturedFirstEntryIndex;
                    Config cfg = bl.cfg;
                    while (true)
                    {
                        int offs;
                        Entry[] eb = cfg.GetBlock(gx, out offs);
                        if (key.Equals(eb[offs].key))
                        {
                            v = eb[offs].value;
                            return !AcquireIsExpired;
                        }
                        if (--c_ent == 0)
                        {
                            /// Technically, this is a case where we could check expiration without an acquire fence, but
                            /// for simplicity we won't create more code surface just to support this callsite
                            if (AcquireIsExpired)
                                break;
                            throw new KeyNotFoundException();
                        }
                        if ((uint)(gx = eb[offs].gx_next) >= 0xFFFFFFFE)
                            break;
                    }
                    v = default(V);
                    return false;
                }

                /// <summary>
                /// Attempts to unlink the first entry of the captured chain by pointing the bucket at the entry
                /// that follows it (or at -1 when the chain becomes empty). The unlinked entry's index is not
                /// released here.
                /// </summary>
                /// <returns>false if the bucket is empty or the bucket word changed since capture</returns>
                internal bool TryRemoveFirstItem()
                {
                    int gx_next, c = CapturedEntryCount;
                    if (c == 0)
                        return false;
                    if (--c == 0)
                        gx_next = -1;
                    else
                    {
                        int offs;
                        gx_next = bl.cfg.GetBlock(CapturedFirstEntryIndex, out offs)[offs].gx_next;
                        /// no need for acquire fence: TryUpdate incorporates data dependence on gx_next
                    }
                    return TryUpdateHead(gx_next, c);
                }

                /// <summary>
                /// Attempts to push a new entry onto the front of the captured chain. When the captured count
                /// has reached EntriesPerBucketMask a bucket resize is requested instead and nothing is inserted.
                /// </summary>
                /// <returns>
                /// false if the bucket was full or the bucket word changed since capture; in the latter case the
                /// entry index that was obtained is released again
                /// </returns>
                internal bool TryInsertFirstItem(ref int mtid, uint h, ref K key, ref V value)
                {
                    Config cfg = bl.cfg;
                    int c = CapturedEntryCount;
                    if (c == EntriesPerBucketMask)
                    {
                        cfg.RequestBucketResize(ref mtid);
                        return false;
                    }

                    int offs, gx = cfg.GetUnusedIndex(ref mtid);

                    /// no release fence required: not referencing shared location (plus, all stores are release on .NET)
                    Entry[] eb = cfg.GetBlock(gx, out offs);
                    eb[offs].hash = h;
                    eb[offs].gx_next = c == 0 ? -1 : CapturedFirstEntryIndex;
                    eb[offs].key = key;
                    eb[offs].value = value;

                    if (TryUpdateHead(gx, c + 1))
                        return true;

                    cfg.ReleaseIndex(ref mtid, gx);
                    return false;
                }

                /// <summary>
                /// Attempts to make a freshly filled entry the head of the chain while keeping the captured
                /// entry count; the new entry links to <paramref name="gx_next"/>.
                /// </summary>
                /// <returns>
                /// false if the bucket word changed since capture, in which case the entry index that was
                /// obtained is released again
                /// </returns>
                internal bool TryReplaceFirstOrShift(ref int mtid, uint h, int gx_next, ref K key, ref V value)
                {
                    Config cfg = bl.cfg;
                    int offs, gx = cfg.GetUnusedIndex(ref mtid);

                    /// no release fence required: 'e' never references shared location (plus, all stores are release on .NET)
                    Entry[] eb = cfg.GetBlock(gx, out offs);
                    eb[offs].hash = h;
                    eb[offs].gx_next = gx_next;
                    eb[offs].key = key;
                    eb[offs].value = value;

                    if (TryUpdateHead(gx, CapturedEntryCount))
                        return true;

                    cfg.ReleaseIndex(ref mtid, gx);
                    return false;
                }

                /// <summary>
                /// Attempts to rotate the chain in place: the last entry's next-index is exchanged from -1 to the
                /// captured first entry, and the bucket head is then pointed at <paramref name="gx_target"/> with
                /// the entry count unchanged.
                /// </summary>
                /// <returns>
                /// Impossible if the last entry's next-index was neither -1 nor the captured first entry;
                /// Expired if the bucket word changed since capture; otherwise Ok, with <paramref name="bh"/>
                /// replaced by a head that holds the new bucket word
                /// </returns>
                internal static TryResult IfDirectRotateToFront(ref BucketHead bh, int gx_last, int gx_target)
                {
                    Debug.Assert(gx_last != -1);

                    BucketList bl = bh.bl;
                    int offs, gx_first = bh.CapturedFirstEntryIndex;
                    int n = Interlocked.CompareExchange(ref bl.cfg.GetBlock(gx_last, out offs)[offs].gx_next, gx_first, -1);
                    if (n != -1 && n != gx_first)
                        return TryResult.Impossible;

                    Int64 v = bh.v;
                    Int64 new_v = ((v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_target;
                    if (Interlocked.CompareExchange(ref bl.rg[bh.bx], new_v, v) != v)
                        return TryResult.Expired;

                    bh = new BucketHead(bl, bh.bx, new_v);
                    return TryResult.Ok;
                }

                /// rotate one at a time until the target is at the end of the chain
                ///
                /// <paramref name="i_found"/> is the target's position from the front; it is converted to the
                /// number of entries that follow the target, and that many single rotations are performed.
                /// Returns false as soon as a bucket exchange fails or the chain ends unexpectedly.
                ///
                /// NOTE(review): if the target is already the last entry the converted counter starts at zero
                /// and the '--i_found == 0' exit below is not reached after the first rotation; presumably
                /// callers never pass that case -- confirm against the call sites.
                internal static bool TryRotateToEnd(ref int mtid, ref BucketHead bh, int gx_last, int i_found)
                {
                    Debug.Assert(gx_last != -1);

                    /// couldn't use any tricks. rotate one at a time
                    BucketList bl = bh.bl;
                    Config cfg = bl.cfg;
                    int o_last;
                    Entry[] eb_last = cfg.GetBlock(gx_last, out o_last);

                    i_found = bh.CapturedEntryCount - i_found - 1;

                    while (true)
                    {
                        //////////////////////////////////////////////////////////////////////
                        /// do one rotation: move last item to first
                        //////////////////////////////////////////////////////////////////////
                        int o_new, gx_new = cfg.GetUnusedIndex(ref mtid);

                        Entry e = eb_last[o_last];
                        /// no acquire fence required: 'e' is not shared until after the full fence (CompareExchange, below)
                        e.gx_next = bh.CapturedFirstEntryIndex;

                        /// no release fence required: the new Entry is still not shared (plus, all stores are release on .NET)
                        cfg.GetBlock(gx_new, out o_new)[o_new] = e;

                        Int64 new_v = ((bh.v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_new;
                        if (Interlocked.CompareExchange(ref bl.rg[bh.bx], new_v, bh.v) != bh.v)
                        {
                            cfg.ReleaseIndex(ref mtid, gx_new);
                            return false;
                        }

                        cfg.ReleaseIndex(ref mtid, gx_last);
                        bh = new BucketHead(bl, bh.bx, new_v);

                        //////////////////////////////////////////////////////////////////////
                        /// see if done
                        //////////////////////////////////////////////////////////////////////
                        if (--i_found == 0)
                            return true;

                        //////////////////////////////////////////////////////////////////////
                        /// get the new last item
                        //////////////////////////////////////////////////////////////////////
                        int c_ent = bh.CapturedEntryCount;
                        gx_last = bh.CapturedFirstEntryIndex;
                        while (true)
                        {
                            eb_last = cfg.GetBlock(gx_last, out o_last);
                            if (--c_ent == 0)
                                break;
                            gx_last = eb_last[o_last].gx_next;
                            /// acquire fences not needed (except for the last iteration) because gx_last has
                            /// data dependence on itself. This also prevents loop hoisting
                            if ((uint)gx_last >= 0xFFFFFFFE)
                                return false;
                        }
                        /// acquire fence: the final read of gx_last is not demanded yet but we proceed fenceless until
                        /// the CompareExchange
                    }
                }

                /// <summary>
                /// Advances the sequence number of the bucket so that any BucketHead captured earlier can no
                /// longer be exchanged into it.
                /// </summary>
                internal void Invalidate()
                {
                    /// try to effect our change, or get the current value
                    Int64 v_cur = Interlocked.CompareExchange(ref bl.rg[bx], v + SequenceIncrement, v);

                    /// we never have to try more than twice because if a change based on the new value isn't
                    /// accepted, we know that there must have been an invalidation since the time of this function
                    /// call which we can count for this function as well (invalidations are indistinguishable)
                    if (v_cur != v)
                        Interlocked.CompareExchange(ref bl.rg[bx], v_cur + SequenceIncrement, v_cur);
                }
            };
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// BucketList (abstract)
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        internal abstract partial class BucketList
        {
            /// <summary>One packed 64-bit bucket word per bucket (see BucketHead for the layout).</summary>
            protected Int64[] rg;

            /// <summary>Configuration this bucket list was created for.</summary>
            protected Config cfg;

            /// <summary>Set once InvalidateAllBuckets has run over every bucket.</summary>
            bool f_closed = false;

            /// <summary>Creates a bucket list over an existing bucket array.</summary>
            internal BucketList(Config cfg, Int64[] rg)
            {
                this.cfg = cfg;
                this.rg = rg;
            }

            /// <summary>
            /// Creates a bucket list with a fresh bucket array whose length is
            /// HashFriendly(<paramref name="c_entries"/>).
            /// </summary>
            internal BucketList(Config cfg, uint c_entries)
                : this(cfg, new Int64[HashFriendly(c_entries)])
            {
            }

            /// <summary>True while this instance is the bucket list installed in its configuration.</summary>
            internal bool IsCurrentForConfig { get { return cfg.m_buckets == this; } }

            /// <summary>Number of buckets in this list.</summary>
            [DebuggerBrowsable(DebuggerBrowsableState.Never)]
            internal uint BucketCount { get { return (uint)rg.Length; } }

            /// <summary>
            /// Captures the head of the bucket selected by <paramref name="hash"/> modulo the bucket count.
            /// </summary>
            /// <returns>true if this list is still the configuration's bucket list after the capture</returns>
            internal virtual bool GetBucketHead(ref int mtid, uint hash, out BucketHead bh)
            {
                bh = new BucketHead(this, hash % (uint)rg.Length);
                return cfg.m_buckets == this;
            }

            /// <summary>
            /// Invalidates every bucket in turn, stopping early once another caller has marked the list closed,
            /// and marks the list closed after a complete pass.
            /// </summary>
            internal void InvalidateAllBuckets()
            {
                for (uint i = 0; !f_closed && i < rg.Length; i++)
                    new BucketHead(this, i).Invalidate();

                if (!f_closed)
                {
                    /// acquire fence required here on any pending invalidations, but Invalidate() entails full fences
                    /// (CompareExchange)
                    f_closed = true;
                }
            }

            /// <summary>
            /// Picks a bucket count for a requested size: the first entry of the size table that is not
            /// smaller than <paramref name="c"/>, or, beyond the table, the next odd number at or above
            /// <paramref name="c"/> that is not divisible by 3, 5 or 7.
            /// </summary>
            static internal uint HashFriendly(uint c)
            {
                /// a negative result is the bitwise complement of the index of the next larger table entry
                int ix = Array.BinarySearch<uint>(some_primes, c);
                if (ix < 0)
                    ix = ~ix;
                if (ix < some_primes.Length)
                    return some_primes[ix];

                /// requested size is bigger than the largest prime in the table
                c |= 1;
                while (c % 3 == 0 || c % 5 == 0 || c % 7 == 0)
                    c += 2;
                return c;
            }

            /// ascending size table searched by HashFriendly (must stay sorted for the binary search)
            static uint[] some_primes =
            {
                0x000007, 0x00000B, 0x00000D, 0x000011, 0x000017, 0x00001D, 0x000025, 0x00002F,
                0x00003B, 0x000043, 0x00004F, 0x000061, 0x00007F, 0x000089, 0x0000A3, 0x0000C5,
                0x000101, 0x000115, 0x000133, 0x00014B, 0x00018D, 0x000209, 0x00022D, 0x000269,
                0x0002A1, 0x00031D, 0x000419, 0x00045D, 0x0004D5, 0x000551, 0x00063D, 0x000833,
                0x0008BD, 0x0009AD, 0x000AA9, 0x000C83, 0x001069, 0x001181, 0x00135D, 0x00155F,
                0x001915, 0x0020E3, 0x002303, 0x0026C3, 0x002AC5, 0x003235, 0x0041CB, 0x004609,
                0x004D8D, 0x005597, 0x006475, 0x0083A7, 0x008C17, 0x009B1D, 0x00AB4D, 0x00C8ED,
                0x010751, 0x01183D, 0x01363F, 0x0156A7, 0x0191DD, 0x020EB5, 0x02307B, 0x026C81,
                0x02AD57, 0x0323BF, 0x041D73, 0x0460FD, 0x04D905, 0x055AB3, 0x064787, 0x083AFD,
                0x08C201, 0x09B215, 0x0AB57B, 0x0C8F4D, 0x107603, 0x118411, 0x136441, 0x20EC13,
                0x2DC6D1, 0x41D82F, 0x4C4B4B, 0x5B8D8B, 0x6ACFC3, 0x7A1209, 0x83B073, 0x90F575,
                0x989693
            };
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// BucketListPrimary - a list of buckets used during normal operation
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        internal partial class BucketListPrimary : BucketList
        {
            internal BucketListPrimary(Config cfg, uint c_entries)
                : base(cfg, c_entries)
            {
            }

            internal BucketListPrimary(Config cfg, Int64[] rg)
                : base(cfg, rg)
            {
            }

            /// <summary>
            /// Initiate bucket array resizing and continue on to assist with that task
            /// </summary>
            internal void InitiateBucketResize(ref int mtid)
            {
                if (this.IsCurrentForConfig && this.cfg.IsCurrent)
                {
                    /// the resize target is sized from twice the current bucket count
                    BucketListResize blr = new BucketListResize(cfg, this, HashFriendly(BucketCount * 2));

                    cfg.ChangeBuckets(this, blr);

                    cfg.CheckAssist(ref mtid);
                }
            }
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// BucketListResize - the bucket list installed while entries are being moved from a BucketListPrimary
        /// into a larger bucket array
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        internal partial class BucketListResize : BucketList
        {
            /// <summary>Source bucket list whose entries are being moved into this one.</summary>
            BucketListPrimary blp;

            /// <summary>Number of callers that have entered _check_assist past its initial check.</summary>
            int m_c_helpers;

            /// <summary>Set by the first helper that completes a full pass over the source buckets.</summary>
            bool f_done = false;

            internal BucketListResize(Config c, BucketListPrimary blp, uint new_size)
                : base(c, new_size)
            {
                this.blp = blp;
                this.m_c_helpers = 0;
            }

            /// <summary>
            /// Conscript all callers to assist with the bucket resizing, no matter if it's already complete or not.
            /// </summary>
            internal override bool GetBucketHead(ref int mtid, uint hash, out BucketHead bh)
            {
                _check_assist(ref mtid);

                /// after assisting, the request is forwarded to whichever bucket list is installed by then
                return cfg.d.m_config.m_buckets.GetBucketHead(ref mtid, hash, out bh);
            }

            /// <summary>
            /// Allow any number of helpers to cooperatively resize the bucket array
            /// </summary>
            internal void _check_assist(ref int mtid)
            {
                if (!this.IsCurrentForConfig)
                    return;

                /// wait-free: everyone sets all closed flags until the operation is confirmed and we have a frozen count
                blp.InvalidateAllBuckets();

                uint i_helper = (uint)Interlocked.Increment(ref m_c_helpers) - 1;
                uint c_buckets_old = blp.BucketCount;
                uint n_chunk = c_buckets_old / (i_helper + 1);
                uint i_chunk = 0;
#if DEBUG
                if (i_helper == 0 && (cfg.d.m_options & Options.DebugOutput) > 0)
                {
                    Console.WriteLine("initiated resizing from {0} to {1} ({2} items)", c_buckets_old, BucketCount, cfg.m_count);
                    Console.Out.Flush();
                }
#endif
                /// receive the hash, key and value of the first entry of each source bucket as it is captured
                uint h = 0;
                K key = default(K);
                V val = default(V);

            contention:
                /// the starting point for the single-stepping loop advances by a different chunk amount for each helper
                i_chunk = (i_chunk + n_chunk) % c_buckets_old;

                /// single-stepping loop. Restarted when any kind of contention is detected
                /// each helper must eventually confirm zero entries in all source buckets
                for (uint bx = i_chunk, c = 0; !f_done && c < c_buckets_old; bx = (bx + 1) % c_buckets_old, c++)
                {
                    /// loop until this bucket has zero entries, unless aborted first due to contention
                    BucketHead bh_old;
                    while ((bh_old = new BucketHead(cfg, blp, bx, ref h, ref key, ref val)).CapturedEntryCount > 0)
                    {
                        BucketHead bh_new = new BucketHead(this, h % BucketCount);

                        /// copy the entry into the new bucket unless the key is already there, then unlink it
                        /// from the old bucket and release its index
                        bool f;
                        if (!bh_new.TryContainsKey(key, out f))
                            goto contention;

                        if (!f && !bh_new.TryInsertFirstItem(ref mtid, h, ref key, ref val))
                            goto contention;

                        if (!bh_old.TryRemoveFirstItem())
                            goto contention;

                        cfg.ReleaseIndex(ref mtid, bh_old.CapturedFirstEntryIndex);
                    }
                }

                if (!f_done)
                {
                    Thread.MemoryBarrier();
                    f_done = true;
                }

                /// first one finished swaps out the swap set back to a primary set. Also, be sure not to swap into the latest config
                /// because if the dictionary was emptied while we were at work, we don't want to destroy any new items
                if (this.IsCurrentForConfig)
                    cfg.ChangeBuckets(this, new BucketListPrimary(cfg, this.rg));
#if DEBUG
                if (i_helper == 0 && (cfg.d.m_options & Options.DebugOutput) > 0)
                {
                    Console.WriteLine("done");
                    Console.Out.Flush();
                }
#endif
            }
        };
    };
};