Skip to content

Commit

Permalink
Merge pull request #2411 from ltetak/diskwriterqueue
Browse files Browse the repository at this point in the history
Simplified DiskWriterQueue with blocking concurrency
  • Loading branch information
mbdavid authored Feb 13, 2024
2 parents d1ef38a + f21cd84 commit 6d2a165
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 52 deletions.
2 changes: 0 additions & 2 deletions LiteDB/Engine/Disk/DiskService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable<PageBuffer> pages)
count++;
}

_queue.Value.Run();

return count;
}

Expand Down
77 changes: 27 additions & 50 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static LiteDB.Constants;
Expand All @@ -18,10 +17,12 @@ internal class DiskWriterQueue : IDisposable

// async thread controls
private Task _task;
private bool _shouldClose = false;

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();

private int _running = 0;
private readonly object _queueSync = new object();
private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

public DiskWriterQueue(Stream stream)
{
Expand All @@ -40,26 +41,15 @@ public DiskWriterQueue(Stream stream)
public void EnqueuePage(PageBuffer page)
{
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");

_queue.Enqueue(page);
}

/// <summary>
/// If queue contains pages and are not running, starts run queue again now
/// </summary>
public void Run()
{
lock (_queue)
lock (_queueSync)
{
if (_queue.Count == 0) return;

var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);
_queueIsEmpty.Reset();
_queue.Enqueue(page);
_queueHasItems.Set();

if (oldValue == 0)
if (_task == null)
{
// Schedule a new thread to process the pages in the queue.
// https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
_task = Task.Run(ExecuteQueue);
_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
}
}
}
Expand All @@ -69,53 +59,35 @@ public void Run()
/// </summary>
public void Wait()
{
lock (_queue)
{
if (_task != null)
{
_task.Wait();
}

Run();
}

_queueIsEmpty.Wait();
ENSURE(_queue.Count == 0, "queue should be empty after wait() call");
}

/// <summary>
/// Execute all items in queue sync
/// </summary>
private void ExecuteQueue()
private async Task ExecuteQueue()
{
do
while (true)
{
if (_queue.TryDequeue(out var page))
{
WritePageToStream(page);
}

while (page == null)
else
{
_stream.FlushToDisk();
Volatile.Write(ref _running, 0);

if (!_queue.Any()) return;

// Another item was added to the queue after we detected it was empty.
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 1)
lock (_queueSync)
{
// A new thread was already scheduled for execution, this thread can return.
return;
if (_queue.Count > 0) continue;
_queueIsEmpty.Set();
_queueHasItems.Reset();
if (_shouldClose) return;
}
_stream.FlushToDisk();

// This thread will continue to process the queue as a new thread was not scheduled.
_queue.TryDequeue(out page);
WritePageToStream(page);
await _queueHasItems.WaitAsync();
}

} while (true);
}
}

private void WritePageToStream(PageBuffer page)
Expand All @@ -137,8 +109,13 @@ public void Dispose()
{
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;
_queueHasItems.Set(); // unblock the running loop in case there are no items

// run all items in queue before dispose
this.Wait();
_task?.Wait();
_task = null;
}
}
}
35 changes: 35 additions & 0 deletions LiteDB/Utils/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Threading;
using System.Threading.Tasks;

namespace LiteDB
{
/// <summary>
/// Async implementation of ManualResetEvent
/// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
/// </summary>
internal class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();

public Task WaitAsync()
{
return _tcs.Task;
}

public void Set()
{
_tcs.TrySetResult(true);
}

public void Reset()
{
while (true)
{
var tcs = _tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}
}

0 comments on commit 6d2a165

Please # to comment.