diff --git a/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h b/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h index 56a098218..6fe498342 100644 --- a/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h +++ b/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ #include #include #include +#include namespace olp { namespace client { @@ -89,6 +90,18 @@ class CORE_API TaskContext { */ client::CancellationToken CancelToken() const { return impl_->CancelToken(); } + /** + * @brief Provides a token which will schedule cancellation of the task. + * + * @param scheduler The task scheduler instance. + * + * @return The `CancellationToken` instance. + */ + client::CancellationToken CancelToken( + const std::shared_ptr& scheduler) const { + return impl_->CancelToken(scheduler); + } + /** * @brief Checks whether the values of the `TaskContext` parameter are * the same as the values of the `other` parameter. @@ -156,6 +169,19 @@ class CORE_API TaskContext { * @return The `CancellationToken` instance. */ virtual client::CancellationToken CancelToken() = 0; + + /** + * @brief Provides a token which will schedule cancellation of the task. + * + * @param scheduler The task scheduler instance. + * + * @return The `CancellationToken` instance. + */ + virtual client::CancellationToken CancelToken( + const std::shared_ptr& scheduler) { + OLP_SDK_CORE_UNUSED(scheduler); + return CancelToken(); + } }; /** @@ -164,10 +190,12 @@ class CORE_API TaskContext { * Erases the type of the `Result` object produced by the `ExecuteFunc` * function and passes it to the `UserCallback` instance. * - * @tparam T The result type. + * @tparam Response The result type. */ template - class TaskContextImpl : public Impl { + class TaskContextImpl + : public Impl, + public std::enable_shared_from_this> { public: /// The task that produces the `Response` instance. using ExecuteFunc = std::function; @@ -189,7 +217,7 @@ class CORE_API TaskContext { context_(std::move(context)), state_{State::PENDING} {} - ~TaskContextImpl() override{}; + ~TaskContextImpl() override = default; /** * @brief Checks for the cancellation, executes the task, and calls @@ -213,8 +241,7 @@ class CORE_API TaskContext { callback = std::move(callback_); } - Response user_response = - client::ApiError(client::ErrorCode::Cancelled, "Cancelled"); + Response user_response = client::ApiError::Cancelled(); if (function && !context_.IsCancelled()) { auto response = function(context_); @@ -234,7 +261,7 @@ class CORE_API TaskContext { callback(std::move(user_response)); } - // Resources need to be released before the notification, else lambas + // Resources need to be released before the notification, else lambdas // would have captured resources like network or `TaskScheduler`. function = nullptr; callback = nullptr; @@ -280,6 +307,53 @@ class CORE_API TaskContext { [context]() mutable { context.CancelOperation(); }); } + /** + * @brief Provides a token which will schedule cancellation of the task. + * + * @param scheduler The task scheduler instance. + * + * @return The `CancellationToken` instance. + */ + client::CancellationToken CancelToken( + const std::shared_ptr& scheduler) override { + auto task_context = this->shared_from_this(); + return client::CancellationToken([scheduler, task_context]() mutable { + if (scheduler) { + scheduler->ScheduleCancelTask([=] { task_context->Cancel(); }); + } else { + task_context->Cancel(); + } + }); + } + + /** + * @brief Cancels the operation and calls the callback with a 'Cancelled' + * error. + */ + void Cancel() { + context_.CancelOperation(); + + // Checks whether TaskContext has been executed + if (state_.load() == State::COMPLETED) { + return; + } + + // If it hasn't -> mark as completed and proceed with user callback + state_.store(State::COMPLETED); + + UserCallback callback{}; + + { + std::lock_guard lock(mutex_); + callback = std::move(callback_); + execute_func_ = nullptr; + } + + if (callback) { + callback(ApiError::Cancelled()); + } + } + /** * @brief Indicates the state of the request. */ diff --git a/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h b/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h index 8190cadc7..d9561eb84 100644 --- a/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h +++ b/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,18 @@ class CORE_API TaskScheduler { EnqueueTask(std::move(func), priority); } + /** + * @brief Schedules the asynchronous cancellation task. + * + * @note Tasks added with this method has Priority::NORMAL priority. + * + * @param[in] func The callable target that should be added to the scheduling + * pipeline. + */ + void ScheduleCancelTask(CallFuncType&& func) { + EnqueueCancelTask(std::move(func)); + } + /** * @brief Schedules the asynchronous cancellable task. * @@ -136,6 +148,21 @@ class CORE_API TaskScheduler { OLP_SDK_CORE_UNUSED(priority); EnqueueTask(std::forward(func)); } + + /** + * @brief The enqueue cancellation task interface that is implemented by + * the subclass. + * + * Implement this method in the subclass that takes `TaskScheduler` + * as a base and provides a custom algorithm for scheduling tasks + * enqueued by the SDK. + * + * @note Tasks added trough this method should be scheduled with + * Priority::NORMAL priority. + */ + virtual void EnqueueCancelTask(CallFuncType&& func) { + EnqueueTask(std::forward(func)); + } }; /** diff --git a/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h b/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h index 078b2b951..409a20f79 100644 --- a/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h +++ b/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,6 +81,18 @@ class CORE_API ThreadPoolTaskScheduler final : public TaskScheduler { void EnqueueTask(TaskScheduler::CallFuncType&& func, uint32_t priority) override; + /** + * @brief Overrides the base class method to enqueue cancellation tasks and + * execute them on the next free thread from the thread pool. + * + * @note Tasks added with this method has Priority::NORMAL priority. + * + * @param func The rvalue reference of the task that should be enqueued. + * Move this task into your queue. No internal references are + * kept. Once this method is called, you own the task. + */ + void EnqueueCancelTask(TaskScheduler::CallFuncType&& func) override; + private: class QueueImpl; @@ -88,6 +100,8 @@ class CORE_API ThreadPoolTaskScheduler final : public TaskScheduler { std::vector thread_pool_; /// SyncQueue used to manage tasks. std::unique_ptr queue_; + /// SyncQueue used to manage cancel tasks. + std::unique_ptr cancel_queue_; }; } // namespace thread diff --git a/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp b/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp index d8a915990..282c97be8 100644 --- a/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp +++ b/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -74,8 +74,9 @@ struct ComparePrioritizedTask { } }; -void SetExecutorName(size_t idx) { - std::string thread_name = "OLPSDKPOOL_" + std::to_string(idx); +void SetExecutorName(std::string name) { + std::string thread_name = "OLPSDKPOOL_" + std::move(name); + SetCurrentThreadName(thread_name); OLP_SDK_LOG_INFO_F(kLogTag, "Starting thread '%s'", thread_name.c_str()); } @@ -97,13 +98,15 @@ class ThreadPoolTaskScheduler::QueueImpl { }; ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count) - : queue_{std::make_unique()} { - thread_pool_.reserve(thread_count); + : queue_{std::make_unique()}, + cancel_queue_{std::make_unique()} { + constexpr auto kCancelThreadsCount = 1; + + thread_pool_.reserve(thread_count + kCancelThreadsCount); for (size_t idx = 0; idx < thread_count; ++idx) { std::thread executor([this, idx]() { - // Set thread name for easy profiling and debugging - SetExecutorName(idx); + SetExecutorName(std::to_string(idx)); for (;;) { PrioritizedTask task; @@ -116,10 +119,25 @@ ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count) thread_pool_.push_back(std::move(executor)); } + + for (size_t idx = 0; idx < kCancelThreadsCount; ++idx) { + thread_pool_.emplace_back([this, idx] { + SetExecutorName("CANCEL_" + std::to_string(idx)); + + for (;;) { + PrioritizedTask task; + if (!cancel_queue_->Pull(task)) { + return; + } + task.function(); + } + }); + } } ThreadPoolTaskScheduler::~ThreadPoolTaskScheduler() { queue_->Close(); + cancel_queue_->Close(); for (auto& thread : thread_pool_) { thread.join(); } @@ -130,6 +148,11 @@ void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func) { queue_->Push({std::move(func), thread::NORMAL}); } +void ThreadPoolTaskScheduler::EnqueueCancelTask( + TaskScheduler::CallFuncType&& func) { + cancel_queue_->Push({std::move(func), thread::NORMAL}); +} + void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func, uint32_t priority) { queue_->Push({std::move(func), priority}); diff --git a/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp b/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp index 91d46200c..e8e7dc811 100644 --- a/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp +++ b/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2020-2021 HERE Europe B.V. + * Copyright (C) 2020-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ client::CancellationToken TaskSink::AddTask( [=](client::ApiResponse) { func(context); }, context); AddTaskImpl(task, priority); - return task.CancelToken(); + return task.CancelToken(task_scheduler_); } bool TaskSink::AddTaskImpl(client::TaskContext task, uint32_t priority) { diff --git a/olp-cpp-sdk-dataservice-read/src/TaskSink.h b/olp-cpp-sdk-dataservice-read/src/TaskSink.h index ed8b65df9..b5a74ac8d 100644 --- a/olp-cpp-sdk-dataservice-read/src/TaskSink.h +++ b/olp-cpp-sdk-dataservice-read/src/TaskSink.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2020-2021 HERE Europe B.V. + * Copyright (C) 2020-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ class TaskSink { auto context = client::TaskContext::Create( std::move(task), std::move(callback), std::forward(args)...); AddTaskImpl(context, priority); - return context.CancelToken(); + return context.CancelToken(task_scheduler_); } template @@ -64,7 +64,7 @@ class TaskSink { if (!AddTaskImpl(context, priority)) { return boost::none; } - return context.CancelToken(); + return context.CancelToken(task_scheduler_); } protected: