diff --git a/LiteDB/Engine/Disk/DiskService.cs b/LiteDB/Engine/Disk/DiskService.cs index ad2bc8344..de6d0b503 100644 --- a/LiteDB/Engine/Disk/DiskService.cs +++ b/LiteDB/Engine/Disk/DiskService.cs @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable pages) count++; } - _queue.Value.Run(); - return count; } diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 0592cc6b9..2b2e67041 100644 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.IO; -using System.Linq; using System.Threading; using System.Threading.Tasks; using static LiteDB.Constants; @@ -18,10 +17,12 @@ internal class DiskWriterQueue : IDisposable // async thread controls private Task _task; + private bool _shouldClose = false; private readonly ConcurrentQueue _queue = new ConcurrentQueue(); - - private int _running = 0; + private readonly object _queueSync = new object(); + private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent(); + private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true); public DiskWriterQueue(Stream stream) { @@ -40,26 +41,15 @@ public DiskWriterQueue(Stream stream) public void EnqueuePage(PageBuffer page) { ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file"); - - _queue.Enqueue(page); - } - - /// - /// If queue contains pages and are not running, starts run queue again now - /// - public void Run() - { - lock (_queue) + lock (_queueSync) { - if (_queue.Count == 0) return; - - var oldValue = Interlocked.CompareExchange(ref _running, 1, 0); + _queueIsEmpty.Reset(); + _queue.Enqueue(page); + _queueHasItems.Set(); - if (oldValue == 0) + if (_task == null) { - // Schedule a new thread to process the pages in the queue. - // https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html - _task = Task.Run(ExecuteQueue); + _task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning); } } } @@ -69,53 +59,35 @@ public void Run() /// public void Wait() { - lock (_queue) - { - if (_task != null) - { - _task.Wait(); - } - - Run(); - } - + _queueIsEmpty.Wait(); ENSURE(_queue.Count == 0, "queue should be empty after wait() call"); } /// /// Execute all items in queue sync /// - private void ExecuteQueue() + private async Task ExecuteQueue() { - do + while (true) { if (_queue.TryDequeue(out var page)) { WritePageToStream(page); } - - while (page == null) + else { - _stream.FlushToDisk(); - Volatile.Write(ref _running, 0); - - if (!_queue.Any()) return; - - // Another item was added to the queue after we detected it was empty. - var oldValue = Interlocked.CompareExchange(ref _running, 1, 0); - - if (oldValue == 1) + lock (_queueSync) { - // A new thread was already scheduled for execution, this thread can return. - return; + if (_queue.Count > 0) continue; + _queueIsEmpty.Set(); + _queueHasItems.Reset(); + if (_shouldClose) return; } + _stream.FlushToDisk(); - // This thread will continue to process the queue as a new thread was not scheduled. - _queue.TryDequeue(out page); - WritePageToStream(page); + await _queueHasItems.WaitAsync(); } - - } while (true); + } } private void WritePageToStream(PageBuffer page) @@ -137,8 +109,13 @@ public void Dispose() { LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK"); + _shouldClose = true; + _queueHasItems.Set(); // unblock the running loop in case there are no items + // run all items in queue before dispose this.Wait(); + _task?.Wait(); + _task = null; } } } \ No newline at end of file diff --git a/LiteDB/Utils/AsyncManualResetEvent.cs b/LiteDB/Utils/AsyncManualResetEvent.cs new file mode 100644 index 000000000..0cbaf3421 --- /dev/null +++ b/LiteDB/Utils/AsyncManualResetEvent.cs @@ -0,0 +1,35 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace LiteDB +{ + /// + /// Async implementation of ManualResetEvent + /// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/ + /// + internal class AsyncManualResetEvent + { + private volatile TaskCompletionSource _tcs = new TaskCompletionSource(); + + public Task WaitAsync() + { + return _tcs.Task; + } + + public void Set() + { + _tcs.TrySetResult(true); + } + + public void Reset() + { + while (true) + { + var tcs = _tcs; + if (!tcs.Task.IsCompleted || + Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource(), tcs) == tcs) + return; + } + } + } +} \ No newline at end of file