Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

docs(pubsub): Add Pub/Sub ingestion from Kafka samples #14954

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions google/cloud/pubsub/samples/topic_admin_samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,121 @@ void CreateTopicWithCloudStorageIngestion(
argv.at(4), argv.at(5), argv.at(6));
}

void CreateTopicWithAwsMskIngestion(
google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
// [START pubsub_create_topic_with_aws_msk_ingestion]
namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
std::string topic_id, std::string const& cluster_arn,
std::string const& msk_topic, std::string const& aws_role_arn,
std::string const& gcp_service_account) {
google::pubsub::v1::Topic request;
request.set_name(
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
auto* aws_msk =
request.mutable_ingestion_data_source_settings()->mutable_aws_msk();
aws_msk->set_cluster_arn(cluster_arn);
aws_msk->set_topic(msk_topic);
aws_msk->set_aws_role_arn(aws_role_arn);
aws_msk->set_gcp_service_account(gcp_service_account);

auto topic = client.CreateTopic(request);
// Note that kAlreadyExists is a possible error when the library retries.
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
std::cout << "The topic already exists\n";
return;
}
if (!topic) throw std::move(topic).status();

std::cout << "The topic was successfully created: " << topic->DebugString()
<< "\n";
}
// [END pubsub_create_topic_with_aws_msk_ingestion]
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
argv.at(4), argv.at(5));
}

void CreateTopicWithConfluentCloudIngestion(
google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
// [START pubsub_create_topic_with_confluent_cloud_ingestion]
namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
std::string topic_id, std::string const& bootstrap_server,
std::string const& cluster_id, std::string const& confluent_topic,
std::string const& identity_pool_id,
std::string const& gcp_service_account) {
google::pubsub::v1::Topic request;
request.set_name(
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
->mutable_confluent_cloud();
confluent_cloud->set_bootstrap_server(bootstrap_server);
confluent_cloud->set_cluster_id(cluster_id);
confluent_cloud->set_topic(confluent_topic);
confluent_cloud->set_identity_pool_id(identity_pool_id);
confluent_cloud->set_gcp_service_account(gcp_service_account);

auto topic = client.CreateTopic(request);
// Note that kAlreadyExists is a possible error when the library retries.
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
std::cout << "The topic already exists\n";
return;
}
if (!topic) throw std::move(topic).status();

std::cout << "The topic was successfully created: " << topic->DebugString()
<< "\n";
}
// [END pubsub_create_topic_with_confluent_cloud_ingestion]
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
argv.at(4), argv.at(5), argv.at(6));
}

void CreateTopicWithAzureEventHubsIngestion(
google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
std::string topic_id, std::string const& resource_group,
std::string const& event_hubs_namespace, std::string const& event_hub,
std::string const& client_id, std::string const& tenant_id,
std::string const& subscription_id,
std::string const& gcp_service_account) {
google::pubsub::v1::Topic request;
request.set_name(
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
auto* azure_event_hubs = request.mutable_ingestion_data_source_settings()
->mutable_azure_event_hubs();
azure_event_hubs->set_resource_group(resource_group);
azure_event_hubs->set_namespace_(event_hubs_namespace);
azure_event_hubs->set_event_hub(event_hub);
azure_event_hubs->set_client_id(client_id);
azure_event_hubs->set_tenant_id(tenant_id);
azure_event_hubs->set_subscription_id(subscription_id);
azure_event_hubs->set_gcp_service_account(gcp_service_account);

auto topic = client.CreateTopic(request);
// Note that kAlreadyExists is a possible error when the library retries.
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
std::cout << "The topic already exists\n";
return;
}
if (!topic) throw std::move(topic).status();

std::cout << "The topic was successfully created: " << topic->DebugString()
<< "\n";
}
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
argv.at(4), argv.at(5), argv.at(6), argv.at(7), argv.at(8));
}

void GetTopic(google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
namespace pubsub = ::google::cloud::pubsub;
Expand Down Expand Up @@ -626,10 +741,42 @@ void AutoRun(std::vector<std::string> const& argv) {
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";
auto const* const kinesis_updated_gcp_service_account =
"fake-update-service-account@fake-gcp-project.iam.gserviceaccount.com";

auto const cloud_storage_topic_id =
"cloud-storage-" + RandomTopicId(generator) + "_ingestion_topic";
auto const cloud_storage_bucket = project_id + "-pubsub-bucket";

auto const aws_msk_topic_id =
"aws-msk-" + RandomTopicId(generator) + "_ingestion_topic";
auto const* const aws_msk_cluster_arn =
"arn:aws:kafka:us-east-1:1111111111:cluster/fake-cluster-name/11111111";
auto const* const aws_msk_topic = "fake-msk-topic";
auto const* const aws_msk_role_arn =
"arn:aws:iam::111111111111:role/fake-role-name";
auto const* const aws_msk_gcp_service_account =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";

auto const confluent_cloud_topic_id =
"confluent-cloud-" + RandomTopicId(generator) + "_ingestion_topic";
auto const* const confluent_cloud_bootstrap_server =
"fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092";
auto const* const confluent_cloud_cluster_id = "fake-cluster-id";
auto const* const confluent_cloud_topic = "fake-topic";
auto const* const confluent_cloud_identity_pool_id = "fake-pool-id";
auto const* const confluent_cloud_gcp_service_account =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";

auto const azure_event_hubs_topic_id =
"azure-event-hubs-" + RandomTopicId(generator) + "_ingestion_topic";
auto const* const azure_event_hubs_resource_group = "fake-resource-group";
auto const* const azure_event_hubs_namespace = "fake-namespace";
auto const* const azure_event_hubs_event_hub = "fake-event-hub";
auto const* const azure_event_hubs_client_id = "fake-client-id";
auto const* const azure_event_hubs_tenant_id = "fake-tenant-id";
auto const* const azure_event_hubs_subscription_id = "fake-subscription-id";
auto const* const azure_event_hubs_gcp_service_account =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";

using ::google::cloud::StatusCode;
auto ignore_emulator_failures =
[](auto lambda, StatusCode code = StatusCode::kUnimplemented) {
Expand Down Expand Up @@ -696,6 +843,63 @@ void AutoRun(std::vector<std::string> const& argv) {
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning CreateTopicWithAwsMskIngestion() sample" << std::endl;

ignore_emulator_failures(
[&] {
CreateTopicWithAwsMskIngestion(
topic_admin_client,
{project_id, aws_msk_topic_id, aws_msk_cluster_arn, aws_msk_topic,
aws_msk_role_arn, aws_msk_gcp_service_account});
cleanup.Defer(
[topic_admin_client, project_id, aws_msk_topic_id]() mutable {
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
DeleteTopic(topic_admin_client, {project_id, aws_msk_topic_id});
});
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning CreateTopicWithConfluentCloudIngestion() sample"
<< std::endl;

ignore_emulator_failures(
[&] {
CreateTopicWithConfluentCloudIngestion(
topic_admin_client,
{project_id, confluent_cloud_topic_id,
confluent_cloud_bootstrap_server, confluent_cloud_cluster_id,
confluent_cloud_topic, confluent_cloud_identity_pool_id,
confluent_cloud_gcp_service_account});
cleanup.Defer([topic_admin_client, project_id,
confluent_cloud_topic_id]() mutable {
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
DeleteTopic(topic_admin_client,
{project_id, confluent_cloud_topic_id});
});
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning CreateTopicWithAzureEventHubsIngestion() sample"
<< std::endl;

ignore_emulator_failures(
[&] {
CreateTopicWithAzureEventHubsIngestion(
topic_admin_client,
{project_id, azure_event_hubs_topic_id,
azure_event_hubs_resource_group, azure_event_hubs_namespace,
azure_event_hubs_event_hub, azure_event_hubs_client_id,
azure_event_hubs_tenant_id, azure_event_hubs_subscription_id,
azure_event_hubs_gcp_service_account});
cleanup.Defer([topic_admin_client, project_id,
azure_event_hubs_topic_id]() mutable {
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
DeleteTopic(topic_admin_client,
{project_id, azure_event_hubs_topic_id});
});
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning UpdateTopicType() sample" << std::endl;

UpdateTopicType(
Expand Down Expand Up @@ -768,6 +972,21 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape)
{"project-id", "topic-id", "bucket", "input-format", "text-delimiter",
"match-glob", "minimum-object-create-time"},
CreateTopicWithCloudStorageIngestion),
CreateTopicAdminCommand(
"create-topic-with-aws-msk-ingestion",
{"project-id", "topic-id", "cluster-arn", "msk-topic", "aws-role-arn",
"gcp-service-account"},
CreateTopicWithAwsMskIngestion),
CreateTopicAdminCommand(
"create-topic-with-confluent-cloud-ingestion",
{"project-id", "topic-id", "bootstrap-server", "cluster-id",
"confluent-cloud-topic", "identity-pool-id", "gcp-service-account"},
CreateTopicWithConfluentCloudIngestion),
CreateTopicAdminCommand(
"create-topic-with-azure-event-hubs-ingestion",
{"project-id", "topic-id", "resource-group", "namespace", "event-hub",
"client-id", "tenant-id", "subscription-id", "gcp-service-account"},
CreateTopicWithAzureEventHubsIngestion),
CreateTopicAdminCommand(
"create-topic-with-schema",
{"project-id", "topic-id", "schema-id", "encoding"},
Expand Down
Loading