using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Threading;
using System.Linq;

#pragma warning disable 0420

public partial class LockFreeDictionary<K, V> : IDictionary<K, V>
    where K : IEquatable<K>
    where V : IEquatable<V>
{
    /// <summary>
    /// Outcome of an optimistic operation on a bucket head (see <c>BucketHead.CanCarouselTo</c>).
    /// </summary>
    internal enum TryResult { None = 0, Expired, Impossible, Ok };

    public partial class Config
    {
        internal abstract partial class BucketList
        {
            /// Layout of the Int64 BucketHead:
            ///
            ///                         cccccccc
            /// qqqqqqqqqqqqqqqqqqqqqqqq|      |eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
            /// +-----------------------+------+-------------------------------+
            /// 3               2               1               0
            /// FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210
            ///
            /// q: sequence number (bits 40-63, 24 bits), 0 - (2^24 - 1), in order to avoid ABA failures;
            ///    it wraps around on overflow
            /// c: number of entries in the chain (bits 32-39); only the low 7 bits are read back
            ///    (EntriesPerBucketMask), so a chain holds at most 127 entries
            /// e: global index of the first entry in a chain of entries (bits 0-31), 0 - (2^32 - 1);
            ///    this code stores -1 (all ones) when the chain is empty
            ///
            /// A BucketHead value is a snapshot: it remembers the raw 64-bit word it observed and only
            /// lets the bucket be changed by compare-and-swap against that remembered word.
            internal partial struct BucketHead
            {
                /// <summary>
                /// Wrap an already-known raw value (used after a successful compare-and-swap, when the
                /// new content of the bucket is known without re-reading it).
                /// </summary>
                BucketHead(BucketList bl, uint bx, Int64 v)
                {
                    this.bl = bl;
                    this.bx = bx;
                    this.v = v;
                }

                /// <summary>
                /// Capture the current raw value of bucket <paramref name="bx"/> in <paramref name="bl"/>.
                /// </summary>
                internal BucketHead(BucketList bl, uint bx)
                {
                    this.bl = bl;
                    this.bx = bx;
                    this.v = ReadBucketAtomic(bl, bx);
                }

                /// <summary>
                /// Capture the bucket together with a copy of its first entry. The entry is fetched and
                /// the bucket word is then read again; the pair is only returned when the word did not
                /// change in between, otherwise the newer word is adopted and the fetch is retried.
                /// When the captured chain is empty, <paramref name="e"/> is set to its default value.
                /// </summary>
                internal BucketHead(Config cfg, BucketList bl, uint bx, out EntryBlock.Entry e)
                    : this(bl, bx)
                {
                    while (CapturedEntryCount > 0)
                    {
                        cfg.GetEntryHot(CapturedFirstEntryIndex, out e);

                        Int64 new_v = ReadBucketAtomic(bl, bx);
                        if (this.v == new_v)
                            return;

                        // the bucket moved on while the entry was being read; retry against the new word
                        this.v = new_v;
                    }
                    e = default(EntryBlock.Entry);
                }

                /// <summary>
                /// Read the 64-bit bucket word without tearing. A 32-bit process needs an interlocked
                /// read for that; otherwise a plain read followed by a full memory barrier is used.
                /// </summary>
                static Int64 ReadBucketAtomic(BucketList bl, uint bx)
                {
                    if (IntPtr.Size == 4)
                        return Interlocked.Read(ref bl.rg[bx]);

                    Int64 value = bl.rg[bx];
                    Thread.MemoryBarrier();
                    return value;
                }

                /// <summary>
                /// bucket list that the target bucket resides in and to which it belongs
                /// </summary>
                readonly BucketList bl;

                /// <summary>
                /// index in the target BucketList from which the target was originally obtained and will
                /// be (possibly) re-stored
                /// </summary>
                readonly uint bx;

                /// <summary>
                /// originally witnessed version of the target bucket. This BucketHead structure only
                /// allows the target bucket to be modified if the proposed new value can be atomically
                /// exchanged with this value.
                /// </summary>
                readonly Int64 v;

                internal const Int64 EntryCountMask = 0x000000FF00000000;
                internal const Int64 SequenceIncrement = 0x0000010000000000;
                internal const Int64 SequenceMask = unchecked((Int64)0xFFFFFF0000000000);
                internal const Int64 FirstIndexMask = 0x00000000FFFFFFFF;

                internal const Int32 EntriesPerBucketMax = 0x80;
                internal const Int32 EntriesPerBucketMask = EntriesPerBucketMax - 1;

                /// <summary>Low 32 bits of the captured word: global index of the first chain entry.</summary>
                internal int CapturedFirstEntryIndex { get { return (int)v; } }

                /// <summary>Entry count of the captured word, limited to the low 7 bits of the count field.</summary>
                internal int CapturedEntryCount { get { return (int)(v >> 32) & EntriesPerBucketMask; } }

                /// <summary>Configuration of the bucket list this head was captured from.</summary>
                internal Config Config { get { return this.bl.cfg; } }

                /// <summary>
                /// True when the bucket word currently stored in the list differs from the captured one.
                /// </summary>
                internal bool IsExpired
                {
                    get { return v != ReadBucketAtomic(bl, bx); }
                }

                /// <summary>
                /// Fetch the value of entry <paramref name="gx"/>, then confirm the bucket is unchanged.
                /// </summary>
                /// <returns>false if the bucket changed, in which case <paramref name="value"/> must not be trusted</returns>
                internal bool TryGetValue(int gx, out V value)
                {
                    bl.cfg.GetValueHot(gx, out value);
                    return !IsExpired;
                }

                /// <summary>
                /// Fetch the key of entry <paramref name="gx"/>, then confirm the bucket is unchanged.
                /// </summary>
                /// <returns>false if the bucket changed, in which case <paramref name="key"/> must not be trusted</returns>
                internal bool TryGetKey(int gx, out K key)
                {
                    bl.cfg.GetKeyHot(gx, out key);
                    return !IsExpired;
                }

                /// <summary>
                /// Atomically replace the bucket word with a new first index and entry count, bumping
                /// the sequence number, provided the bucket still holds the captured word.
                /// </summary>
                /// <returns>true if the exchange took place</returns>
                internal bool TryUpdate(int gx_first, int c_ent)
                {
                    Int64 proposed = ((v + SequenceIncrement) & SequenceMask)
                                   | ((Int64)c_ent << 32)
                                   | (UInt32)gx_first;
                    return Interlocked.CompareExchange(ref bl.rg[bx], proposed, v) == v;
                }

                /// <summary>
                /// Bump the sequence number of the bucket so that outstanding captures of it expire.
                /// </summary>
                internal void Invalidate()
                {
                    // try to effect our change, or get the current value
                    Int64 v_cur = Interlocked.CompareExchange(ref bl.rg[bx], v + SequenceIncrement, v);

                    // we never have to try more than twice because if a change based on the new value isn't
                    // accepted, we know that there must have been an invalidation since the time of this
                    // function call which we can count for this function as well (invalidations are
                    // indistinguishable)
                    if (v_cur != v)
                        Interlocked.CompareExchange(ref bl.rg[bx], v_cur + SequenceIncrement, v_cur);
                }

                /// <summary>
                /// Walk the captured chain looking for the first entry whose hash equals
                /// <paramref name="h_in"/> and whose key equals <paramref name="key"/>.
                /// </summary>
                /// <param name="gx">index of the matching entry, or -1 if the whole chain was walked
                /// without a match; meaningless when the method returns false</param>
                /// <returns>false if the bucket was seen to change during the walk; true otherwise</returns>
                internal bool TryFindKey(uint h_in, K key, out int gx)
                {
                    int c_ent = CapturedEntryCount;
                    gx = CapturedFirstEntryIndex;
                    for (int i = 0; i < c_ent; i++)
                    {
                        int gx_next;
                        uint h = bl.cfg.GetHashAndNextHot(gx, out gx_next);
                        if (h == h_in)
                        {
                            // TryGetKey performs the expiry check for this step
                            K k;
                            if (!TryGetKey(gx, out k))
                                return false;
                            if (k.Equals(key))
                                return true;
                        }
                        else if (IsExpired)
                            return false;
                        gx = gx_next;
                    }
                    gx = -1;
                    return true;
                }

                /// <summary>
                /// Walk the entire captured chain, reporting the matching entry (if any) and the last
                /// entry of the chain. If several entries match, the one furthest along the chain is
                /// reported.
                /// </summary>
                /// <param name="gx_found">index of the matching entry, or -1</param>
                /// <param name="i_found">position of the matching entry within the chain, or -1</param>
                /// <param name="gx_last">index of the last entry visited, or -1 for an empty chain</param>
                /// <returns>false if the bucket was seen to change during the walk; true otherwise</returns>
                internal bool TryFindKey(uint h_in, ref K key, out int gx_found, out int i_found, out int gx_last)
                {
                    gx_found = gx_last = i_found = -1;
                    int gx = CapturedFirstEntryIndex, c_ent = CapturedEntryCount;
                    for (int i = 0; i < c_ent; i++)
                    {
                        gx_last = gx;
                        uint h = bl.cfg.GetHashAndNextHot(gx_last, out gx);
                        if (h == h_in)
                        {
                            K k;
                            if (!TryGetKey(gx_last, out k))
                                return false;
                            if (k.Equals(key))
                            {
                                gx_found = gx_last;
                                i_found = i;
                            }
                        }
                        else if (IsExpired)
                            return false;
                    }
                    return true;
                }

                /// <summary>
                /// Unlink the first entry of the chain: the head is pointed at the entry's successor
                /// (or at -1 when the chain becomes empty) and the count is decremented. The entry's
                /// index is not released here.
                /// </summary>
                /// <returns>false if the captured chain is empty or the bucket has changed</returns>
                internal bool TryRemoveFirstItem()
                {
                    int c = CapturedEntryCount;
                    return c > 0 && TryUpdate(--c == 0 ? -1 : bl.cfg.GetNextHot(CapturedFirstEntryIndex), c);
                }

                /// <summary>
                /// Store <paramref name="e"/> in a freshly obtained index and publish it as the new
                /// first entry of the chain. If the chain already holds EntriesPerBucketMask entries,
                /// a bucket resize is requested instead and nothing is inserted.
                /// </summary>
                /// <returns>true if the entry was published; false if the chain is full or the bucket
                /// has changed (the obtained index is released again in the latter case)</returns>
                internal bool TryInsertFirstItem(ref int mtid, ref EntryBlock.Entry e)
                {
                    Config cfg = bl.cfg;
                    int c = CapturedEntryCount;
                    if (c == EntriesPerBucketMask)
                    {
                        cfg.RequestBucketResize(ref mtid);
                        return false;
                    }

                    e.gx_next = c == 0 ? -1 : CapturedFirstEntryIndex;
                    int gx = cfg.GetUnusedIndex(ref mtid);
                    cfg.StoreEntryHot(gx, ref e);
                    if (TryUpdate(gx, c + 1))
                        return true;

                    cfg.ReleaseIndex(ref mtid, gx);
                    return false;
                }

                /// <summary>
                /// Attempt to re-point the head at <paramref name="gx_target"/> without copying entries.
                /// First <c>TransactEntryNext</c> is called on the last entry with the current first
                /// index and -1 (presumably linking last -> first if the link is still -1; confirm
                /// against that method). If its result is neither -1 nor the current first index the
                /// operation is reported as impossible.
                /// </summary>
                /// <param name="bh_new">receives the new head on success; untouched otherwise</param>
                internal TryResult CanCarouselTo(int gx_last, int gx_target, ref BucketHead bh_new)
                {
                    Debug.Assert(gx_last != -1);

                    int n = bl.cfg.TransactEntryNext(gx_last, CapturedFirstEntryIndex, -1);
                    if (n != -1 && n != CapturedFirstEntryIndex)
                        return TryResult.Impossible;

                    // same count, next sequence number, new first index
                    Int64 new_v = ((v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_target;
                    if (Interlocked.CompareExchange(ref bl.rg[bx], new_v, v) != v)
                        return TryResult.Expired;

                    bh_new = new BucketHead(bl, bx, new_v);
                    return TryResult.Ok;
                }

                /// <summary>
                /// Store <paramref name="e"/> in a freshly obtained index and publish it as the first
                /// entry while keeping the entry count unchanged. The caller is responsible for the
                /// content of <c>e.gx_next</c>.
                /// </summary>
                /// <returns>true if published; false if the bucket has changed (the index is released)</returns>
                internal bool TryReplaceFirst(ref int mtid, ref EntryBlock.Entry e)
                {
                    int gx = bl.cfg.GetUnusedIndex(ref mtid);
                    bl.cfg.StoreEntryHot(gx, ref e);
                    if (TryUpdate(gx, CapturedEntryCount))
                        return true;

                    bl.cfg.ReleaseIndex(ref mtid, gx);
                    return false;
                }

                /// <summary>
                /// Move the entry at chain position <paramref name="i_target"/> to the end of the chain
                /// by repeatedly rotating the current last entry to the front.
                /// </summary>
                /// <param name="gx_last">index of the current last entry of the chain</param>
                /// <param name="bh_new">head to operate on; updated after every successful rotation.
                /// The caller must pass an initialized head (the rotations are invoked on it).</param>
                /// <returns>true when the target is last; false if any step detected a changed bucket</returns>
                internal bool TryRotateToEnd(ref int mtid, int gx_last, int i_target, ref BucketHead bh_new)
                {
                    Debug.Assert(gx_last != -1);

                    // couldn't use any tricks. rotate one at a time: every entry that follows the target
                    // has to be moved to the front
                    int c_remaining = CapturedEntryCount - i_target - 1;
                    Debug.Assert(c_remaining >= 0);

                    // nothing follows the target, so it is already last. Without this guard the loop
                    // below would rotate once, step the counter past zero and keep rotating.
                    if (c_remaining <= 0)
                        return true;

                    while (true)
                    {
                        if (!bh_new.RotateOnce(ref mtid, gx_last, ref bh_new))
                            return false;
                        if (--c_remaining == 0)
                            return true;
                        if (!bh_new.TryGetLast(out gx_last))
                            return false;
                    }
                }

                /// <summary>
                /// Copy the last entry into a freshly obtained index, link the copy in front of the
                /// current first entry and publish it as the new head with an unchanged count. Because
                /// the count is unchanged, the old last entry drops off the end of the chain; its index
                /// is released after a successful exchange.
                /// </summary>
                /// <returns>true if published; false if the bucket has changed (the new index is released)</returns>
                bool RotateOnce(ref int mtid, int gx_last, ref BucketHead bh_new)
                {
                    Config cfg = bl.cfg;

                    EntryBlock.Entry e;
                    cfg.GetEntryHot(gx_last, out e);
                    e.gx_next = CapturedFirstEntryIndex;

                    int gx_new = cfg.GetUnusedIndex(ref mtid);
                    cfg.StoreEntryHot(gx_new, ref e);

                    Int64 new_v = ((v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_new;
                    if (Interlocked.CompareExchange(ref bl.rg[bx], new_v, v) != v)
                    {
                        cfg.ReleaseIndex(ref mtid, gx_new);
                        return false;
                    }

                    cfg.ReleaseIndex(ref mtid, gx_last);
                    bh_new = new BucketHead(bl, bx, new_v);
                    return true;
                }

                /// <summary>
                /// Follow the chain from the first entry for (count - 1) steps to find the last entry.
                /// </summary>
                /// <returns>false if the bucket was seen to change during the walk</returns>
                bool TryGetLast(out int gx_last)
                {
                    int c_ent = CapturedEntryCount;
                    Debug.Assert(c_ent > 0);

                    gx_last = CapturedFirstEntryIndex;
                    for (int i = 0; i < c_ent - 1; i++)
                    {
                        gx_last = bl.cfg.GetNextHot(gx_last);
                        if (IsExpired)
                            return false;
                    }
                    return true;
                }
            };
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// BucketList (abstract) - an array of 64-bit bucket words plus the configuration it belongs to
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        internal abstract partial class BucketList
        {
            /// <summary>raw bucket words; see the BucketHead layout</summary>
            protected readonly Int64[] rg;
            protected readonly Config cfg;

            /// <summary>set once a full pass of InvalidateAllBuckets has completed</summary>
            bool f_closed = false;

            internal BucketList(Config cfg, Int64[] rg)
            {
                this.cfg = cfg;
                this.rg = rg;
            }

            /// <summary>
            /// Create a list whose bucket count is <paramref name="c_entries"/> adjusted by HashFriendly.
            /// </summary>
            internal BucketList(Config cfg, uint c_entries)
                : this(cfg, new Int64[HashFriendly(c_entries)])
            {
            }

            /// <summary>True while this list is the one installed in its configuration.</summary>
            internal bool IsCurrentForConfig { get { return cfg.m_buckets == this; } }

            [DebuggerBrowsable(DebuggerBrowsableState.Never)]
            internal uint BucketCount { get { return (uint)rg.Length; } }

            /// <summary>
            /// Capture the bucket that <paramref name="hash"/> maps to (hash modulo bucket count).
            /// </summary>
            /// <returns>true if, after the capture, this list is still the configuration's current list</returns>
            internal virtual bool GetBucketHead(ref int mtid, uint hash, out BucketHead bh)
            {
                bh = new BucketHead(this, hash % (uint)rg.Length);
                return IsCurrentForConfig;
            }

            /// <summary>
            /// Bump the sequence number of every bucket. The pass stops early as soon as the closed
            /// flag is observed; whoever completes a pass sets the flag.
            /// </summary>
            internal void InvalidateAllBuckets()
            {
                for (uint i = 0; !f_closed && i < rg.Length; i++)
                    new BucketHead(this, i).Invalidate();

                if (!f_closed)
                {
                    Thread.MemoryBarrier();
                    f_closed = true;
                }
            }

            /// <summary>
            /// Pick a bucket count for a requested size: the smallest table entry that is not less
            /// than <paramref name="c"/>, or, beyond the table, the next odd number at or above
            /// <paramref name="c"/> that is not divisible by 3, 5 or 7.
            /// </summary>
            protected static uint HashFriendly(uint c)
            {
                int ix = Array.BinarySearch<uint>(some_primes, c);
                if (ix < 0)
                    ix = ~ix;   // complement of a negative result is the index of the next larger element
                if (ix < some_primes.Length)
                    return some_primes[ix];

                // requested size is bigger than the largest prime in the table
                c |= 1;
                while (c % 3 == 0 || c % 5 == 0 || c % 7 == 0)
                    c += 2;
                return c;
            }

            /// <summary>candidate bucket counts; must stay sorted ascending for the binary search</summary>
            static readonly uint[] some_primes =
            {
                0x000007, 0x00000B, 0x00000D, 0x000011, 0x000017, 0x00001D, 0x000025, 0x00002F,
                0x00003B, 0x000043, 0x00004F, 0x000061, 0x00007F, 0x000089, 0x0000A3, 0x0000C5,
                0x000101, 0x000115, 0x000133, 0x00014B, 0x00018D, 0x000209, 0x00022D, 0x000269,
                0x0002A1, 0x00031D, 0x000419, 0x00045D, 0x0004D5, 0x000551, 0x00063D, 0x000833,
                0x0008BD, 0x0009AD, 0x000AA9, 0x000C83, 0x001069, 0x001181, 0x00135D, 0x00155F,
                0x001915, 0x0020E3, 0x002303, 0x0026C3, 0x002AC5, 0x003235, 0x0041CB, 0x004609,
                0x004D8D, 0x005597, 0x006475, 0x0083A7, 0x008C17, 0x009B1D, 0x00AB4D, 0x00C8ED,
                0x010751, 0x01183D, 0x01363F, 0x0156A7, 0x0191DD, 0x020EB5, 0x02307B, 0x026C81,
                0x02AD57, 0x0323BF, 0x041D73, 0x0460FD, 0x04D905, 0x055AB3, 0x064787, 0x083AFD,
                0x08C201, 0x09B215, 0x0AB57B, 0x0C8F4D, 0x107603, 0x118411, 0x136441, 0x20EC13,
                0x2DC6D1, 0x41D82F, 0x4C4B4B, 0x5B8D8B, 0x6ACFC3, 0x7A1209, 0x83B073, 0x90F575,
                0x989693
            };
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// BucketListPrimary - a list of buckets used during normal operation
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        internal partial class BucketListPrimary : BucketList
        {
            internal BucketListPrimary(Config cfg, uint c_entries)
                : base(cfg, c_entries)
            {
            }

            internal BucketListPrimary(Config cfg, Int64[] rg)
                : base(cfg, rg)
            {
            }

            /// <summary>
            /// Initiate bucket array resizing and continue on to assist with that task. Nothing is
            /// done unless this list is still installed in its configuration and that configuration
            /// reports itself as current.
            /// </summary>
            internal void InitiateBucketResize(ref int mtid)
            {
                if (this.IsCurrentForConfig && this.cfg.IsCurrent)
                {
                    BucketListResize blr = new BucketListResize(cfg, this, HashFriendly(BucketCount * 2));
                    cfg.ChangeBuckets(this, blr);
                    cfg.CheckAssist(ref mtid);
                }
            }
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /// <summary>
        /// BucketListResize - a transitional list of buckets that is installed while the entries of a
        /// primary list are being moved into a larger bucket array. Every caller that asks it for a
        /// bucket head is first made to help with the move.
        /// </summary>
        ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        internal partial class BucketListResize : BucketList
        {
            /// <summary>the primary list whose entries are being moved out</summary>
            readonly BucketListPrimary blp;

            /// <summary>number of helpers that have joined so far</summary>
            int m_c_helpers;

            /// <summary>set once some helper has completed a full pass over the old buckets</summary>
            bool f_done = false;

            internal BucketListResize(Config c, BucketListPrimary blp, uint new_size)
                : base(c, new_size)
            {
                this.blp = blp;
                this.m_c_helpers = 0;
            }

            /// <summary>
            /// Conscript all callers to assist with the bucket resizing, no matter if it's already
            /// complete or not. The request is then forwarded to whichever bucket list is installed
            /// in the dictionary's configuration at that point.
            /// </summary>
            internal override bool GetBucketHead(ref int mtid, uint hash, out BucketHead bh)
            {
                _check_assist(ref mtid);
                return cfg.d.m_config.m_buckets.GetBucketHead(ref mtid, hash, out bh);
            }

            /// <summary>
            /// Allow any number of helpers to cooperatively resize the bucket array. Returns
            /// immediately if this list is no longer installed in its configuration.
            /// </summary>
            internal void _check_assist(ref int mtid)
            {
                if (!this.IsCurrentForConfig)
                    return;

                // wait-free: everyone sets all closed flags until the operation is confirmed and we have
                // a frozen count
                blp.InvalidateAllBuckets();

                uint i_helper = (uint)Interlocked.Increment(ref m_c_helpers) - 1;
                uint c_buckets_old = blp.BucketCount;
                uint n_chunk = BucketCount / (i_helper + 3);
                uint i_chunk = 0;

                if (i_helper == 0 && (cfg.d.m_options & Options.DebugOutput) > 0)
                {
                    Console.WriteLine("initiated resizing from {0} to {1} ({2} items)", c_buckets_old, BucketCount, cfg.m_count);
                    Console.Out.Flush();
                }

            contention:
                // the starting point for the single-stepping loop advances by a different chunk amount
                // for each helper
                i_chunk = (i_chunk + n_chunk) % c_buckets_old;

                // single-stepping loop. Restarted when any kind of contention is detected
                // each helper must eventually confirm zero entries in all source buckets
                for (uint bx = i_chunk, c = 0; !f_done && c < c_buckets_old; bx = (bx + 1) % c_buckets_old, c++)
                {
                    // loop until this bucket has zero entries, unless aborted first due to contention
                    BucketHead bh_old;
                    EntryBlock.Entry e;
                    while ((bh_old = new BucketHead(cfg, blp, bx, out e)).CapturedEntryCount > 0)
                    {
                        BucketHead bh_new = new BucketHead(this, e.hash % BucketCount);

                        int gx;
                        if (!bh_new.TryFindKey(e.hash, e.key, out gx))
                            goto contention;

                        if (gx != -1)
                        {
                            // the key is already present in the new array: drop the entry from the
                            // old chain and give its index back
                            if (!bh_old.TryRemoveFirstItem())
                                goto contention;
                            cfg.ReleaseIndex(ref mtid, bh_old.CapturedFirstEntryIndex);
                        }
                        else if (!bh_new.TryInsertFirstItem(ref mtid, ref e))
                            goto contention;
                    }
                }

                if (!f_done)
                {
                    Thread.MemoryBarrier();
                    f_done = true;
                }

                // first one finished swaps out the swap set back to a primary set. Also, be sure not to
                // swap into the latest config because if the dictionary was emptied while we were at
                // work, we don't want to destroy any new items
                if (this.IsCurrentForConfig)
                    cfg.ChangeBuckets(this, new BucketListPrimary(cfg, this.rg));

                if (i_helper == 0 && (cfg.d.m_options & Options.DebugOutput) > 0)
                {
                    Console.WriteLine("done");
                    Console.Out.Flush();
                }
            }
        };
    };
};