Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(graphql): Configurable option to deduplicate pollers #1451

Merged
merged 2 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/graphql/lib/src/core/query_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ class QueryManager {
required this.link,
required this.cache,
this.alwaysRebroadcast = false,
bool deduplicatePollers = false,
}) {
scheduler = QueryScheduler(
queryManager: this,
deduplicatePollers: deduplicatePollers,
);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/graphql/lib/src/graphql_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ class GraphQLClient implements GraphQLDataProxy {
required this.cache,
DefaultPolicies? defaultPolicies,
bool alwaysRebroadcast = false,
bool deduplicatePollers = false,
}) : defaultPolicies = defaultPolicies ?? DefaultPolicies(),
queryManager = QueryManager(
link: link,
cache: cache,
alwaysRebroadcast: alwaysRebroadcast,
deduplicatePollers: deduplicatePollers,
);

/// The default [Policies] to set for each client action
Expand Down
84 changes: 72 additions & 12 deletions packages/graphql/lib/src/scheduler/scheduler.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:gql_exec/gql_exec.dart';
import 'package:graphql/src/core/query_manager.dart';
import 'package:graphql/src/core/query_options.dart';
import 'package:graphql/src/core/observable_query.dart';
Expand All @@ -8,9 +10,11 @@ import 'package:graphql/src/core/observable_query.dart';
class QueryScheduler {
QueryScheduler({
this.queryManager,
});
bool deduplicatePollers = false,
}) : _deduplicatePollers = deduplicatePollers;

QueryManager? queryManager;
final bool _deduplicatePollers;

/// Map going from query ids to the [WatchQueryOptions] associated with those queries.
Map<String, WatchQueryOptions> registeredQueries =
Expand Down Expand Up @@ -68,25 +72,81 @@ class QueryScheduler {
options.pollInterval != null && options.pollInterval! > Duration.zero,
);

registeredQueries[queryId] = options;
final existingEntry = _fastestEntryForRequest(options.asRequest);
final String? existingQueryId = existingEntry?.key;
final Duration? existingInterval = existingEntry?.value.pollInterval;

final interval = options.pollInterval;
// Update or add the query in registeredQueries
registeredQueries[queryId] = options;

if (intervalQueries.containsKey(interval)) {
intervalQueries[interval]!.add(queryId);
final Duration interval;

if (existingInterval != null && _deduplicatePollers) {
if (existingInterval > options.pollInterval!) {
// The new one is faster, remove the old one and add the new one
intervalQueries[existingInterval]!.remove(existingQueryId);
interval = options.pollInterval!;
} else {
// The new one is slower or the same. Don't add it to the list
return;
}
} else {
intervalQueries[interval] = Set<String>.of([queryId]);

_pollingTimers[interval] = Timer.periodic(
interval!,
(Timer timer) => fetchQueriesOnInterval(timer, interval),
);
// If there is no existing interval, we'll add the new one
interval = options.pollInterval!;
}

// Add new query to intervalQueries
_addInterval(queryId, interval);
}

/// Removes the [ObservableQuery] from one of the registered queries.
/// The fetchQueriesOnInterval will then take care of not firing it anymore.
void stopPollingQuery(String queryId) {
registeredQueries.remove(queryId);
final removedQuery = registeredQueries.remove(queryId);

if (removedQuery == null ||
removedQuery.pollInterval == null ||
!_deduplicatePollers) {
return;
}

// If there is a registered query that has the same `asRequest` as this one
// Add the next fastest duration to the intervalQueries
final fastestEntry = _fastestEntryForRequest(removedQuery.asRequest);
final String? fastestQueryId = fastestEntry?.key;
final Duration? fastestInterval = fastestEntry?.value.pollInterval;

if (fastestQueryId == null || fastestInterval == null) {
// There is no other query, return.
return;
}

_addInterval(fastestQueryId, fastestInterval);
}

/// Adds a [queryId] to the [intervalQueries] for a specific [interval]
/// and starts the timer if it doesn't exist.
void _addInterval(String queryId, Duration interval) {
final existingSet = intervalQueries[interval];
if (existingSet != null) {
existingSet.add(queryId);
} else {
intervalQueries[interval] = {queryId};
_pollingTimers[interval] = Timer.periodic(
interval, (Timer timer) => fetchQueriesOnInterval(timer, interval));
}
}

/// Returns the fastest query that matches the [request] or null if none exists.
MapEntry<String, WatchQueryOptions<Object?>>? _fastestEntryForRequest(
Request request) {
return registeredQueries.entries
// All existing queries mapping to the same request.
.where((entry) =>
entry.value.asRequest == request &&
entry.value.pollInterval != null)
// Ascending is default (shortest poll interval first)
.sortedBy((entry) => entry.value.pollInterval!)
.firstOrNull;
}
}
Loading