diff --git a/google/cloud/pubsub/samples/topic_admin_samples.cc b/google/cloud/pubsub/samples/topic_admin_samples.cc index 1e8e5b51f57e2..9ffd2b58c22c4 100644 --- a/google/cloud/pubsub/samples/topic_admin_samples.cc +++ b/google/cloud/pubsub/samples/topic_admin_samples.cc @@ -272,6 +272,121 @@ void CreateTopicWithCloudStorageIngestion( argv.at(4), argv.at(5), argv.at(6)); } +void CreateTopicWithAwsMskIngestion( + google::cloud::pubsub_admin::TopicAdminClient client, + std::vector const& argv) { + // [START pubsub_create_topic_with_aws_msk_ingestion] + namespace pubsub = ::google::cloud::pubsub; + namespace pubsub_admin = ::google::cloud::pubsub_admin; + [](pubsub_admin::TopicAdminClient client, std::string project_id, + std::string topic_id, std::string const& cluster_arn, + std::string const& msk_topic, std::string const& aws_role_arn, + std::string const& gcp_service_account) { + google::pubsub::v1::Topic request; + request.set_name( + pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName()); + auto* aws_msk = + request.mutable_ingestion_data_source_settings()->mutable_aws_msk(); + aws_msk->set_cluster_arn(cluster_arn); + aws_msk->set_topic(msk_topic); + aws_msk->set_aws_role_arn(aws_role_arn); + aws_msk->set_gcp_service_account(gcp_service_account); + + auto topic = client.CreateTopic(request); + // Note that kAlreadyExists is a possible error when the library retries. + if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) { + std::cout << "The topic already exists\n"; + return; + } + if (!topic) throw std::move(topic).status(); + + std::cout << "The topic was successfully created: " << topic->DebugString() + << "\n"; + } + // [END pubsub_create_topic_with_aws_msk_ingestion] + (std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3), + argv.at(4), argv.at(5)); +} + +void CreateTopicWithConfluentCloudIngestion( + google::cloud::pubsub_admin::TopicAdminClient client, + std::vector const& argv) { + // [START pubsub_create_topic_with_confluent_cloud_ingestion] + namespace pubsub = ::google::cloud::pubsub; + namespace pubsub_admin = ::google::cloud::pubsub_admin; + [](pubsub_admin::TopicAdminClient client, std::string project_id, + std::string topic_id, std::string const& bootstrap_server, + std::string const& cluster_id, std::string const& confluent_topic, + std::string const& identity_pool_id, + std::string const& gcp_service_account) { + google::pubsub::v1::Topic request; + request.set_name( + pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName()); + auto* confluent_cloud = request.mutable_ingestion_data_source_settings() + ->mutable_confluent_cloud(); + confluent_cloud->set_bootstrap_server(bootstrap_server); + confluent_cloud->set_cluster_id(cluster_id); + confluent_cloud->set_topic(confluent_topic); + confluent_cloud->set_identity_pool_id(identity_pool_id); + confluent_cloud->set_gcp_service_account(gcp_service_account); + + auto topic = client.CreateTopic(request); + // Note that kAlreadyExists is a possible error when the library retries. + if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) { + std::cout << "The topic already exists\n"; + return; + } + if (!topic) throw std::move(topic).status(); + + std::cout << "The topic was successfully created: " << topic->DebugString() + << "\n"; + } + // [END pubsub_create_topic_with_confluent_cloud_ingestion] + (std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3), + argv.at(4), argv.at(5), argv.at(6)); +} + +void CreateTopicWithAzureEventHubsIngestion( + google::cloud::pubsub_admin::TopicAdminClient client, + std::vector const& argv) { + // [START pubsub_create_topic_with_azure_event_hubs_ingestion] + namespace pubsub = ::google::cloud::pubsub; + namespace pubsub_admin = ::google::cloud::pubsub_admin; + [](pubsub_admin::TopicAdminClient client, std::string project_id, + std::string topic_id, std::string const& resource_group, + std::string const& event_hubs_namespace, std::string const& event_hub, + std::string const& client_id, std::string const& tenant_id, + std::string const& subscription_id, + std::string const& gcp_service_account) { + google::pubsub::v1::Topic request; + request.set_name( + pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName()); + auto* azure_event_hubs = request.mutable_ingestion_data_source_settings() + ->mutable_azure_event_hubs(); + azure_event_hubs->set_resource_group(resource_group); + azure_event_hubs->set_namespace_(event_hubs_namespace); + azure_event_hubs->set_event_hub(event_hub); + azure_event_hubs->set_client_id(client_id); + azure_event_hubs->set_tenant_id(tenant_id); + azure_event_hubs->set_subscription_id(subscription_id); + azure_event_hubs->set_gcp_service_account(gcp_service_account); + + auto topic = client.CreateTopic(request); + // Note that kAlreadyExists is a possible error when the library retries. + if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) { + std::cout << "The topic already exists\n"; + return; + } + if (!topic) throw std::move(topic).status(); + + std::cout << "The topic was successfully created: " << topic->DebugString() + << "\n"; + } + // [END pubsub_create_topic_with_azure_event_hubs_ingestion] + (std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3), + argv.at(4), argv.at(5), argv.at(6), argv.at(7), argv.at(8)); +} + void GetTopic(google::cloud::pubsub_admin::TopicAdminClient client, std::vector const& argv) { namespace pubsub = ::google::cloud::pubsub; @@ -626,10 +741,42 @@ void AutoRun(std::vector const& argv) { "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"; auto const* const kinesis_updated_gcp_service_account = "fake-update-service-account@fake-gcp-project.iam.gserviceaccount.com"; + auto const cloud_storage_topic_id = "cloud-storage-" + RandomTopicId(generator) + "_ingestion_topic"; auto const cloud_storage_bucket = project_id + "-pubsub-bucket"; + auto const aws_msk_topic_id = + "aws-msk-" + RandomTopicId(generator) + "_ingestion_topic"; + auto const* const aws_msk_cluster_arn = + "arn:aws:kafka:us-east-1:1111111111:cluster/fake-cluster-name/11111111"; + auto const* const aws_msk_topic = "fake-msk-topic"; + auto const* const aws_msk_role_arn = + "arn:aws:iam::111111111111:role/fake-role-name"; + auto const* const aws_msk_gcp_service_account = + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"; + + auto const confluent_cloud_topic_id = + "confluent-cloud-" + RandomTopicId(generator) + "_ingestion_topic"; + auto const* const confluent_cloud_bootstrap_server = + "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092"; + auto const* const confluent_cloud_cluster_id = "fake-cluster-id"; + auto const* const confluent_cloud_topic = "fake-topic"; + auto const* const confluent_cloud_identity_pool_id = "fake-pool-id"; + auto const* const confluent_cloud_gcp_service_account = + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"; + + auto const azure_event_hubs_topic_id = + "azure-event-hubs-" + RandomTopicId(generator) + "_ingestion_topic"; + auto const* const azure_event_hubs_resource_group = "fake-resource-group"; + auto const* const azure_event_hubs_namespace = "fake-namespace"; + auto const* const azure_event_hubs_event_hub = "fake-event-hub"; + auto const* const azure_event_hubs_client_id = "fake-client-id"; + auto const* const azure_event_hubs_tenant_id = "fake-tenant-id"; + auto const* const azure_event_hubs_subscription_id = "fake-subscription-id"; + auto const* const azure_event_hubs_gcp_service_account = + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"; + using ::google::cloud::StatusCode; auto ignore_emulator_failures = [](auto lambda, StatusCode code = StatusCode::kUnimplemented) { @@ -696,6 +843,63 @@ void AutoRun(std::vector const& argv) { }, StatusCode::kInvalidArgument); + std::cout << "\nRunning CreateTopicWithAwsMskIngestion() sample" << std::endl; + + ignore_emulator_failures( + [&] { + CreateTopicWithAwsMskIngestion( + topic_admin_client, + {project_id, aws_msk_topic_id, aws_msk_cluster_arn, aws_msk_topic, + aws_msk_role_arn, aws_msk_gcp_service_account}); + cleanup.Defer( + [topic_admin_client, project_id, aws_msk_topic_id]() mutable { + std::cout << "\nRunning DeleteTopic() sample" << std::endl; + DeleteTopic(topic_admin_client, {project_id, aws_msk_topic_id}); + }); + }, + StatusCode::kInvalidArgument); + + std::cout << "\nRunning CreateTopicWithConfluentCloudIngestion() sample" + << std::endl; + + ignore_emulator_failures( + [&] { + CreateTopicWithConfluentCloudIngestion( + topic_admin_client, + {project_id, confluent_cloud_topic_id, + confluent_cloud_bootstrap_server, confluent_cloud_cluster_id, + confluent_cloud_topic, confluent_cloud_identity_pool_id, + confluent_cloud_gcp_service_account}); + cleanup.Defer([topic_admin_client, project_id, + confluent_cloud_topic_id]() mutable { + std::cout << "\nRunning DeleteTopic() sample" << std::endl; + DeleteTopic(topic_admin_client, + {project_id, confluent_cloud_topic_id}); + }); + }, + StatusCode::kInvalidArgument); + + std::cout << "\nRunning CreateTopicWithAzureEventHubsIngestion() sample" + << std::endl; + + ignore_emulator_failures( + [&] { + CreateTopicWithAzureEventHubsIngestion( + topic_admin_client, + {project_id, azure_event_hubs_topic_id, + azure_event_hubs_resource_group, azure_event_hubs_namespace, + azure_event_hubs_event_hub, azure_event_hubs_client_id, + azure_event_hubs_tenant_id, azure_event_hubs_subscription_id, + azure_event_hubs_gcp_service_account}); + cleanup.Defer([topic_admin_client, project_id, + azure_event_hubs_topic_id]() mutable { + std::cout << "\nRunning DeleteTopic() sample" << std::endl; + DeleteTopic(topic_admin_client, + {project_id, azure_event_hubs_topic_id}); + }); + }, + StatusCode::kInvalidArgument); + std::cout << "\nRunning UpdateTopicType() sample" << std::endl; UpdateTopicType( @@ -768,6 +972,21 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape) {"project-id", "topic-id", "bucket", "input-format", "text-delimiter", "match-glob", "minimum-object-create-time"}, CreateTopicWithCloudStorageIngestion), + CreateTopicAdminCommand( + "create-topic-with-aws-msk-ingestion", + {"project-id", "topic-id", "cluster-arn", "msk-topic", "aws-role-arn", + "gcp-service-account"}, + CreateTopicWithAwsMskIngestion), + CreateTopicAdminCommand( + "create-topic-with-confluent-cloud-ingestion", + {"project-id", "topic-id", "bootstrap-server", "cluster-id", + "confluent-cloud-topic", "identity-pool-id", "gcp-service-account"}, + CreateTopicWithConfluentCloudIngestion), + CreateTopicAdminCommand( + "create-topic-with-azure-event-hubs-ingestion", + {"project-id", "topic-id", "resource-group", "namespace", "event-hub", + "client-id", "tenant-id", "subscription-id", "gcp-service-account"}, + CreateTopicWithAzureEventHubsIngestion), CreateTopicAdminCommand( "create-topic-with-schema", {"project-id", "topic-id", "schema-id", "encoding"},