Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Simplified DiskWriterQueue with blocking concurrency #2411

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions LiteDB/Engine/Disk/DiskService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable<PageBuffer> pages)
count++;
}

_queue.Value.Run();

return count;
}

Expand Down
77 changes: 27 additions & 50 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static LiteDB.Constants;
Expand All @@ -18,10 +17,12 @@ internal class DiskWriterQueue : IDisposable

// async thread controls
private Task _task;
private bool _shouldClose = false;

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();

private int _running = 0;
private readonly object _queueSync = new object();
private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

public DiskWriterQueue(Stream stream)
{
Expand All @@ -40,26 +41,15 @@ public DiskWriterQueue(Stream stream)
public void EnqueuePage(PageBuffer page)
{
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");

_queue.Enqueue(page);
}

/// <summary>
/// If queue contains pages and are not running, starts run queue again now
/// </summary>
public void Run()
{
lock (_queue)
lock (_queueSync)
{
if (_queue.Count == 0) return;

var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);
_queueIsEmpty.Reset();
_queue.Enqueue(page);
_queueHasItems.Set();

if (oldValue == 0)
if (_task == null)
{
// Schedule a new thread to process the pages in the queue.
// https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
_task = Task.Run(ExecuteQueue);
_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
}
}
}
Expand All @@ -69,53 +59,35 @@ public void Run()
/// </summary>
public void Wait()
{
lock (_queue)
{
if (_task != null)
{
_task.Wait();
}

Run();
}

_queueIsEmpty.Wait();
ENSURE(_queue.Count == 0, "queue should be empty after wait() call");
}

/// <summary>
/// Execute all items in queue sync
/// </summary>
private void ExecuteQueue()
private async Task ExecuteQueue()
{
do
while (true)
{
if (_queue.TryDequeue(out var page))
{
WritePageToStream(page);
}

while (page == null)
else
{
_stream.FlushToDisk();
Volatile.Write(ref _running, 0);

if (!_queue.Any()) return;

// Another item was added to the queue after we detected it was empty.
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 1)
lock (_queueSync)
{
// A new thread was already scheduled for execution, this thread can return.
return;
if (_queue.Count > 0) continue;
_queueIsEmpty.Set();
_queueHasItems.Reset();
if (_shouldClose) return;
}
_stream.FlushToDisk();

// This thread will continue to process the queue as a new thread was not scheduled.
_queue.TryDequeue(out page);
WritePageToStream(page);
await _queueHasItems.WaitAsync();
}

} while (true);
}
}

private void WritePageToStream(PageBuffer page)
Expand All @@ -137,8 +109,13 @@ public void Dispose()
{
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;
_queueHasItems.Set(); // unblock the running loop in case there are no items

// run all items in queue before dispose
this.Wait();
_task?.Wait();
_task = null;
}
}
}
35 changes: 35 additions & 0 deletions LiteDB/Utils/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Threading;
using System.Threading.Tasks;

namespace LiteDB
{
/// <summary>
/// Async implementation of ManualResetEvent
/// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
/// </summary>
internal class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();

public Task WaitAsync()
{
return _tcs.Task;
}

public void Set()
{
_tcs.TrySetResult(true);
}

public void Reset()
{
while (true)
{
var tcs = _tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}
}