Skip to content

Commit 065bf32

Browse files
authoredMay 14, 2021
Fix edgehub queue len metric (#4952)
This attempts to fix the issue of queue len metric reporting incorrectly due to how MessageStore calculated the current queue length. This change now gives the ability to directly request the Count of items from the store implementation. There is also a matter of discussion on whether we can get the metric to exist solely in one unified place and whether we require both Checkpointer and MessageStore.
1 parent e5218d1 commit 065bf32

File tree

13 files changed

+47
-19
lines changed

13 files changed

+47
-19
lines changed
 

‎edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs

+1-13
Original file line numberDiff line numberDiff line change
@@ -298,24 +298,14 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup)
298298
Events.CleanupCheckpointState(messageQueueId, checkpointData);
299299
int cleanupEntityStoreCount = 0;
300300

301-
// If checkEntireQueueOnCleanup is set to false, we only peek the head, message counts is tailOffset-headOffset+1
302-
// otherwise count while iterating over the queue.
303-
var headOffset = 0L;
304-
var tailOffset = sequentialStore.GetTailOffset(CancellationToken.None);
305-
var messageCount = 0L;
306-
307301
async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
308302
{
309303
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
310304
if (offset > checkpointData.Offset && expiry > DateTime.UtcNow)
311305
{
312-
// message is not sent and not expired, increase message counts
313-
messageCount++;
314306
return false;
315307
}
316308

317-
headOffset = Math.Max(headOffset, offset);
318-
319309
var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId);
320310

321311
await message.ForEachAsync(async msg =>
@@ -368,12 +358,10 @@ await message.ForEachAsync(async msg =>
368358
{
369359
cleanupCount++;
370360
}
371-
372-
messageCount = tailOffset - headOffset + 1;
373361
}
374362

375363
// update Metrics for message counts
376-
Checkpointer.Metrics.QueueLength.Set(messageCount, new[] { endpointId, priority.ToString(), bool.TrueString });
364+
Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority.ToString());
377365
totalCleanupCount += cleanupCount;
378366
totalCleanupStoreCount += cleanupEntityStoreCount;
379367
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);

‎edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs

+8-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public static void CommitStarted(Checkpointer checkpointer, int successfulCount,
182182

183183
public static void CommitFinished(Checkpointer checkpointer)
184184
{
185-
Log.LogInformation((int)EventIds.CommitFinished, "[CheckpointerCommitFinishedo] {context}", GetContextString(checkpointer));
185+
Log.LogInformation((int)EventIds.CommitFinished, "[CheckpointerCommitFinished] {context}", GetContextString(checkpointer));
186186
}
187187

188188
public static void Close(Checkpointer checkpointer)
@@ -203,7 +203,13 @@ public static class Metrics
203203
"Number of messages pending to be processed for the endpoint",
204204
new List<string> { "endpoint", "priority", MetricsConstants.MsTelemetry });
205205

206-
public static void SetQueueLength(Checkpointer checkpointer) => QueueLength.Set(checkpointer.Proposed - checkpointer.Offset, new[] { checkpointer.EndpointId, checkpointer.Priority, bool.TrueString });
206+
public static void SetQueueLength(Checkpointer checkpointer) => SetQueueLength(CalculateQueueLength(checkpointer), checkpointer.EndpointId, checkpointer.Priority);
207+
208+
public static void SetQueueLength(double length, string endpointId, string priority) => QueueLength.Set(length, new[] { endpointId, priority, bool.TrueString });
209+
210+
private static double CalculateQueueLength(Checkpointer checkpointer) => CalculateQueueLength(checkpointer.Proposed - checkpointer.Offset);
211+
212+
private static double CalculateQueueLength(long length) => Math.Max(length, 0);
207213
}
208214
}
209215
}

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage.RocksDb/ColumnFamilyDbStore.cs

+18-4
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,20 @@ namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
1212
class ColumnFamilyDbStore : IDbStore
1313
{
1414
readonly IRocksDb db;
15+
private ulong count;
1516

1617
public ColumnFamilyDbStore(IRocksDb db, ColumnFamilyHandle handle)
1718
{
1819
this.db = Preconditions.CheckNotNull(db, nameof(db));
1920
this.Handle = Preconditions.CheckNotNull(handle, nameof(handle));
21+
22+
var iterator = db.NewIterator(this.Handle);
23+
this.count = 0;
24+
while (iterator.Valid())
25+
{
26+
this.count += 1;
27+
iterator = iterator.Next();
28+
}
2029
}
2130

2231
internal ColumnFamilyHandle Handle { get; }
@@ -49,20 +58,23 @@ public async Task<Option<byte[]>> Get(byte[] key, CancellationToken cancellation
4958
return returnValue;
5059
}
5160

52-
public Task Put(byte[] key, byte[] value, CancellationToken cancellationToken)
61+
public async Task Put(byte[] key, byte[] value, CancellationToken cancellationToken)
5362
{
5463
Preconditions.CheckNotNull(key, nameof(key));
5564
Preconditions.CheckNotNull(value, nameof(value));
5665

5766
Action operation = () => this.db.Put(key, value, this.Handle);
58-
return operation.ExecuteUntilCancelled(cancellationToken);
67+
await operation.ExecuteUntilCancelled(cancellationToken);
68+
this.count += 1;
5969
}
6070

61-
public Task Remove(byte[] key, CancellationToken cancellationToken)
71+
public async Task Remove(byte[] key, CancellationToken cancellationToken)
6272
{
6373
Preconditions.CheckNotNull(key, nameof(key));
74+
6475
Action operation = () => this.db.Remove(key, this.Handle);
65-
return operation.ExecuteUntilCancelled(cancellationToken);
76+
await operation.ExecuteUntilCancelled(cancellationToken);
77+
this.count -= 1;
6678
}
6779

6880
public async Task<Option<(byte[] key, byte[] value)>> GetLastEntry(CancellationToken cancellationToken)
@@ -128,6 +140,8 @@ public Task IterateBatch(int batchSize, Func<byte[], byte[], Task> callback, Can
128140
return this.IterateBatch(iterator => iterator.SeekToFirst(), batchSize, callback, cancellationToken);
129141
}
130142

143+
public Task<ulong> Count() => Task.FromResult(this.count);
144+
131145
public void Dispose()
132146
{
133147
this.Dispose(true);

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/DbStoreDecorator.cs

+2
Original file line numberDiff line numberDiff line change
@@ -102,5 +102,7 @@ public Task Remove(byte[] key, CancellationToken cancellationToken)
102102
{
103103
return this.dbStore.Remove(key, cancellationToken);
104104
}
105+
106+
public Task<ulong> Count() => this.dbStore.Count();
105107
}
106108
}

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EncryptedStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ await decryptedValue.ForEachAsync(
120120
cancellationToken);
121121
}
122122

123+
public Task<ulong> Count() => this.entityStore.Count();
124+
123125
public void Dispose()
124126
{
125127
this.Dispose(true);

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/EntityStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ public Task IterateBatch(int batchSize, Func<TK, TV, Task> callback, Cancellatio
150150
public Task<bool> Contains(TK key, CancellationToken cancellationToken)
151151
=> this.dbStore.Contains(key, cancellationToken);
152152

153+
public Task<ulong> Count() => this.dbStore.Count();
154+
153155
public void Dispose()
154156
{
155157
this.Dispose(true);

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/IKeyValueStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,7 @@ public interface IKeyValueStore<TK, TV> : IDisposable
4242
Task IterateBatch(int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken);
4343

4444
Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken);
45+
46+
Task<ulong> Count();
4547
}
4648
}

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,7 @@ public interface ISequentialStore<T> : IDisposable
3232
Task<bool> RemoveFirst(Func<long, T, Task<bool>> predicate, CancellationToken cancellationToken);
3333

3434
Task<IEnumerable<(long, T)>> GetBatch(long startingOffset, int batchSize, CancellationToken cancellationToken);
35+
36+
Task<ulong> Count();
3537
}
3638
}

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/InMemoryDbStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ public async Task Remove(byte[] key, CancellationToken cancellationToken)
120120
}
121121
}
122122

123+
public Task<ulong> Count() => Task.FromResult((ulong)this.keyValues.Count);
124+
123125
public void Dispose()
124126
{
125127
// No-op

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/KeyValueStoreMapper.cs

+2
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> callback
7474
public Task IterateBatch(int batchSize, Func<TK, TV, Task> callback, CancellationToken cancellationToken)
7575
=> this.IterateBatch(Option.None<TK>(), batchSize, callback, cancellationToken);
7676

77+
public Task<ulong> Count() => this.underlyingStore.Count();
78+
7779
Task IterateBatch(Option<TK> startKey, int batchSize, Func<TK, TV, Task> callback, CancellationToken cancellationToken)
7880
{
7981
Preconditions.CheckRange(batchSize, 1, nameof(batchSize));

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/NullKeyValueStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,7 @@ public void Dispose()
4343
public Task IterateBatch(int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken) => Task.CompletedTask;
4444

4545
public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken) => Task.CompletedTask;
46+
47+
public Task<ulong> Count() => Task.FromResult(0UL);
4648
}
4749
}

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ await this.entityStore.IterateBatch(
188188
return batch;
189189
}
190190

191+
public Task<ulong> Count() => this.entityStore.Count();
192+
191193
public void Dispose()
192194
{
193195
this.Dispose(true);

‎edge-util/src/Microsoft.Azure.Devices.Edge.Storage/TimedEntityStore.cs

+2
Original file line numberDiff line numberDiff line change
@@ -129,5 +129,7 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntit
129129
Func<CancellationToken, Task> iterateWithTimeout = cts => this.underlyingKeyValueStore.IterateBatch(startKey, batchSize, perEntityCallback, cts);
130130
return iterateWithTimeout.TimeoutAfter(cancellationToken, this.timeout);
131131
}
132+
133+
public Task<ulong> Count() => this.underlyingKeyValueStore.Count();
132134
}
133135
}

0 commit comments

Comments
 (0)