Skip to content

Commit

Permalink
Fix concurrency issues with SQLite pooling (#32615)
Browse files Browse the repository at this point in the history
Fixes #25797
Fixes #26016

I identified two race conditions, both caused by splitting state across multiple data structures. In particular, the Semaphore and the two ConcurrentStacks must stay in sync--that is, the Semaphore can let someone get a collection if and only if there is a connection available in the one of the stacks.

The fix is to wrap all these things in a single lock. It's possible that we don't need a full lock, but we already have one to protect _collections which can easily be expanded to cover the right areas.

Once this lock is used, the semaphore is no longer needed, and the stacks don't need to be concurrent because they are protected by the lock.
  • Loading branch information
ajcvickers authored Dec 14, 2023
1 parent 5bceb5e commit 555421e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ public override void Delete()

if (!string.IsNullOrEmpty(path))
{
SqliteConnection.ClearPool(new SqliteConnection(Dependencies.Connection.ConnectionString));
// See issues #25797 and #26016
// SqliteConnection.ClearAllPools();
SqliteConnection.ClearAllPools();
File.Delete(path);
}
else if (dbConnection.State == ConnectionState.Open)
Expand Down
104 changes: 49 additions & 55 deletions src/Microsoft.Data.Sqlite.Core/SqliteConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

Expand All @@ -16,9 +14,8 @@ internal class SqliteConnectionPool

private readonly SqliteConnectionStringBuilder _connectionOptions;
private readonly List<SqliteConnectionInternal> _connections = new();
private readonly ConcurrentStack<SqliteConnectionInternal> _warmPool = new();
private readonly ConcurrentStack<SqliteConnectionInternal> _coldPool = new();
private readonly Semaphore _poolSemaphore = new(0, int.MaxValue);
private readonly Stack<SqliteConnectionInternal> _warmPool = new();
private readonly Stack<SqliteConnectionInternal> _coldPool = new();

private Timer? _pruneTimer;
private State _state = State.Active;
Expand Down Expand Up @@ -51,20 +48,14 @@ public SqliteConnectionInternal GetConnection()
SqliteConnectionInternal? connection = null;
do
{
if (_poolSemaphore.WaitOne(0))
lock (_connections)
{
if (!_warmPool.TryPop(out connection)
&& !_coldPool.TryPop(out connection))
if (!TryPop(_warmPool, out connection)
&& !TryPop(_coldPool, out connection)
&& (Count % 2 == 1 || !ReclaimLeakedConnections()))
{
Debug.Fail("Inconceivable!");
}
}
else if (Count % 2 == 1 || !ReclaimLeakedConnections())
{
connection = new SqliteConnectionInternal(_connectionOptions, this);
connection = new SqliteConnectionInternal(_connectionOptions, this);

lock (_connections)
{
_connections.Add(connection);
}
}
Expand All @@ -74,73 +65,76 @@ public SqliteConnectionInternal GetConnection()
return connection;
}

public void Return(SqliteConnectionInternal connection)
private static bool TryPop(Stack<SqliteConnectionInternal> stack, out SqliteConnectionInternal? connection)
{
connection.Deactivate();

if (_state != State.Disabled
&& connection.CanBePooled)
{
_warmPool.Push(connection);
_poolSemaphore.Release();
}
else
#if NET5_0_OR_GREATER
return stack.TryPop(out connection);
#else
if (stack.Count > 0)
{
DisposeConnection(connection);
connection = stack.Pop();
return true;
}

connection = null;
return false;
#endif
}

public void Clear()
public void Return(SqliteConnectionInternal connection)
{
lock (_connections)
{
foreach (var connection in _connections)
connection.Deactivate();

if (_state != State.Disabled
&& connection.CanBePooled)
{
connection.DoNotPool();
_warmPool.Push(connection);
}
else
{
DisposeConnection(connection);
}
}

while (_warmPool.TryPop(out var connection))
{
DisposeConnection(connection);
}

while (_coldPool.TryPop(out var connection))
{
DisposeConnection(connection);
}

ReclaimLeakedConnections();
}

private void PruneCallback(object? _)
public void Clear()
{
while (Count > 0)
lock (_connections)
{
if (!_poolSemaphore.WaitOne(0))
foreach (var connection in _connections)
{
break;
connection.DoNotPool();
}

if (_coldPool.TryPop(out var connection))
while (TryPop(_warmPool, out var connection))
{
DisposeConnection(connection);
DisposeConnection(connection!);
}
else

while (TryPop(_coldPool, out var connection))
{
_poolSemaphore.Release();
break;
DisposeConnection(connection!);
}

ReclaimLeakedConnections();
}
}

if (_poolSemaphore.WaitOne(0))
private void PruneCallback(object? _)
{
lock (_connections)
{
while (_warmPool.TryPop(out var connection))
while (TryPop(_coldPool, out var connection))
{
_coldPool.Push(connection);
DisposeConnection(connection!);
}

_poolSemaphore.Release();
while (TryPop(_warmPool, out var connection))
{
_coldPool.Push(connection!);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public async Task Create_sets_journal_mode_to_wal(bool async)
Assert.Equal("wal", journalMode);
}

[ConditionalTheory(Skip = "Issues #25797 and #26016")]
[ConditionalTheory]
[InlineData(false)]
[InlineData(true)]
public async Task Delete_works_even_when_different_connection_exists_to_same_file(bool async)
Expand Down
43 changes: 43 additions & 0 deletions test/Microsoft.Data.Sqlite.Tests/SqliteConnectionFactoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
using System;
using System.Data;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using SQLitePCL;
using Xunit;

Expand Down Expand Up @@ -88,6 +90,47 @@ public void Internal_connections_are_not_reused_after_clearing_pool(bool allPool
Assert.NotSame(db, connection.Handle);
}

[Fact]
public void Can_clear_pools_while_connections_are_being_used()
{
const int threadCount = 20;
var connectionStrings = Enumerable.Range(30, 30 + threadCount - 1)
.Select(i => $"Data Source={FileName};Cache=Shared;Pooling=True;Command Timeout={i}").ToArray();

var usingTasks = new Action[threadCount];
for (var i = 0; i < threadCount; i++)
{
var captured = i;
usingTasks[i] = () =>
{
for (var j = 0; j < 10000; j++)
{
using (var connection = new SqliteConnection(connectionStrings[captured]))
{
connection.Open();
Task.Yield();
connection.Close();
}
}
};
}

for (int j = 0; j < 30; j++)
{
var runningTasks = usingTasks.Select(Task.Run).ToArray();

for (var i = 0; i < 10000; i++)
{
SqliteConnection.ClearAllPools();
Task.Yield();
}

#pragma warning disable xUnit1031
Task.WaitAll(runningTasks);
#pragma warning restore xUnit1031
}
}

[Theory]
[InlineData(false)]
[InlineData(true)]
Expand Down

0 comments on commit 555421e

Please # to comment.