diff --git a/Couchbase/Cluster.php b/Couchbase/Cluster.php index 19bfc882..3ab03d7e 100644 --- a/Couchbase/Cluster.php +++ b/Couchbase/Cluster.php @@ -111,6 +111,34 @@ function (string $connectionString, ClusterOptions $options) { ); } + /** + * Notifies the SDK about usage of `fork(2)` syscall. Typically PHP exposes it using `pcntl_fork()` function, but + * the library should have chance to properly close descriptors and reach safe point to allow forking the process. + * This is not a problem in case of `proc_open()` as in this case the memory and descriptors are not inherited by + * the child process. + * + * Allowed values for `$event` are: + * + * * ForkEvent::PREPARE - must be used before `fork()` to ensure the SDK reaches safe point + * * ForkEvent::CHILD - must be used in the child process, the branch where `pcntl_fork()` returns zero + * * ForkEvent::PARENT - must be used in the parent process, the branch where `pcntl_fork()` returns pid of the child process + * + * In case `pcntl_fork()` returns negative value, and the application decides to continue, `notifyFork(ForkEvent::PARENT)` + * must be invoked to resume the SDK. + * + * @see https://www.php.net/manual/en/function.pcntl-fork.php + * @see https://www.php.net/manual/en/function.proc-open.php + * + * @param string $event type of the event to send to the library (one of the constants in ForkEvent). + * @return void + * + * @since 4.2.1 + */ + public static function notifyFork(string $event) + { + return Extension\notifyFork($event); + } + /** * Returns a new bucket object. * diff --git a/Couchbase/ForkEvent.php b/Couchbase/ForkEvent.php new file mode 100644 index 00000000..03bbdcbb --- /dev/null +++ b/Couchbase/ForkEvent.php @@ -0,0 +1,51 @@ +ctx_.run(); }); + worker_ = std::thread([self = shared_from_this()]() { self->ctx_.run(); }); } void stop() @@ -418,8 +418,8 @@ class connection_handle::impl : public std::enable_shared_from_thisclose([barrier]() { barrier->set_value(); }); f.wait(); cluster_.reset(); - if (worker.joinable()) { - worker.join(); + if (worker_.joinable()) { + worker_.join(); } } } @@ -574,10 +574,33 @@ class connection_handle::impl : public std::enable_shared_from_thisctx_.run(); }); + break; + + case fork_event::child: + ctx_.notify_fork(asio::execution_context::fork_child); + ctx_.restart(); + worker_ = std::thread([self = shared_from_this()]() { self->ctx_.run(); }); + break; + } + } + private: asio::io_context ctx_{}; std::shared_ptr cluster_{ std::make_shared(ctx_) }; - std::thread worker; + std::thread worker_; core::origin origin_; }; @@ -620,6 +643,12 @@ connection_handle::replicas_configured_for_bucket(const zend_string* bucket_name return impl_->replicas_configured_for_bucket(cb_string_new(bucket_name)); } +void +connection_handle::notify_fork(fork_event event) const +{ + return impl_->notify_fork(event); +} + COUCHBASE_API core_error_info connection_handle::bucket_open(const std::string& name) @@ -2659,9 +2688,9 @@ zval_to_search_index(couchbase::core::operations::management::search_index_upser if (auto e = cb_assign_string(idx.plan_params_json, index, "planParams"); e.ec) { return e; } - request.index = idx; + request.index = idx; - return {}; + return {}; } COUCHBASE_API @@ -2844,7 +2873,10 @@ connection_handle::search_index_control_plan_freeze(zval* return_value, const ze COUCHBASE_API core_error_info -connection_handle::search_index_analyze_document(zval* return_value, const zend_string* index_name, const zend_string* document, const zval* options) +connection_handle::search_index_analyze_document(zval* return_value, + const zend_string* index_name, + const zend_string* document, + const zval* options) { couchbase::core::operations::management::search_index_analyze_document_request request{}; request.index_name = cb_string_new(index_name); @@ -2867,7 +2899,11 @@ connection_handle::search_index_analyze_document(zval* return_value, const zend_ COUCHBASE_API core_error_info -connection_handle::scope_search_index_get(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zval* options) +connection_handle::scope_search_index_get(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* index_name, + const zval* options) { couchbase::core::operations::management::search_index_get_request request{ cb_string_new(index_name) }; @@ -2892,7 +2928,10 @@ connection_handle::scope_search_index_get(zval* return_value, const zend_string* COUCHBASE_API core_error_info -connection_handle::scope_search_index_get_all(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zval* options) +connection_handle::scope_search_index_get_all(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zval* options) { couchbase::core::operations::management::search_index_get_all_request request{}; @@ -2922,7 +2961,11 @@ connection_handle::scope_search_index_get_all(zval* return_value, const zend_str COUCHBASE_API core_error_info -connection_handle::scope_search_index_upsert(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zval* index, const zval* options) +connection_handle::scope_search_index_upsert(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zval* index, + const zval* options) { couchbase::core::operations::management::search_index_upsert_request request{}; @@ -2951,7 +2994,11 @@ connection_handle::scope_search_index_upsert(zval* return_value, const zend_stri COUCHBASE_API core_error_info -connection_handle::scope_search_index_drop(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zval* options) +connection_handle::scope_search_index_drop(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* index_name, + const zval* options) { couchbase::core::operations::management::search_index_drop_request request{ cb_string_new(index_name) }; @@ -2973,7 +3020,11 @@ connection_handle::scope_search_index_drop(zval* return_value, const zend_string COUCHBASE_API core_error_info -connection_handle::scope_search_index_get_documents_count(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zval* options) +connection_handle::scope_search_index_get_documents_count(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* index_name, + const zval* options) { couchbase::core::operations::management::search_index_get_documents_count_request request{ cb_string_new(index_name) }; @@ -2997,7 +3048,12 @@ connection_handle::scope_search_index_get_documents_count(zval* return_value, co COUCHBASE_API core_error_info -connection_handle::scope_search_index_control_ingest(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, bool pause, const zval* options) +connection_handle::scope_search_index_control_ingest(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* index_name, + bool pause, + const zval* options) { couchbase::core::operations::management::search_index_control_ingest_request request{}; @@ -3022,7 +3078,12 @@ connection_handle::scope_search_index_control_ingest(zval* return_value, const z COUCHBASE_API core_error_info -connection_handle::scope_search_index_control_query(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, bool allow, const zval* options) +connection_handle::scope_search_index_control_query(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* index_name, + bool allow, + const zval* options) { couchbase::core::operations::management::search_index_control_query_request request{}; @@ -3047,7 +3108,12 @@ connection_handle::scope_search_index_control_query(zval* return_value, const ze COUCHBASE_API core_error_info -connection_handle::scope_search_index_control_plan_freeze(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, bool freeze, const zval* options) +connection_handle::scope_search_index_control_plan_freeze(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* index_name, + bool freeze, + const zval* options) { couchbase::core::operations::management::search_index_control_plan_freeze_request request{}; @@ -3072,7 +3138,12 @@ connection_handle::scope_search_index_control_plan_freeze(zval* return_value, co COUCHBASE_API core_error_info -connection_handle::scope_search_index_analyze_document(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zend_string* document, const zval* options) +connection_handle::scope_search_index_analyze_document(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* index_name, + const zend_string* document, + const zval* options) { couchbase::core::operations::management::search_index_analyze_document_request request{}; @@ -3286,7 +3357,8 @@ zval_to_bucket_settings(const zval* bucket_settings) } else if (e.ec) { return { e, {} }; } - if (auto e = cb_assign_boolean(bucket.history_retention_collection_default, bucket_settings, "historyRetentionCollectionDefault"); e.ec) { + if (auto e = cb_assign_boolean(bucket.history_retention_collection_default, bucket_settings, "historyRetentionCollectionDefault"); + e.ec) { return { e, {} }; } if (auto e = cb_assign_integer(bucket.history_retention_bytes, bucket_settings, "historyRetentionBytes"); e.ec) { @@ -3655,7 +3727,12 @@ connection_handle::scope_drop(zval* return_value, const zend_string* bucket_name COUCHBASE_API core_error_info -connection_handle::collection_create(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* collection_name, const zval* settings, const zval* options) +connection_handle::collection_create(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* collection_name, + const zval* settings, + const zval* options) { couchbase::core::operations::management::collection_create_request request{}; @@ -3686,7 +3763,11 @@ connection_handle::collection_create(zval* return_value, const zend_string* buck COUCHBASE_API core_error_info -connection_handle::collection_drop(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* collection_name, const zval* options) +connection_handle::collection_drop(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* collection_name, + const zval* options) { couchbase::core::operations::management::collection_drop_request request{}; @@ -3709,7 +3790,12 @@ connection_handle::collection_drop(zval* return_value, const zend_string* bucket COUCHBASE_API core_error_info -connection_handle::collection_update(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* collection_name, const zval* settings, const zval* options) +connection_handle::collection_update(zval* return_value, + const zend_string* bucket_name, + const zend_string* scope_name, + const zend_string* collection_name, + const zval* settings, + const zval* options) { couchbase::core::operations::management::collection_update_request request{}; diff --git a/src/wrapper/connection_handle.hxx b/src/wrapper/connection_handle.hxx index e510843e..ef6c0647 100644 --- a/src/wrapper/connection_handle.hxx +++ b/src/wrapper/connection_handle.hxx @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -81,6 +82,9 @@ class connection_handle COUCHBASE_API bool replicas_configured_for_bucket(const zend_string* bucket_name); + COUCHBASE_API + void notify_fork(fork_event event) const; + COUCHBASE_API core_error_info open(); diff --git a/src/wrapper/persistent_connections_cache.cxx b/src/wrapper/persistent_connections_cache.cxx index be6a93fb..dc0a8d81 100644 --- a/src/wrapper/persistent_connections_cache.cxx +++ b/src/wrapper/persistent_connections_cache.cxx @@ -18,8 +18,11 @@ #include "common.hxx" #include "connection_handle.hxx" +#include "transactions_resource.hxx" #include +#include +#include #include @@ -171,4 +174,87 @@ destroy_persistent_connection(zend_resource* res) } } +namespace +{ +int +notify_transaction(zval* zv, void* event_ptr) +{ + if (event_ptr == nullptr) { + return ZEND_HASH_APPLY_KEEP; + } + + zend_resource* res = Z_RES_P(zv); + const fork_event event = *(static_cast(event_ptr)); + + if (res->type == get_transactions_destructor_id()) { + auto const* transaction = static_cast(res->ptr); + transaction->notify_fork(event); + } + return ZEND_HASH_APPLY_KEEP; +} + +int +notify_connection(zval* zv, void* event_ptr) +{ + if (event_ptr == nullptr) { + return ZEND_HASH_APPLY_KEEP; + } + + zend_resource* res = Z_RES_P(zv); + const fork_event event = *(static_cast(event_ptr)); + + if (res->type == persistent_connection_destructor_id_) { + const auto* connection = static_cast(res->ptr); + connection->notify_fork(event); + } + return ZEND_HASH_APPLY_KEEP; +} + +std::pair> +get_fork_event(const zend_string* fork_event_str) +{ + if (fork_event_str == nullptr || ZSTR_VAL(fork_event_str) == nullptr || ZSTR_LEN(fork_event_str) == 0) { + return { { errc::common::invalid_argument, ERROR_LOCATION, "expected non-empty string for forkEvent argument" }, {} }; + } + + if (zend_binary_strcmp(ZSTR_VAL(fork_event_str), ZSTR_LEN(fork_event_str), ZEND_STRL("prepare")) == 0) { + return { {}, couchbase::fork_event::prepare }; + } + if (zend_binary_strcmp(ZSTR_VAL(fork_event_str), ZSTR_LEN(fork_event_str), ZEND_STRL("parent")) == 0) { + return { {}, couchbase::fork_event::parent }; + } + if (zend_binary_strcmp(ZSTR_VAL(fork_event_str), ZSTR_LEN(fork_event_str), ZEND_STRL("child")) == 0) { + return { {}, couchbase::fork_event::child }; + } + return { { errc::common::invalid_argument, + ERROR_LOCATION, + fmt::format("unknown forkEvent: {}", std::string_view(ZSTR_VAL(fork_event_str), ZSTR_LEN(fork_event_str))) }, + {} }; +} +} // namespace + +COUCHBASE_API +core_error_info +notify_fork(const zend_string* fork_event) +{ + auto [e, event] = get_fork_event(fork_event); + if (e.ec) { + return e; + } + + /* transactions must be first to stop */ + if (event == fork_event::prepare) { + zend_hash_apply_with_argument(&EG(persistent_list), notify_transaction, &event); + } + + zend_hash_apply_with_argument(&EG(persistent_list), notify_connection, &event); + + /* transactions must be last to start */ + if (event != fork_event::prepare) { + zend_hash_apply_with_argument(&EG(persistent_list), notify_transaction, &event); + } + + return {}; +} + } // namespace couchbase::php diff --git a/src/wrapper/persistent_connections_cache.hxx b/src/wrapper/persistent_connections_cache.hxx index 9c796bff..09b28568 100644 --- a/src/wrapper/persistent_connections_cache.hxx +++ b/src/wrapper/persistent_connections_cache.hxx @@ -39,4 +39,8 @@ destroy_persistent_connection(zend_resource* res); COUCHBASE_API int check_persistent_connection(zval* zv); -} // namespace couchbase::php \ No newline at end of file + +COUCHBASE_API +core_error_info +notify_fork(const zend_string* fork_event); +} // namespace couchbase::php diff --git a/src/wrapper/transactions_resource.cxx b/src/wrapper/transactions_resource.cxx index 0e02cfe9..a7c96f7c 100644 --- a/src/wrapper/transactions_resource.cxx +++ b/src/wrapper/transactions_resource.cxx @@ -66,6 +66,11 @@ class transactions_resource::impl : public std::enable_shared_from_this cluster_; couchbase::core::transactions::transactions transactions_; @@ -85,6 +90,12 @@ transactions_resource::transactions() return impl_->transactions(); } +void +transactions_resource::notify_fork(fork_event event) const +{ + return impl_->notify_fork(event); +} + #define ASSIGN_DURATION_OPTION(name, setter, key, value) \ if (zend_binary_strcmp(ZSTR_VAL(key), ZSTR_LEN(key), ZEND_STRL(name)) == 0) { \ if ((value) == nullptr || Z_TYPE_P(value) == IS_NULL) { \ diff --git a/src/wrapper/transactions_resource.hxx b/src/wrapper/transactions_resource.hxx index 932c3fe4..17b0a5ff 100644 --- a/src/wrapper/transactions_resource.hxx +++ b/src/wrapper/transactions_resource.hxx @@ -50,6 +50,8 @@ class transactions_resource COUCHBASE_API core::transactions::transactions& transactions(); + void notify_fork(fork_event event) const; + private: class impl; diff --git a/tests/ForkTest.php b/tests/ForkTest.php new file mode 100644 index 00000000..b3856395 --- /dev/null +++ b/tests/ForkTest.php @@ -0,0 +1,47 @@ +cluster = $this->connectCluster(); + } + + public function testForkWorkflow() + { + if (!extension_loaded("pcntl")) { + $this->markTestSkipped("The 'pcntl' extension require to test Cluster::notifyFork helper"); + } + $id = $this->uniqueId(); + $collection = $this->defaultCollection(); + $res = $collection->upsert($id, ["answer" => 42]); + $cas = $res->cas(); + $this->assertNotNull($cas); + + Cluster::notifyFork(ForkEvent::PREPARE); + + $pid = pcntl_fork(); + $this->assertGreaterThanOrEqual(0, $pid); + if ($pid == 0) { + Cluster::notifyFork(ForkEvent::CHILD); + $res = $collection->get($id); + $this->assertEquals($cas, $res->cas()); + exit(0); + } else { + Cluster::notifyFork(ForkEvent::PARENT); + $res = $collection->get($id); + $this->assertEquals($cas, $res->cas()); + } + } +}