From 4110b56314eb7c3ecbc56b9f6fcb37eb70286858 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Sun, 31 Oct 2021 23:54:54 +0800 Subject: [PATCH] Implementation of `OtlpHttpLogExporter` (#1030) --- CHANGELOG.md | 2 + api/test/common/spinlock_benchmark.cc | 12 +- bazel/opentelemetry_proto.BUILD | 55 +- bazel/repository.bzl | 6 +- cmake/opentelemetry-cpp-config.cmake.in | 6 + exporters/otlp/BUILD | 68 +- exporters/otlp/CMakeLists.txt | 90 ++- .../exporters/otlp/otlp_environment.h | 61 +- .../exporters/otlp/otlp_http_client.h | 130 ++++ .../exporters/otlp/otlp_http_exporter.h | 45 +- .../exporters/otlp/otlp_http_log_exporter.h | 105 +++ .../exporters/otlp/otlp_log_recordable.h | 107 +++ .../exporters/otlp/otlp_recordable.h | 4 +- .../exporters/otlp/otlp_recordable_utils.h | 63 ++ .../exporters/otlp/protobuf_include_prefix.h | 2 + exporters/otlp/src/otlp_grpc_exporter.cc | 35 +- exporters/otlp/src/otlp_http_client.cc | 673 ++++++++++++++++++ exporters/otlp/src/otlp_http_exporter.cc | 670 +---------------- exporters/otlp/src/otlp_http_log_exporter.cc | 62 ++ exporters/otlp/src/otlp_log_recordable.cc | 204 ++++++ exporters/otlp/src/otlp_recordable.cc | 217 +----- exporters/otlp/src/otlp_recordable_utils.cc | 320 +++++++++ .../otlp/test/otlp_http_exporter_test.cc | 65 +- .../otlp/test/otlp_http_log_exporter_test.cc | 499 +++++++++++++ sdk/test/trace/tracer_provider_test.cc | 14 +- sdk/test/trace/tracer_test.cc | 14 +- 26 files changed, 2550 insertions(+), 979 deletions(-) create mode 100644 exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h create mode 100644 exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h create mode 100644 exporters/otlp/include/opentelemetry/exporters/otlp/otlp_log_recordable.h create mode 100644 exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable_utils.h create mode 100644 exporters/otlp/src/otlp_http_client.cc create mode 100644 exporters/otlp/src/otlp_http_log_exporter.cc create mode 100644 exporters/otlp/src/otlp_log_recordable.cc create mode 100644 exporters/otlp/src/otlp_recordable_utils.cc create mode 100644 exporters/otlp/test/otlp_http_log_exporter_test.cc diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b42529ded..ebd0e0a29d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ Increment the: ## [Unreleased] +* [EXPORTER] Add OTLP/HTTP Log Exporter ([#1030](https://github.com/open-telemetry/opentelemetry-cpp/pull/1030)) + ## [1.0.1] 2021-10-21 * [EXPORTER] Exports span attributes to ETW ([#1021](https://github.com/open-telemetry/opentelemetry-cpp/pull/1021)) diff --git a/api/test/common/spinlock_benchmark.cc b/api/test/common/spinlock_benchmark.cc index 3557063479..95c3f7c7af 100644 --- a/api/test/common/spinlock_benchmark.cc +++ b/api/test/common/spinlock_benchmark.cc @@ -69,6 +69,7 @@ static void BM_NaiveSpinLockThrashing(benchmark::State &s) [](SpinLockMutex &m) { while (!m.try_lock()) { + // Left this comment to keep the same format on old and new versions of clang-format } }, [](SpinLockMutex &m) { m.unlock(); }); @@ -106,16 +107,21 @@ static void BM_ThreadYieldSpinLockThrashing(benchmark::State &s) SpinThrash>( s, mutex, [](std::atomic &l) { - if (!l.exchange(true, std::memory_order_acquire)) + if (!l.exchange(true, std::memory_order_acq_rel)) { return; } - for (std::size_t i = 0; i < 100; ++i) + for (std::size_t i = 0; i < 128; ++i) { - if (!l.load(std::memory_order_acquire) && !l.exchange(true, std::memory_order_acquire)) + if (!l.load(std::memory_order_acquire) && !l.exchange(true, std::memory_order_acq_rel)) { return; } + + if (i % 32 == 0) + { + std::this_thread::yield(); + } } std::this_thread::yield(); }, diff --git a/bazel/opentelemetry_proto.BUILD b/bazel/opentelemetry_proto.BUILD index 9c91edc2d0..35319270da 100644 --- a/bazel/opentelemetry_proto.BUILD +++ b/bazel/opentelemetry_proto.BUILD @@ -20,7 +20,7 @@ load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") proto_library( name = "common_proto", srcs = [ - "opentelemetry/proto/common/v1/common.proto", + "opentelemetry/proto/common/v1/common.proto", ], ) @@ -32,10 +32,10 @@ cc_proto_library( proto_library( name = "resource_proto", srcs = [ - "opentelemetry/proto/resource/v1/resource.proto", + "opentelemetry/proto/resource/v1/resource.proto", ], deps = [ - ":common_proto", + ":common_proto", ], ) @@ -47,11 +47,11 @@ cc_proto_library( proto_library( name = "trace_proto", srcs = [ - "opentelemetry/proto/trace/v1/trace.proto", + "opentelemetry/proto/trace/v1/trace.proto", ], deps = [ - ":common_proto", - ":resource_proto", + ":common_proto", + ":resource_proto", ], ) @@ -63,10 +63,10 @@ cc_proto_library( proto_library( name = "trace_service_proto", srcs = [ - "opentelemetry/proto/collector/trace/v1/trace_service.proto", + "opentelemetry/proto/collector/trace/v1/trace_service.proto", ], deps = [ - ":trace_proto", + ":trace_proto", ], ) @@ -78,7 +78,46 @@ cc_proto_library( cc_grpc_library( name = "trace_service_grpc_cc", srcs = [":trace_service_proto"], + generate_mocks = True, grpc_only = True, deps = [":trace_service_proto_cc"], +) + +proto_library( + name = "logs_proto", + srcs = [ + "opentelemetry/proto/logs/v1/logs.proto", + ], + deps = [ + ":common_proto", + ":resource_proto", + ], +) + +cc_proto_library( + name = "logs_proto_cc", + deps = [":logs_proto"], +) + +proto_library( + name = "logs_service_proto", + srcs = [ + "opentelemetry/proto/collector/logs/v1/logs_service.proto", + ], + deps = [ + ":logs_proto", + ], +) + +cc_proto_library( + name = "logs_service_proto_cc", + deps = [":logs_service_proto"], +) + +cc_grpc_library( + name = "logs_service_grpc_cc", + srcs = [":logs_service_proto"], generate_mocks = True, + grpc_only = True, + deps = [":logs_service_proto_cc"], ) diff --git a/bazel/repository.bzl b/bazel/repository.bzl index 85d320420e..78fcc4a3f1 100644 --- a/bazel/repository.bzl +++ b/bazel/repository.bzl @@ -54,10 +54,10 @@ def opentelemetry_cpp_deps(): http_archive, name = "com_github_opentelemetry_proto", build_file = "@io_opentelemetry_cpp//bazel:opentelemetry_proto.BUILD", - sha256 = "9ec38ab51eedbd7601979b0eda962cf37bc8a4dc35fcef604801e463f01dcc00", - strip_prefix = "opentelemetry-proto-0.9.0", + sha256 = "985367f8905e91018e636cbf0d83ab3f834b665c4f5899a27d10cae9657710e2", + strip_prefix = "opentelemetry-proto-0.11.0", urls = [ - "https://github.com/open-telemetry/opentelemetry-proto/archive/v0.9.0.tar.gz", + "https://github.com/open-telemetry/opentelemetry-proto/archive/v0.11.0.tar.gz", ], ) diff --git a/cmake/opentelemetry-cpp-config.cmake.in b/cmake/opentelemetry-cpp-config.cmake.in index 03951e6488..526d94eca4 100644 --- a/cmake/opentelemetry-cpp-config.cmake.in +++ b/cmake/opentelemetry-cpp-config.cmake.in @@ -29,8 +29,11 @@ # opentelemetry-cpp::metrics - Imported target of opentelemetry-cpp::metrics # opentelemetry-cpp::logs - Imported target of opentelemetry-cpp::logs # opentelemetry-cpp::in_memory_span_exporter - Imported target of opentelemetry-cpp::in_memory_span_exporter +# opentelemetry-cpp::otlp_recordable - Imported target of opentelemetry-cpp::otlp_recordable # opentelemetry-cpp::otlp_grpc_exporter - Imported target of opentelemetry-cpp::otlp_grpc_exporter +# opentelemetry-cpp::otlp_http_client - Imported target of opentelemetry-cpp::otlp_http_client # opentelemetry-cpp::otlp_http_exporter - Imported target of opentelemetry-cpp::otlp_http_exporter +# opentelemetry-cpp::otlp_http_log_exporter - Imported target of opentelemetry-cpp::otlp_http_log_exporter # opentelemetry-cpp::ostream_log_exporter - Imported target of opentelemetry-cpp::ostream_log_exporter # opentelemetry-cpp::ostream_metrics_exporter - Imported target of opentelemetry-cpp::ostream_metrics_exporter # opentelemetry-cpp::ostream_span_exporter - Imported target of opentelemetry-cpp::ostream_span_exporter @@ -78,8 +81,11 @@ set(_OPENTELEMETRY_CPP_LIBRARIES_TEST_TARGETS metrics logs in_memory_span_exporter + otlp_recordable otlp_grpc_exporter + otlp_http_client otlp_http_exporter + otlp_http_log_exporter ostream_log_exporter ostream_metrics_exporter ostream_span_exporter diff --git a/exporters/otlp/BUILD b/exporters/otlp/BUILD index 0ef968ef46..a287573b16 100644 --- a/exporters/otlp/BUILD +++ b/exporters/otlp/BUILD @@ -19,18 +19,24 @@ load("//bazel:otel_cc_benchmark.bzl", "otel_cc_benchmark") cc_library( name = "otlp_recordable", srcs = [ + "src/otlp_log_recordable.cc", "src/otlp_recordable.cc", + "src/otlp_recordable_utils.cc", ], hdrs = [ + "include/opentelemetry/exporters/otlp/otlp_log_recordable.h", "include/opentelemetry/exporters/otlp/otlp_recordable.h", + "include/opentelemetry/exporters/otlp/otlp_recordable_utils.h", "include/opentelemetry/exporters/otlp/protobuf_include_prefix.h", "include/opentelemetry/exporters/otlp/protobuf_include_suffix.h", ], strip_include_prefix = "include", deps = [ + "//sdk/src/logs", "//sdk/src/resource", "//sdk/src/trace", - "@com_github_opentelemetry_proto//:trace_proto_cc", + "@com_github_opentelemetry_proto//:logs_service_proto_cc", + "@com_github_opentelemetry_proto//:trace_service_proto_cc", ], ) @@ -58,13 +64,13 @@ cc_library( ) cc_library( - name = "otlp_http_exporter", + name = "otlp_http_client", srcs = [ - "src/otlp_http_exporter.cc", + "src/otlp_http_client.cc", ], hdrs = [ "include/opentelemetry/exporters/otlp/otlp_environment.h", - "include/opentelemetry/exporters/otlp/otlp_http_exporter.h", + "include/opentelemetry/exporters/otlp/otlp_http_client.h", "include/opentelemetry/exporters/otlp/protobuf_include_prefix.h", "include/opentelemetry/exporters/otlp/protobuf_include_suffix.h", ], @@ -80,11 +86,51 @@ cc_library( }), strip_include_prefix = "include", deps = [ - ":otlp_recordable", + "//api", "//ext/src/http/client/curl:http_client_curl", + "//sdk:headers", + "@com_github_opentelemetry_proto//:common_proto_cc", + "@github_nlohmann_json//:json", + ], +) + +cc_library( + name = "otlp_http_exporter", + srcs = [ + "src/otlp_http_exporter.cc", + ], + hdrs = [ + "include/opentelemetry/exporters/otlp/otlp_environment.h", + "include/opentelemetry/exporters/otlp/otlp_http_exporter.h", + "include/opentelemetry/exporters/otlp/protobuf_include_prefix.h", + "include/opentelemetry/exporters/otlp/protobuf_include_suffix.h", + ], + strip_include_prefix = "include", + deps = [ + ":otlp_http_client", + ":otlp_recordable", + "//sdk/src/trace", + "@com_github_opentelemetry_proto//:trace_service_proto_cc", + ], +) + +cc_library( + name = "otlp_http_log_exporter", + srcs = [ + "src/otlp_http_log_exporter.cc", + ], + hdrs = [ + "include/opentelemetry/exporters/otlp/otlp_environment.h", + "include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h", + "include/opentelemetry/exporters/otlp/protobuf_include_prefix.h", + "include/opentelemetry/exporters/otlp/protobuf_include_suffix.h", + ], + strip_include_prefix = "include", + deps = [ + ":otlp_http_client", + ":otlp_recordable", "//sdk/src/trace", "@com_github_opentelemetry_proto//:trace_service_proto_cc", - "@github_nlohmann_json//:json", ], ) @@ -117,6 +163,16 @@ cc_test( ], ) +cc_test( + name = "otlp_http_log_exporter_test", + srcs = ["test/otlp_http_log_exporter_test.cc"], + deps = [ + ":otlp_http_log_exporter", + "//api", + "@com_google_googletest//:gtest_main", + ], +) + otel_cc_benchmark( name = "otlp_grpc_exporter_benchmark", srcs = ["test/otlp_grpc_exporter_benchmark.cc"], diff --git a/exporters/otlp/CMakeLists.txt b/exporters/otlp/CMakeLists.txt index 3f8c6babcf..0a67c9de0f 100644 --- a/exporters/otlp/CMakeLists.txt +++ b/exporters/otlp/CMakeLists.txt @@ -1,4 +1,7 @@ -add_library(opentelemetry_otlp_recordable src/otlp_recordable.cc) +add_library( + opentelemetry_otlp_recordable + src/otlp_log_recordable.cc src/otlp_recordable.cc + src/otlp_recordable_utils.cc) set_target_properties(opentelemetry_otlp_recordable PROPERTIES EXPORT_NAME otlp_recordable) @@ -28,6 +31,21 @@ endif() if(WITH_OTLP_HTTP) find_package(CURL REQUIRED) + add_library(opentelemetry_exporter_otlp_http_client src/otlp_http_client.cc) + set_target_properties(opentelemetry_exporter_otlp_http_client + PROPERTIES EXPORT_NAME otlp_http_client) + target_link_libraries( + opentelemetry_exporter_otlp_http_client + PUBLIC opentelemetry_sdk opentelemetry_proto http_client_curl + nlohmann_json::nlohmann_json) + target_include_directories( + opentelemetry_exporter_otlp_http_client + PUBLIC "$" + "$") + + list(APPEND OPENTELEMETRY_OTLP_TARGETS + opentelemetry_exporter_otlp_http_client) + add_library(opentelemetry_exporter_otlp_http src/otlp_http_exporter.cc) set_target_properties(opentelemetry_exporter_otlp_http @@ -35,10 +53,25 @@ if(WITH_OTLP_HTTP) target_link_libraries( opentelemetry_exporter_otlp_http - PUBLIC opentelemetry_otlp_recordable http_client_curl - nlohmann_json::nlohmann_json) + PUBLIC opentelemetry_otlp_recordable + opentelemetry_exporter_otlp_http_client) list(APPEND OPENTELEMETRY_OTLP_TARGETS opentelemetry_exporter_otlp_http) + + if(WITH_LOGS_PREVIEW) + add_library(opentelemetry_exporter_otlp_http_log + src/otlp_http_log_exporter.cc) + + set_target_properties(opentelemetry_exporter_otlp_http_log + PROPERTIES EXPORT_NAME otlp_http_log_exporter) + + target_link_libraries( + opentelemetry_exporter_otlp_http_log + PUBLIC opentelemetry_otlp_recordable + opentelemetry_exporter_otlp_http_client) + + list(APPEND OPENTELEMETRY_OTLP_TARGETS opentelemetry_exporter_otlp_http_log) + endif() endif() install( @@ -81,20 +114,39 @@ if(BUILD_TESTING) else() find_library(GMOCK_LIB gmock PATH_SUFFIXES lib) endif() - add_executable(otlp_grpc_exporter_test test/otlp_grpc_exporter_test.cc) - target_link_libraries( - otlp_grpc_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} - ${GMOCK_LIB} opentelemetry_exporter_otlp_grpc) - gtest_add_tests( - TARGET otlp_grpc_exporter_test - TEST_PREFIX exporter.otlp. - TEST_LIST otlp_grpc_exporter_test) - add_executable(otlp_http_exporter_test test/otlp_http_exporter_test.cc) - target_link_libraries( - otlp_http_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} - ${GMOCK_LIB} opentelemetry_exporter_otlp_http) - gtest_add_tests( - TARGET otlp_http_exporter_test - TEST_PREFIX exporter.otlp. - TEST_LIST otlp_http_exporter_test) + + if(WITH_OTLP_GRPC) + add_executable(otlp_grpc_exporter_test test/otlp_grpc_exporter_test.cc) + target_link_libraries( + otlp_grpc_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} + ${GMOCK_LIB} opentelemetry_exporter_otlp_grpc) + gtest_add_tests( + TARGET otlp_grpc_exporter_test + TEST_PREFIX exporter.otlp. + TEST_LIST otlp_grpc_exporter_test) + endif() + + if(WITH_OTLP_HTTP) + add_executable(otlp_http_exporter_test test/otlp_http_exporter_test.cc) + target_link_libraries( + otlp_http_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} + ${GMOCK_LIB} opentelemetry_exporter_otlp_http) + gtest_add_tests( + TARGET otlp_http_exporter_test + TEST_PREFIX exporter.otlp. + TEST_LIST otlp_http_exporter_test) + + if(WITH_LOGS_PREVIEW) + add_executable(otlp_http_log_exporter_test + test/otlp_http_log_exporter_test.cc) + target_link_libraries( + otlp_http_log_exporter_test ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} ${GMOCK_LIB} + opentelemetry_exporter_otlp_http_log opentelemetry_logs) + gtest_add_tests( + TARGET otlp_http_log_exporter_test + TEST_PREFIX exporter.otlp. + TEST_LIST otlp_http_log_exporter_test) + endif() + endif() endif() # BUILD_TESTING diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_environment.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_environment.h index 4a1882984b..c60de87eb9 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_environment.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_environment.h @@ -3,17 +3,18 @@ #pragma once +#include "opentelemetry/common/kv_properties.h" +#include "opentelemetry/nostd/string_view.h" + +#include "opentelemetry/sdk/common/attribute_utils.h" +#include "opentelemetry/sdk/common/env_variables.h" + #include #include #include #include #include -#include "opentelemetry/nostd/string_view.h" - -#include "opentelemetry/sdk/common/attribute_utils.h" -#include "opentelemetry/sdk/common/env_variables.h" - OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter { @@ -44,6 +45,10 @@ inline const std::string GetOtlpDefaultHttpEndpoint() if (endpoint.empty()) { endpoint = opentelemetry::sdk::common::GetEnvironmentVariable(kOtlpEndpointEnv); + if (!endpoint.empty()) + { + endpoint += "/v1/traces"; + } } return endpoint.size() ? endpoint : kOtlpEndpointDefault; } @@ -216,6 +221,52 @@ inline OtlpHeaders GetOtlpDefaultHeaders() return result; } +inline const std::string GetOtlpDefaultHttpLogEndpoint() +{ + constexpr char kOtlpLogsEndpointEnv[] = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"; + constexpr char kOtlpEndpointEnv[] = "OTEL_EXPORTER_OTLP_ENDPOINT"; + constexpr char kOtlpEndpointDefault[] = "http://localhost:4318/v1/logs"; + + auto endpoint = opentelemetry::sdk::common::GetEnvironmentVariable(kOtlpLogsEndpointEnv); + if (endpoint.empty()) + { + endpoint = opentelemetry::sdk::common::GetEnvironmentVariable(kOtlpEndpointEnv); + if (!endpoint.empty()) + { + endpoint += "/v1/logs"; + } + } + return endpoint.size() ? endpoint : kOtlpEndpointDefault; +} + +inline const std::chrono::system_clock::duration GetOtlpDefaultLogTimeout() +{ + constexpr char kOtlpLogsTimeoutEnv[] = "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT"; + constexpr char kOtlpTimeoutEnv[] = "OTEL_EXPORTER_OTLP_TIMEOUT"; + + auto timeout = opentelemetry::sdk::common::GetEnvironmentVariable(kOtlpLogsTimeoutEnv); + if (timeout.empty()) + { + timeout = opentelemetry::sdk::common::GetEnvironmentVariable(kOtlpTimeoutEnv); + } + return GetOtlpTimeoutFromString(timeout.c_str()); +} + +inline OtlpHeaders GetOtlpDefaultLogHeaders() +{ + constexpr char kOtlpLogsHeadersEnv[] = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"; + constexpr char kOtlpHeadersEnv[] = "OTEL_EXPORTER_OTLP_HEADERS"; + + OtlpHeaders result; + std::unordered_set log_remove_cache; + DumpOtlpHeaders(result, kOtlpHeadersEnv, log_remove_cache); + + log_remove_cache.clear(); + DumpOtlpHeaders(result, kOtlpLogsHeadersEnv, log_remove_cache); + + return result; +} + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h new file mode 100644 index 0000000000..ff2a943554 --- /dev/null +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -0,0 +1,130 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + +#include "google/protobuf/message.h" + +#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" + +#include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/sdk/common/exporter_utils.h" + +#include "opentelemetry/exporters/otlp/otlp_environment.h" + +#include +#include +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ +// The default URL path to post metric data. +constexpr char kDefaultMetricsPath[] = "/v1/metrics"; +// The HTTP header "Content-Type" +constexpr char kHttpJsonContentType[] = "application/json"; +constexpr char kHttpBinaryContentType[] = "application/x-protobuf"; + +enum class JsonBytesMappingKind +{ + kHexId, + kHex, + kBase64, +}; + +enum class HttpRequestContentType +{ + kJson, + kBinary, +}; + +/** + * Struct to hold OTLP HTTP client options. + */ +struct OtlpHttpClientOptions +{ + std::string url; + + // By default, post json data + HttpRequestContentType content_type = HttpRequestContentType::kJson; + + // If convert bytes into hex. By default, we will convert all bytes but id into base64 + // This option is ignored if content_type is not kJson + JsonBytesMappingKind json_bytes_mapping = JsonBytesMappingKind::kHexId; + + // If using the json name of protobuf field to set the key of json. By default, we will use the + // field name just like proto files. + bool use_json_name = false; + + // Whether to print the status of the HTTP client in the console + bool console_debug = false; + + // TODO: Enable/disable to verify SSL certificate + std::chrono::system_clock::duration timeout = GetOtlpDefaultTimeout(); + + // Additional HTTP headers + OtlpHeaders http_headers = GetOtlpDefaultHeaders(); + + inline OtlpHttpClientOptions(nostd::string_view input_url, + HttpRequestContentType input_content_type, + JsonBytesMappingKind input_json_bytes_mapping, + bool input_use_json_name, + bool input_console_debug, + std::chrono::system_clock::duration input_timeout, + const OtlpHeaders &input_http_headers) + : url(input_url), + content_type(input_content_type), + json_bytes_mapping(input_json_bytes_mapping), + use_json_name(input_use_json_name), + console_debug(input_console_debug), + timeout(input_timeout), + http_headers(input_http_headers) + {} +}; + +/** + * The OTLP HTTP client exports span data in OpenTelemetry Protocol (OTLP) format. + */ +class OtlpHttpClient +{ +public: + /** + * Create an OtlpHttpClient using the given options. + */ + explicit OtlpHttpClient(OtlpHttpClientOptions &&options); + + /** + * Export + * @param message message to export, it should be ExportTraceServiceRequest, + * ExportMetricsServiceRequest or ExportLogsServiceRequest + */ + sdk::common::ExportResult Export(const google::protobuf::Message &message) noexcept; + + /** + * Shut down the HTTP client. + * @param timeout an optional timeout, the default timeout of 0 means that no + * timeout is applied. + * @return return the status of this operation + */ + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept; + +private: + // Stores if this HTTP client had its Shutdown() method called + bool is_shutdown_ = false; + + // The configuration options associated with this HTTP client. + const OtlpHttpClientOptions options_; + + // Object that stores the HTTP sessions that have been created + std::shared_ptr http_client_; + // Cached parsed URI + std::string http_uri_; +}; +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index c51c1cf266..852745ac39 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -3,43 +3,22 @@ #pragma once -#include -#include -#include -#include - // We need include exporter.h first, which will include Windows.h with NOMINMAX on Windows #include "opentelemetry/sdk/trace/exporter.h" -#include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/exporters/otlp/otlp_http_client.h" #include "opentelemetry/exporters/otlp/otlp_environment.h" +#include +#include +#include + OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter { namespace otlp { -// The default URL path to post metric data. -constexpr char kDefaultMetricsPath[] = "/v1/metrics"; -// The default URL path to post metric data. -constexpr char kDefaultLogPath[] = "/v1/logs"; -// The HTTP header "Content-Type" -constexpr char kHttpJsonContentType[] = "application/json"; -constexpr char kHttpBinaryContentType[] = "application/x-protobuf"; - -enum class JsonBytesMappingKind -{ - kHexId, - kHex, - kBase64, -}; - -enum class HttpRequestContentType -{ - kJson, - kBinary, -}; /** * Struct to hold OTLP exporter options. @@ -93,14 +72,15 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter * Create a span recordable. * @return a newly initialized Recordable object */ - std::unique_ptr MakeRecordable() noexcept override; + std::unique_ptr MakeRecordable() noexcept override; /** * Export * @param spans a span of unique pointers to span recordables */ - sdk::common::ExportResult Export( - const nostd::span> &spans) noexcept override; + opentelemetry::sdk::common::ExportResult Export( + const nostd::span> &spans) noexcept + override; /** * Shut down the exporter. @@ -111,9 +91,6 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; private: - // Stores if this exporter had its Shutdown() method called - bool is_shutdown_ = false; - // For testing friend class OtlpHttpExporterTestPeer; @@ -121,9 +98,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const OtlpHttpExporterOptions options_; // Object that stores the HTTP sessions that have been created - std::shared_ptr http_client_; - // Cached parsed URI - std::string http_uri_; + OtlpHttpClient http_client_; }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h new file mode 100644 index 0000000000..738a60d8f6 --- /dev/null +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h @@ -0,0 +1,105 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifdef ENABLE_LOGS_PREVIEW + +# include "opentelemetry/sdk/logs/exporter.h" + +# include "opentelemetry/exporters/otlp/otlp_http_client.h" + +# include "opentelemetry/exporters/otlp/otlp_environment.h" + +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ + +/** + * Struct to hold OTLP exporter options. + */ +struct OtlpHttpLogExporterOptions +{ + // The endpoint to export to. By default + // @see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md + // @see https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver + std::string url = GetOtlpDefaultHttpLogEndpoint(); + + // By default, post json data + HttpRequestContentType content_type = HttpRequestContentType::kJson; + + // If convert bytes into hex. By default, we will convert all bytes but id into base64 + // This option is ignored if content_type is not kJson + JsonBytesMappingKind json_bytes_mapping = JsonBytesMappingKind::kHexId; + + // If using the json name of protobuf field to set the key of json. By default, we will use the + // field name just like proto files. + bool use_json_name = false; + + // Whether to print the status of the exporter in the console + bool console_debug = false; + + // TODO: Enable/disable to verify SSL certificate + std::chrono::system_clock::duration timeout = GetOtlpDefaultLogTimeout(); + + // Additional HTTP headers + OtlpHeaders http_headers = GetOtlpDefaultLogHeaders(); +}; + +/** + * The OTLP exporter exports log data in OpenTelemetry Protocol (OTLP) format. + */ +class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter +{ +public: + /** + * Create an OtlpHttpLogExporter with default exporter options. + */ + OtlpHttpLogExporter(); + + /** + * Create an OtlpHttpLogExporter with user specified options. + * @param options An object containing the user's configuration options. + */ + OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &options); + + /** + * Creates a recordable that stores the data in a JSON object + */ + std::unique_ptr MakeRecordable() noexcept override; + + /** + * Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a + * timeout specified from the options passed from the constructor. + * @param records A list of log records to send to Elasticsearch. + */ + opentelemetry::sdk::common::ExportResult Export( + const nostd::span> &records) noexcept + override; + + /** + * Shutdown this exporter. + * @param timeout The maximum time to wait for the shutdown method to return + */ + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + +private: + // For testing + friend class OtlpHttpLogExporterTestPeer; + + // Configuration options for the exporter + const OtlpHttpLogExporterOptions options_; + + // Object that stores the HTTP sessions that have been created + OtlpHttpClient http_client_; +}; +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_log_recordable.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_log_recordable.h new file mode 100644 index 0000000000..a4a78fd6a9 --- /dev/null +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_log_recordable.h @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifdef ENABLE_LOGS_PREVIEW + +# include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + +# include "opentelemetry/proto/logs/v1/logs.pb.h" +# include "opentelemetry/proto/resource/v1/resource.pb.h" + +# include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" + +# include "opentelemetry/sdk/logs/recordable.h" +# include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ + +/** + * An OTLP Recordable implemenation + */ +class OtlpLogRecordable final : public opentelemetry::sdk::logs::Recordable +{ +public: + virtual ~OtlpLogRecordable() = default; + + proto::logs::v1::LogRecord &log_record() noexcept { return log_record_; } + const proto::logs::v1::LogRecord &log_record() const noexcept { return log_record_; } + + /** Dynamically converts the resource of this log into a proto. */ + proto::resource::v1::Resource ProtoResource() const noexcept; + + /** + * Set the timestamp for this log. + * @param timestamp the timestamp to set + */ + virtual void SetTimestamp(opentelemetry::common::SystemTimestamp timestamp) noexcept override; + + /** + * Set the severity for this log. + * @param severity the severity of the event + */ + virtual void SetSeverity(opentelemetry::logs::Severity severity) noexcept override; + + /** + * Set name for this log + * @param name the name to set + */ + virtual void SetName(nostd::string_view name) noexcept override; + + /** + * Set body field for this log. + * @param message the body to set + */ + virtual void SetBody(nostd::string_view message) noexcept override; + + /** + * Set a single resource of a log record. + * @param key the name of the resource to set + * @param value the resource value to set + */ + virtual void SetResource(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept override; + + /** + * Set an attribute of a log. + * @param key the name of the attribute + * @param value the attribute value + */ + virtual void SetAttribute(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept override; + + /** + * Set the trace id for this log. + * @param trace_id the trace id to set + */ + virtual void SetTraceId(opentelemetry::trace::TraceId trace_id) noexcept override; + + /** + * Set the span id for this log. + * @param span_id the span id to set + */ + virtual void SetSpanId(opentelemetry::trace::SpanId span_id) noexcept override; + + /** + * Inject trace_flags for this log. + * @param trace_flags the trace flags to set + */ + virtual void SetTraceFlags(opentelemetry::trace::TraceFlags trace_flags) noexcept override; + +private: + proto::logs::v1::LogRecord log_record_; + proto::resource::v1::Resource private_resource_; + // TODO shared resource + // const opentelemetry::sdk::resource::Resource *resource_ = nullptr; + // TODO InstrumentationLibrary + // const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary + // *instrumentation_library_ = nullptr; +}; +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable.h index dbb599880e..a29d063a92 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable.h @@ -4,6 +4,7 @@ #pragma once #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + #include "opentelemetry/proto/resource/v1/resource.pb.h" #include "opentelemetry/proto/trace/v1/trace.pb.h" @@ -17,9 +18,10 @@ namespace exporter { namespace otlp { -class OtlpRecordable final : public sdk::trace::Recordable +class OtlpRecordable final : public opentelemetry::sdk::trace::Recordable { public: + proto::trace::v1::Span &span() noexcept { return span_; } const proto::trace::v1::Span &span() const noexcept { return span_; } /** Dynamically converts the resource of this span into a proto. */ diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable_utils.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable_utils.h new file mode 100644 index 0000000000..0c09bc8717 --- /dev/null +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_recordable_utils.h @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" +#include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h" +#include "opentelemetry/proto/resource/v1/resource.pb.h" + +#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" + +#include "opentelemetry/common/attribute_value.h" +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/version.h" + +#include "opentelemetry/sdk/common/attribute_utils.h" +#include "opentelemetry/sdk/resource/resource.h" +#include "opentelemetry/sdk/trace/recordable.h" + +#ifdef ENABLE_LOGS_PREVIEW +# include "opentelemetry/sdk/logs/recordable.h" +#endif + +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ +/** + * The OtlpRecordableUtils contains utility functions for OTLP recordable + */ +class OtlpRecordableUtils +{ +public: + static void PopulateAttribute(opentelemetry::proto::common::v1::KeyValue *attribute, + nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept; + + static void PopulateAttribute( + opentelemetry::proto::common::v1::KeyValue *attribute, + nostd::string_view key, + const opentelemetry::sdk::common::OwnedAttributeValue &value) noexcept; + + static void PopulateAttribute(opentelemetry::proto::resource::v1::Resource *proto, + const opentelemetry::sdk::resource::Resource &resource) noexcept; + + static void PopulateRequest( + const nostd::span> &spans, + proto::collector::trace::v1::ExportTraceServiceRequest *request) noexcept; + +#ifdef ENABLE_LOGS_PREVIEW + static void PopulateRequest( + const nostd::span> &logs, + proto::collector::logs::v1::ExportLogsServiceRequest *request) noexcept; +#endif +}; +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h index d3947999cc..ae103b0173 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h @@ -20,6 +20,8 @@ # pragma warning(disable : 4244) # pragma warning(disable : 4251) # pragma warning(disable : 4267) +# pragma warning(disable : 4668) +# pragma warning(disable : 4946) #endif #if defined(__GNUC__) && !defined(__clang__) && !defined(__apple_build_version__) diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index c4121d7dd8..90d8e4ee9f 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -3,6 +3,7 @@ #include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h" #include "opentelemetry/exporters/otlp/otlp_recordable.h" +#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h" #include "opentelemetry/ext/http/common/url_parser.h" #include "opentelemetry/sdk_config.h" @@ -17,37 +18,6 @@ namespace otlp { // ----------------------------- Helper functions ------------------------------ - -/** - * Add span protobufs contained in recordables to request. - * @param spans the spans to export - * @param request the current request - */ -void PopulateRequest(const nostd::span> &spans, - proto::collector::trace::v1::ExportTraceServiceRequest *request) -{ - auto resource_span = request->add_resource_spans(); - auto instrumentation_lib = resource_span->add_instrumentation_library_spans(); - bool first_pass = true; - - for (auto &recordable : spans) - { - auto rec = std::unique_ptr(static_cast(recordable.release())); - *instrumentation_lib->add_spans() = std::move(rec->span()); - *instrumentation_lib->mutable_instrumentation_library() = rec->GetProtoInstrumentationLibrary(); - - if (first_pass) - { - *instrumentation_lib->mutable_schema_url() = rec->GetInstrumentationLibrarySchemaURL(); - - *resource_span->mutable_resource() = rec->ProtoResource(); - *resource_span->mutable_schema_url() = rec->GetResourceSchemaURL(); - - first_pass = false; - } - } -} - static std::string get_file_contents(const char *fpath) { std::ifstream finstream(fpath); @@ -125,8 +95,7 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( const nostd::span> &spans) noexcept { proto::collector::trace::v1::ExportTraceServiceRequest request; - - PopulateRequest(spans, &request); + OtlpRecordableUtils::PopulateRequest(spans, &request); grpc::ClientContext context; proto::collector::trace::v1::ExportTraceServiceResponse response; diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc new file mode 100644 index 0000000000..e20236877f --- /dev/null +++ b/exporters/otlp/src/otlp_http_client.cc @@ -0,0 +1,673 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/exporters/otlp/otlp_http_client.h" + +#if defined(HAVE_GSL) +# include +#else +# include +#endif + +#include "opentelemetry/ext/http/client/http_client_factory.h" +#include "opentelemetry/ext/http/common/url_parser.h" + +#include "nlohmann/json.hpp" + +#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + +#include "google/protobuf/message.h" +#include "google/protobuf/reflection.h" +#include "google/protobuf/stubs/common.h" + +#if defined(GOOGLE_PROTOBUF_VERSION) && GOOGLE_PROTOBUF_VERSION >= 3007000 +# include "google/protobuf/stubs/strutil.h" +#else +# include "google/protobuf/stubs/port.h" +namespace google +{ +namespace protobuf +{ +LIBPROTOBUF_EXPORT int Base64Escape(const unsigned char *src, int slen, char *dest, int szdest); +} // namespace protobuf +} // namespace google +#endif + +#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" + +#include "opentelemetry/sdk/common/global_log_handler.h" +#include "opentelemetry/sdk_config.h" + +#include +#include +#include +#include +#include +#include + +#ifdef GetMessage +# undef GetMessage +#endif + +namespace nostd = opentelemetry::nostd; +namespace http_client = opentelemetry::ext::http::client; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ + +namespace +{ + +/** + * This class handles the response message from the Elasticsearch request + */ +class ResponseHandler : public http_client::EventHandler +{ +public: + /** + * Creates a response handler, that by default doesn't display to console + */ + ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {} + + /** + * Automatically called when the response is received, store the body into a string and notify any + * threads blocked on this result + */ + void OnResponse(http_client::Response &response) noexcept override + { + // Lock the private members so they can't be read while being modified + { + std::unique_lock lk(mutex_); + + // Store the body of the request + body_ = std::string(response.GetBody().begin(), response.GetBody().end()); + + if (console_debug_) + { + std::stringstream ss; + ss << "[OTLP HTTP Client] Status:" << response.GetStatusCode() << "Header:"; + response.ForEachHeader([&ss](opentelemetry::nostd::string_view header_name, + opentelemetry::nostd::string_view header_value) { + ss << "\t" << header_name.data() << " : " << header_value.data() << ","; + return true; + }); + ss << "Body:" << body_; + OTEL_INTERNAL_LOG_DEBUG(ss.str()); + } + + // Set the response_received_ flag to true and notify any threads waiting on this result + response_received_ = true; + } + cv_.notify_all(); + } + + /**resource + * A method the user calls to block their thread until the response is received. The longest + * duration is the timeout of the request, set by SetTimeoutMs() + */ + bool waitForResponse() + { + std::unique_lock lk(mutex_); + cv_.wait(lk); + return response_received_; + } + + /** + * Returns the body of the response + */ + std::string GetResponseBody() + { + // Lock so that body_ can't be written to while returning it + std::unique_lock lk(mutex_); + return body_; + } + + // Callback method when an http event occurs + void OnEvent(http_client::SessionState state, + opentelemetry::nostd::string_view reason) noexcept override + { + // If any failure event occurs, release the condition variable to unblock main thread + switch (state) + { + case http_client::SessionState::CreateFailed: + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: session create failed"); + cv_.notify_all(); + break; + + case http_client::SessionState::Created: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: session created"); + } + break; + + case http_client::SessionState::Destroyed: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: session destroyed"); + } + break; + + case http_client::SessionState::Connecting: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: connecting to peer"); + } + break; + + case http_client::SessionState::ConnectFailed: + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: connection failed"); + cv_.notify_all(); + break; + + case http_client::SessionState::Connected: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: connected"); + } + break; + + case http_client::SessionState::Sending: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: sending request"); + } + break; + + case http_client::SessionState::SendFailed: + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request send failed"); + cv_.notify_all(); + break; + + case http_client::SessionState::Response: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: response received"); + } + break; + + case http_client::SessionState::SSLHandshakeFailed: + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: SSL handshake failed"); + cv_.notify_all(); + break; + + case http_client::SessionState::TimedOut: + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request time out"); + cv_.notify_all(); + break; + + case http_client::SessionState::NetworkError: + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: network error"); + cv_.notify_all(); + break; + + case http_client::SessionState::ReadError: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: error reading response"); + } + break; + + case http_client::SessionState::WriteError: + if (console_debug_) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] DEBUG:Session state: error writing request"); + } + break; + + case http_client::SessionState::Cancelled: + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: (manually) cancelled\n"); + cv_.notify_all(); + break; + + default: + break; + } + } + +private: + // Define a condition variable and mutex + std::condition_variable cv_; + std::mutex mutex_; + + // Whether the response from Elasticsearch has been received + bool response_received_ = false; + + // A string to store the response body + std::string body_ = ""; + + // Whether to print the results from the callback + bool console_debug_ = false; +}; + +static inline char HexEncode(unsigned char byte) +{ +#if defined(HAVE_GSL) + Expects(byte <= 16); +#else + assert(byte <= 16); +#endif + if (byte >= 10) + { + return byte - 10 + 'a'; + } + else + { + return byte + '0'; + } +} + +static std::string HexEncode(const std::string &bytes) +{ + std::string ret; + ret.reserve(bytes.size() * 2); + for (std::string::size_type i = 0; i < bytes.size(); ++i) + { + unsigned char byte = static_cast(bytes[i]); + ret.push_back(HexEncode(byte >> 4)); + ret.push_back(HexEncode(byte & 0x0f)); + } + return ret; +} + +static std::string BytesMapping(const std::string &bytes, + const google::protobuf::FieldDescriptor *field_descriptor, + JsonBytesMappingKind kind) +{ + switch (kind) + { + case JsonBytesMappingKind::kHexId: { + if (field_descriptor->lowercase_name() == "trace_id" || + field_descriptor->lowercase_name() == "span_id" || + field_descriptor->lowercase_name() == "parent_span_id") + { + return HexEncode(bytes); + } + else + { + std::string base64_value; + google::protobuf::Base64Escape(bytes, &base64_value); + return base64_value; + } + } + case JsonBytesMappingKind::kBase64: { + // Base64 is the default bytes mapping of protobuf + std::string base64_value; + google::protobuf::Base64Escape(bytes, &base64_value); + return base64_value; + } + case JsonBytesMappingKind::kHex: + return HexEncode(bytes); + default: + return bytes; + } +} + +static void ConvertGenericFieldToJson(nlohmann::json &value, + const google::protobuf::Message &message, + const google::protobuf::FieldDescriptor *field_descriptor, + const OtlpHttpClientOptions &options); + +static void ConvertListFieldToJson(nlohmann::json &value, + const google::protobuf::Message &message, + const google::protobuf::FieldDescriptor *field_descriptor, + const OtlpHttpClientOptions &options); + +static void ConvertGenericMessageToJson(nlohmann::json &value, + const google::protobuf::Message &message, + const OtlpHttpClientOptions &options) +{ + std::vector fields_with_data; + message.GetReflection()->ListFields(message, &fields_with_data); + for (std::size_t i = 0; i < fields_with_data.size(); ++i) + { + const google::protobuf::FieldDescriptor *field_descriptor = fields_with_data[i]; + nlohmann::json &child_value = options.use_json_name ? value[field_descriptor->json_name()] + : value[field_descriptor->name()]; + if (field_descriptor->is_repeated()) + { + ConvertListFieldToJson(child_value, message, field_descriptor, options); + } + else + { + ConvertGenericFieldToJson(child_value, message, field_descriptor, options); + } + } +} + +void ConvertGenericFieldToJson(nlohmann::json &value, + const google::protobuf::Message &message, + const google::protobuf::FieldDescriptor *field_descriptor, + const OtlpHttpClientOptions &options) +{ + switch (field_descriptor->cpp_type()) + { + case google::protobuf::FieldDescriptor::CPPTYPE_INT32: { + value = message.GetReflection()->GetInt32(message, field_descriptor); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_INT64: { + // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as + // decimal strings, and either numbers or strings are accepted when decoding. + value = std::to_string(message.GetReflection()->GetInt64(message, field_descriptor)); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: { + value = message.GetReflection()->GetUInt32(message, field_descriptor); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: { + // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as + // decimal strings, and either numbers or strings are accepted when decoding. + value = std::to_string(message.GetReflection()->GetUInt64(message, field_descriptor)); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_STRING: { + std::string empty; + if (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_BYTES) + { + value = BytesMapping( + message.GetReflection()->GetStringReference(message, field_descriptor, &empty), + field_descriptor, options.json_bytes_mapping); + } + else + { + value = message.GetReflection()->GetStringReference(message, field_descriptor, &empty); + } + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { + ConvertGenericMessageToJson( + value, message.GetReflection()->GetMessage(message, field_descriptor, nullptr), options); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: { + value = message.GetReflection()->GetDouble(message, field_descriptor); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: { + value = message.GetReflection()->GetFloat(message, field_descriptor); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: { + value = message.GetReflection()->GetBool(message, field_descriptor); + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: { + value = message.GetReflection()->GetEnumValue(message, field_descriptor); + break; + } + default: { + break; + } + } +} + +void ConvertListFieldToJson(nlohmann::json &value, + const google::protobuf::Message &message, + const google::protobuf::FieldDescriptor *field_descriptor, + const OtlpHttpClientOptions &options) +{ + auto field_size = message.GetReflection()->FieldSize(message, field_descriptor); + + switch (field_descriptor->cpp_type()) + { + case google::protobuf::FieldDescriptor::CPPTYPE_INT32: { + for (int i = 0; i < field_size; ++i) + { + value.push_back(message.GetReflection()->GetRepeatedInt32(message, field_descriptor, i)); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_INT64: { + for (int i = 0; i < field_size; ++i) + { + // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded + // as decimal strings, and either numbers or strings are accepted when decoding. + value.push_back(std::to_string( + message.GetReflection()->GetRepeatedInt64(message, field_descriptor, i))); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: { + for (int i = 0; i < field_size; ++i) + { + value.push_back(message.GetReflection()->GetRepeatedUInt32(message, field_descriptor, i)); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: { + for (int i = 0; i < field_size; ++i) + { + // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded + // as decimal strings, and either numbers or strings are accepted when decoding. + value.push_back(std::to_string( + message.GetReflection()->GetRepeatedUInt64(message, field_descriptor, i))); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_STRING: { + std::string empty; + if (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_BYTES) + { + for (int i = 0; i < field_size; ++i) + { + value.push_back(BytesMapping(message.GetReflection()->GetRepeatedStringReference( + message, field_descriptor, i, &empty), + field_descriptor, options.json_bytes_mapping)); + } + } + else + { + for (int i = 0; i < field_size; ++i) + { + value.push_back(message.GetReflection()->GetRepeatedStringReference( + message, field_descriptor, i, &empty)); + } + } + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { + for (int i = 0; i < field_size; ++i) + { + nlohmann::json sub_value; + ConvertGenericMessageToJson( + sub_value, message.GetReflection()->GetRepeatedMessage(message, field_descriptor, i), + options); + value.push_back(std::move(sub_value)); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: { + for (int i = 0; i < field_size; ++i) + { + value.push_back(message.GetReflection()->GetRepeatedDouble(message, field_descriptor, i)); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: { + for (int i = 0; i < field_size; ++i) + { + value.push_back(message.GetReflection()->GetRepeatedFloat(message, field_descriptor, i)); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: { + for (int i = 0; i < field_size; ++i) + { + value.push_back(message.GetReflection()->GetRepeatedBool(message, field_descriptor, i)); + } + + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: { + for (int i = 0; i < field_size; ++i) + { + value.push_back( + message.GetReflection()->GetRepeatedEnumValue(message, field_descriptor, i)); + } + break; + } + default: { + break; + } + } +} + +} // namespace + +OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options) + : options_(options), http_client_(http_client::HttpClientFactory::Create()) +{} + +// ----------------------------- HTTP Client methods ------------------------------ +opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( + const google::protobuf::Message &message) noexcept +{ + // Return failure if this exporter has been shutdown + if (is_shutdown_) + { + const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown"; + if (options_.console_debug) + { + std::cerr << error_message << std::endl; + } + OTEL_INTERNAL_LOG_ERROR(error_message); + + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + // Parse uri and store it to cache + if (http_uri_.empty()) + { + auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url)); + if (!parse_url.success_) + { + std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url; + if (options_.console_debug) + { + std::cerr << error_message << std::endl; + } + OTEL_INTERNAL_LOG_ERROR(error_message.c_str()); + + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + if (!parse_url.path_.empty() && parse_url.path_[0] == '/') + { + http_uri_ = parse_url.path_.substr(1); + } + else + { + http_uri_ = parse_url.path_; + } + } + + http_client::Body body_vec; + std::string content_type; + if (options_.content_type == HttpRequestContentType::kBinary) + { + body_vec.resize(message.ByteSizeLong()); + if (message.SerializeWithCachedSizesToArray( + reinterpret_cast(&body_vec[0]))) + { + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG( + "[OTLP HTTP Client] Request body(Binary): " << message.Utf8DebugString()); + } + } + else + { + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Serialize body failed(Binary):" + << message.InitializationErrorString()); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + content_type = kHttpBinaryContentType; + } + else + { + nlohmann::json json_request; + + // Convert from proto into json object + ConvertGenericMessageToJson(json_request, message, options_); + + std::string post_body_json = + json_request.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace); + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Request body(Json)" << post_body_json); + } + body_vec.assign(post_body_json.begin(), post_body_json.end()); + content_type = kHttpJsonContentType; + } + + // Send the request + auto session = http_client_->CreateSession(options_.url); + auto request = session->CreateRequest(); + + for (auto &header : options_.http_headers) + { + request->AddHeader(header.first, header.second); + } + request->SetUri(http_uri_); + request->SetTimeoutMs(std::chrono::duration_cast(options_.timeout)); + request->SetMethod(http_client::Method::Post); + request->SetBody(body_vec); + request->ReplaceHeader("Content-Type", content_type); + + // Send the request + std::unique_ptr handler(new ResponseHandler(options_.console_debug)); + session->SendRequest(*handler); + + // Wait for the response to be received + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG( + "[OTLP HTTP Client] DEBUG: Waiting for response from " + << options_.url << " (timeout = " + << std::chrono::duration_cast(options_.timeout).count() + << " milliseconds)"); + } + bool write_successful = handler->waitForResponse(); + + // End the session + session->FinishSession(); + + // If an error occurred with the HTTP request + if (!write_successful) + { + // TODO: retry logic + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + return opentelemetry::sdk::common::ExportResult::kSuccess; +} + +bool OtlpHttpClient::Shutdown(std::chrono::microseconds) noexcept +{ + is_shutdown_ = true; + + // Shutdown the session manager + http_client_->CancelAllSessions(); + http_client_->FinishAllSessions(); + + return true; +} + +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index d27c9fac3b..8b017f25f1 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -3,36 +3,15 @@ #include "opentelemetry/exporters/otlp/otlp_http_exporter.h" #include "opentelemetry/exporters/otlp/otlp_recordable.h" -#include "opentelemetry/ext/http/client/http_client_factory.h" -#include "opentelemetry/ext/http/common/url_parser.h" - -#include "nlohmann/json.hpp" +#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h" #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" -#include "google/protobuf/message.h" -#include "google/protobuf/reflection.h" -#include "google/protobuf/stubs/stl_util.h" #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" -#include "opentelemetry/sdk/common/global_log_handler.h" -#include "opentelemetry/sdk_config.h" - -#include -#include -#include -#include -#include -#include - -#ifdef GetMessage -# undef GetMessage -#endif - -namespace nostd = opentelemetry::nostd; -namespace http_client = opentelemetry::ext::http::client; +namespace nostd = opentelemetry::nostd; OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -40,649 +19,38 @@ namespace exporter namespace otlp { -namespace -{ - -/** - * This class handles the response message from the Elasticsearch request - */ -class ResponseHandler : public http_client::EventHandler -{ -public: - /** - * Creates a response handler, that by default doesn't display to console - */ - ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {} - - /** - * Automatically called when the response is received, store the body into a string and notify any - * threads blocked on this result - */ - void OnResponse(http_client::Response &response) noexcept override - { - // Lock the private members so they can't be read while being modified - { - std::unique_lock lk(mutex_); - - // Store the body of the request - body_ = std::string(response.GetBody().begin(), response.GetBody().end()); - - if (console_debug_) - { - std::stringstream ss; - ss << "[OTLP HTTP Exporter] Status:" << response.GetStatusCode() << "Header:"; - response.ForEachHeader([&ss](opentelemetry::nostd::string_view header_name, - opentelemetry::nostd::string_view header_value) { - ss << "\t" << header_name.data() << " : " << header_value.data() << ","; - return true; - }); - ss << "Body:" << body_; - OTEL_INTERNAL_LOG_DEBUG(ss.str()); - } - - // Set the response_received_ flag to true and notify any threads waiting on this result - response_received_ = true; - } - cv_.notify_all(); - } - - /**resource - * A method the user calls to block their thread until the response is received. The longest - * duration is the timeout of the request, set by SetTimeoutMs() - */ - bool waitForResponse() - { - std::unique_lock lk(mutex_); - cv_.wait(lk); - return response_received_; - } - - /** - * Returns the body of the response - */ - std::string GetResponseBody() - { - // Lock so that body_ can't be written to while returning it - std::unique_lock lk(mutex_); - return body_; - } - - // Callback method when an http event occurs - void OnEvent(http_client::SessionState state, - opentelemetry::nostd::string_view reason) noexcept override - { - // If any failure event occurs, release the condition variable to unblock main thread - switch (state) - { - case http_client::SessionState::CreateFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Exporter] Session state: session create failed"); - cv_.notify_all(); - break; - - case http_client::SessionState::Created: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Session state: session created"); - } - break; - - case http_client::SessionState::Destroyed: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Session state: session destroyed"); - } - break; - - case http_client::SessionState::Connecting: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Session state: connecting to peer"); - } - break; - - case http_client::SessionState::ConnectFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Exporter] Session state: connection failed"); - cv_.notify_all(); - break; - - case http_client::SessionState::Connected: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Session state: connected"); - } - break; - - case http_client::SessionState::Sending: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Session state: sending request"); - } - break; - - case http_client::SessionState::SendFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Exporter] Session state: request send failed"); - cv_.notify_all(); - break; - - case http_client::SessionState::Response: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Session state: response received"); - } - break; - - case http_client::SessionState::SSLHandshakeFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Exporter] Session state: SSL handshake failed"); - cv_.notify_all(); - break; - - case http_client::SessionState::TimedOut: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Exporter] Session state: request time out"); - cv_.notify_all(); - break; - - case http_client::SessionState::NetworkError: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Exporter] Session state: network error"); - cv_.notify_all(); - break; - - case http_client::SessionState::ReadError: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Session state: error reading response"); - } - break; - - case http_client::SessionState::WriteError: - if (console_debug_) - { - OTEL_INTERNAL_LOG_DEBUG( - "[OTLP HTTP Exporter] DEBUG:Session state: error writing request"); - } - break; - - case http_client::SessionState::Cancelled: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Exporter] Session state: (manually) cancelled\n"); - cv_.notify_all(); - break; - - default: - break; - } - } - -private: - // Define a condition variable and mutex - std::condition_variable cv_; - std::mutex mutex_; - - // Whether the response from Elasticsearch has been received - bool response_received_ = false; - - // A string to store the response body - std::string body_ = ""; - - // Whether to print the results from the callback - bool console_debug_ = false; -}; - -static inline char HexEncode(unsigned char byte) -{ - if (byte >= 10) - { - return byte - 10 + 'a'; - } - else - { - return byte + '0'; - } -} - -static std::string HexEncode(const std::string &bytes) -{ - std::string ret; - ret.reserve(bytes.size() * 2); - for (std::string::size_type i = 0; i < bytes.size(); ++i) - { - unsigned char byte = static_cast(bytes[i]); - ret.push_back(HexEncode(byte >> 4)); - ret.push_back(HexEncode(byte & 0x0f)); - } - return ret; -} - -static std::string BytesMapping(const std::string &bytes, - const google::protobuf::FieldDescriptor *field_descriptor, - JsonBytesMappingKind kind) -{ - switch (kind) - { - case JsonBytesMappingKind::kHexId: { - if (field_descriptor->lowercase_name() == "trace_id" || - field_descriptor->lowercase_name() == "span_id" || - field_descriptor->lowercase_name() == "parent_span_id") - { - return HexEncode(bytes); - } - else - { - std::string base64_value; - google::protobuf::Base64Escape(bytes, &base64_value); - return base64_value; - } - } - case JsonBytesMappingKind::kBase64: { - // Base64 is the default bytes mapping of protobuf - std::string base64_value; - google::protobuf::Base64Escape(bytes, &base64_value); - return base64_value; - } - case JsonBytesMappingKind::kHex: - return HexEncode(bytes); - default: - return bytes; - } -} - -static void ConvertGenericFieldToJson(nlohmann::json &value, - const google::protobuf::Message &message, - const google::protobuf::FieldDescriptor *field_descriptor, - const OtlpHttpExporterOptions &options); - -static void ConvertListFieldToJson(nlohmann::json &value, - const google::protobuf::Message &message, - const google::protobuf::FieldDescriptor *field_descriptor, - const OtlpHttpExporterOptions &options); - -static void ConvertGenericMessageToJson(nlohmann::json &value, - const google::protobuf::Message &message, - const OtlpHttpExporterOptions &options) -{ - std::vector fields_with_data; - message.GetReflection()->ListFields(message, &fields_with_data); - for (std::size_t i = 0; i < fields_with_data.size(); ++i) - { - const google::protobuf::FieldDescriptor *field_descriptor = fields_with_data[i]; - nlohmann::json &child_value = options.use_json_name ? value[field_descriptor->json_name()] - : value[field_descriptor->name()]; - if (field_descriptor->is_repeated()) - { - ConvertListFieldToJson(child_value, message, field_descriptor, options); - } - else - { - ConvertGenericFieldToJson(child_value, message, field_descriptor, options); - } - } -} - -void ConvertGenericFieldToJson(nlohmann::json &value, - const google::protobuf::Message &message, - const google::protobuf::FieldDescriptor *field_descriptor, - const OtlpHttpExporterOptions &options) -{ - switch (field_descriptor->cpp_type()) - { - case google::protobuf::FieldDescriptor::CPPTYPE_INT32: { - value = message.GetReflection()->GetInt32(message, field_descriptor); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_INT64: { - // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as - // decimal strings, and either numbers or strings are accepted when decoding. - value = std::to_string(message.GetReflection()->GetInt64(message, field_descriptor)); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: { - value = message.GetReflection()->GetUInt32(message, field_descriptor); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: { - // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as - // decimal strings, and either numbers or strings are accepted when decoding. - value = std::to_string(message.GetReflection()->GetUInt64(message, field_descriptor)); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_STRING: { - std::string empty; - if (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_BYTES) - { - value = BytesMapping( - message.GetReflection()->GetStringReference(message, field_descriptor, &empty), - field_descriptor, options.json_bytes_mapping); - } - else - { - value = message.GetReflection()->GetStringReference(message, field_descriptor, &empty); - } - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { - ConvertGenericMessageToJson( - value, message.GetReflection()->GetMessage(message, field_descriptor, nullptr), options); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: { - value = message.GetReflection()->GetDouble(message, field_descriptor); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: { - value = message.GetReflection()->GetFloat(message, field_descriptor); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: { - value = message.GetReflection()->GetBool(message, field_descriptor); - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: { - value = message.GetReflection()->GetEnumValue(message, field_descriptor); - break; - } - default: { - break; - } - } -} - -void ConvertListFieldToJson(nlohmann::json &value, - const google::protobuf::Message &message, - const google::protobuf::FieldDescriptor *field_descriptor, - const OtlpHttpExporterOptions &options) -{ - auto field_size = message.GetReflection()->FieldSize(message, field_descriptor); - - switch (field_descriptor->cpp_type()) - { - case google::protobuf::FieldDescriptor::CPPTYPE_INT32: { - for (int i = 0; i < field_size; ++i) - { - value.push_back(message.GetReflection()->GetRepeatedInt32(message, field_descriptor, i)); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_INT64: { - for (int i = 0; i < field_size; ++i) - { - // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded - // as decimal strings, and either numbers or strings are accepted when decoding. - value.push_back(std::to_string( - message.GetReflection()->GetRepeatedInt64(message, field_descriptor, i))); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: { - for (int i = 0; i < field_size; ++i) - { - value.push_back(message.GetReflection()->GetRepeatedUInt32(message, field_descriptor, i)); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: { - for (int i = 0; i < field_size; ++i) - { - // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded - // as decimal strings, and either numbers or strings are accepted when decoding. - value.push_back(std::to_string( - message.GetReflection()->GetRepeatedUInt64(message, field_descriptor, i))); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_STRING: { - std::string empty; - if (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_BYTES) - { - for (int i = 0; i < field_size; ++i) - { - value.push_back(BytesMapping(message.GetReflection()->GetRepeatedStringReference( - message, field_descriptor, i, &empty), - field_descriptor, options.json_bytes_mapping)); - } - } - else - { - for (int i = 0; i < field_size; ++i) - { - value.push_back(message.GetReflection()->GetRepeatedStringReference( - message, field_descriptor, i, &empty)); - } - } - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { - for (int i = 0; i < field_size; ++i) - { - nlohmann::json sub_value; - ConvertGenericMessageToJson( - sub_value, message.GetReflection()->GetRepeatedMessage(message, field_descriptor, i), - options); - value.push_back(std::move(sub_value)); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: { - for (int i = 0; i < field_size; ++i) - { - value.push_back(message.GetReflection()->GetRepeatedDouble(message, field_descriptor, i)); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: { - for (int i = 0; i < field_size; ++i) - { - value.push_back(message.GetReflection()->GetRepeatedFloat(message, field_descriptor, i)); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: { - for (int i = 0; i < field_size; ++i) - { - value.push_back(message.GetReflection()->GetRepeatedBool(message, field_descriptor, i)); - } - - break; - } - case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: { - for (int i = 0; i < field_size; ++i) - { - value.push_back( - message.GetReflection()->GetRepeatedEnumValue(message, field_descriptor, i)); - } - break; - } - default: { - break; - } - } -} - -/** - * Add span protobufs contained in recordables to request. - * @param spans the spans to export - * @param request the current request - */ -static void PopulateRequest( - const nostd::span> &spans, - opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request) -{ - auto resource_span = request->add_resource_spans(); - auto instrumentation_lib = resource_span->add_instrumentation_library_spans(); - bool has_resource = false; - - for (auto &recordable : spans) - { - auto rec = std::unique_ptr( - static_cast(recordable.release())); - *instrumentation_lib->add_spans() = std::move(rec->span()); - - if (!has_resource) - { - *resource_span->mutable_resource() = rec->ProtoResource(); - has_resource = true; - } - } -} - -} // namespace - OtlpHttpExporter::OtlpHttpExporter() : OtlpHttpExporter(OtlpHttpExporterOptions()) {} OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) - : options_(options), http_client_(http_client::HttpClientFactory::Create()) + : options_(options), + http_client_(OtlpHttpClientOptions(options.url, + options.content_type, + options.json_bytes_mapping, + options.use_json_name, + options.console_debug, + options.timeout, + options.http_headers)) {} // ----------------------------- Exporter methods ------------------------------ -std::unique_ptr OtlpHttpExporter::MakeRecordable() noexcept +std::unique_ptr OtlpHttpExporter::MakeRecordable() noexcept { - return std::unique_ptr(new exporter::otlp::OtlpRecordable()); + return std::unique_ptr( + new exporter::otlp::OtlpRecordable()); } -sdk::common::ExportResult OtlpHttpExporter::Export( - const nostd::span> &spans) noexcept +opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( + const nostd::span> &spans) noexcept { - // Return failure if this exporter has been shutdown - if (is_shutdown_) - { - const char *error_message = "[OTLP HTTP Exporter] Export failed, exporter is shutdown"; - if (options_.console_debug) - { - std::cerr << error_message << std::endl; - } - OTEL_INTERNAL_LOG_ERROR(error_message); - - return sdk::common::ExportResult::kFailure; - } - - // Parse uri and store it to cache - if (http_uri_.empty()) - { - auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url)); - if (!parse_url.success_) - { - std::string error_message = - "[OTLP HTTP Exporter] Export failed, invalid url: " + options_.url; - if (options_.console_debug) - { - std::cerr << error_message << std::endl; - } - OTEL_INTERNAL_LOG_ERROR(error_message.c_str()); - - return sdk::common::ExportResult::kFailure; - } - - if (!parse_url.path_.empty() && parse_url.path_[0] == '/') - { - http_uri_ = parse_url.path_.substr(1); - } - else - { - http_uri_ = parse_url.path_; - } - } - proto::collector::trace::v1::ExportTraceServiceRequest service_request; - PopulateRequest(spans, &service_request); - - http_client::Body body_vec; - std::string content_type; - if (options_.content_type == HttpRequestContentType::kBinary) - { - body_vec.resize(service_request.ByteSizeLong()); - if (service_request.SerializeWithCachedSizesToArray( - reinterpret_cast(&body_vec[0]))) - { - if (options_.console_debug) - { - OTEL_INTERNAL_LOG_DEBUG( - "[OTLP HTTP Exporter] Request body(Binary): " << service_request.Utf8DebugString()); - } - } - else - { - if (options_.console_debug) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Serialize body failed(Binary):" - << service_request.InitializationErrorString()); - } - return sdk::common::ExportResult::kFailure; - } - content_type = kHttpBinaryContentType; - } - else - { - nlohmann::json json_request; - - // Convert from proto into json object - ConvertGenericMessageToJson(json_request, service_request, options_); - - std::string post_body_json = - json_request.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace); - if (options_.console_debug) - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Exporter] Request body(Json)" << post_body_json); - } - body_vec.assign(post_body_json.begin(), post_body_json.end()); - content_type = kHttpJsonContentType; - } - - // Send the request - auto session = http_client_->CreateSession(options_.url); - auto request = session->CreateRequest(); - - for (auto &header : options_.http_headers) - { - request->AddHeader(header.first, header.second); - } - request->SetUri(http_uri_); - request->SetTimeoutMs(std::chrono::duration_cast(options_.timeout)); - request->SetMethod(http_client::Method::Post); - request->SetBody(body_vec); - request->ReplaceHeader("Content-Type", content_type); - - // Send the request - std::unique_ptr handler(new ResponseHandler(options_.console_debug)); - session->SendRequest(*handler); - - // Wait for the response to be received - if (options_.console_debug) - { - OTEL_INTERNAL_LOG_DEBUG( - "[OTLP HTTP Exporter] DEBUG: Waiting for response from " - << options_.url << " (timeout = " - << std::chrono::duration_cast(options_.timeout).count() - << " milliseconds)"); - } - bool write_successful = handler->waitForResponse(); - - // End the session - session->FinishSession(); - - // If an error occurred with the HTTP request - if (!write_successful) - { - // TODO: retry logic - return sdk::common::ExportResult::kFailure; - } - - return sdk::common::ExportResult::kSuccess; + OtlpRecordableUtils::PopulateRequest(spans, &service_request); + return http_client_.Export(service_request); } -bool OtlpHttpExporter::Shutdown(std::chrono::microseconds) noexcept +bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept { - is_shutdown_ = true; - - // Shutdown the session manager - http_client_->CancelAllSessions(); - http_client_->FinishAllSessions(); - - return true; + return http_client_.Shutdown(timeout); } } // namespace otlp diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc new file mode 100644 index 0000000000..f4cc6a9f9c --- /dev/null +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifdef ENABLE_LOGS_PREVIEW + +# include "opentelemetry/exporters/otlp/otlp_http_log_exporter.h" +# include "opentelemetry/exporters/otlp/otlp_log_recordable.h" +# include "opentelemetry/exporters/otlp/otlp_recordable_utils.h" + +# include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + +# include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" + +# include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" + +namespace nostd = opentelemetry::nostd; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ + +OtlpHttpLogExporter::OtlpHttpLogExporter() : OtlpHttpLogExporter(OtlpHttpLogExporterOptions()) {} + +OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &options) + : options_(options), + http_client_(OtlpHttpClientOptions(options.url, + options.content_type, + options.json_bytes_mapping, + options.use_json_name, + options.console_debug, + options.timeout, + options.http_headers)) +{} + +// ----------------------------- Exporter methods ------------------------------ + +std::unique_ptr OtlpHttpLogExporter::MakeRecordable() noexcept +{ + return std::unique_ptr( + new exporter::otlp::OtlpLogRecordable()); +} + +opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export( + const nostd::span> &logs) noexcept +{ + proto::collector::logs::v1::ExportLogsServiceRequest service_request; + OtlpRecordableUtils::PopulateRequest(logs, &service_request); + return http_client_.Export(service_request); +} + +bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept +{ + return http_client_.Shutdown(timeout); +} + +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE + +#endif diff --git a/exporters/otlp/src/otlp_log_recordable.cc b/exporters/otlp/src/otlp_log_recordable.cc new file mode 100644 index 0000000000..1950a31ab8 --- /dev/null +++ b/exporters/otlp/src/otlp_log_recordable.cc @@ -0,0 +1,204 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifdef ENABLE_LOGS_PREVIEW + +# include "opentelemetry/exporters/otlp/otlp_log_recordable.h" + +# include "opentelemetry/exporters/otlp/otlp_recordable_utils.h" + +namespace nostd = opentelemetry::nostd; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ + +proto::resource::v1::Resource OtlpLogRecordable::ProtoResource() const noexcept +{ + // TODO Populate shared resource + return private_resource_; +} + +void OtlpLogRecordable::SetTimestamp(opentelemetry::common::SystemTimestamp timestamp) noexcept +{ + log_record_.set_time_unix_nano(timestamp.time_since_epoch().count()); +} + +void OtlpLogRecordable::SetSeverity(opentelemetry::logs::Severity severity) noexcept +{ + switch (severity) + { + case opentelemetry::logs::Severity::kTrace: { + log_record_.set_severity_text("TRACE"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_TRACE); + break; + } + case opentelemetry::logs::Severity::kTrace2: { + log_record_.set_severity_text("TRACE2"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_TRACE2); + break; + } + case opentelemetry::logs::Severity::kTrace3: { + log_record_.set_severity_text("TRACE3"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_TRACE3); + break; + } + case opentelemetry::logs::Severity::kTrace4: { + log_record_.set_severity_text("TRACE4"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_TRACE4); + break; + } + case opentelemetry::logs::Severity::kDebug: { + log_record_.set_severity_text("DEBUG"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_DEBUG); + break; + } + case opentelemetry::logs::Severity::kDebug2: { + log_record_.set_severity_text("DEBUG2"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_DEBUG2); + break; + } + case opentelemetry::logs::Severity::kDebug3: { + log_record_.set_severity_text("DEBUG3"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_DEBUG3); + break; + } + case opentelemetry::logs::Severity::kDebug4: { + log_record_.set_severity_text("DEBUG4"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_DEBUG4); + break; + } + case opentelemetry::logs::Severity::kInfo: { + log_record_.set_severity_text("INFO"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_INFO); + break; + } + case opentelemetry::logs::Severity::kInfo2: { + log_record_.set_severity_text("INFO2"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_INFO2); + break; + } + case opentelemetry::logs::Severity::kInfo3: { + log_record_.set_severity_text("INFO3"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_INFO3); + break; + } + case opentelemetry::logs::Severity::kInfo4: { + log_record_.set_severity_text("INFO4"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_INFO4); + break; + } + case opentelemetry::logs::Severity::kWarn: { + log_record_.set_severity_text("WARN"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_WARN); + break; + } + case opentelemetry::logs::Severity::kWarn2: { + log_record_.set_severity_text("WARN2"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_WARN2); + break; + } + case opentelemetry::logs::Severity::kWarn3: { + log_record_.set_severity_text("WARN3"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_WARN3); + break; + } + case opentelemetry::logs::Severity::kWarn4: { + log_record_.set_severity_text("WARN4"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_WARN4); + break; + } + case opentelemetry::logs::Severity::kError: { + log_record_.set_severity_text("ERROR"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_ERROR); + break; + } + case opentelemetry::logs::Severity::kError2: { + log_record_.set_severity_text("ERROR2"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_ERROR2); + break; + } + case opentelemetry::logs::Severity::kError3: { + log_record_.set_severity_text("ERROR3"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_ERROR3); + break; + } + case opentelemetry::logs::Severity::kError4: { + log_record_.set_severity_text("ERROR4"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_ERROR4); + break; + } + case opentelemetry::logs::Severity::kFatal: { + log_record_.set_severity_text("FATAL"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_FATAL); + break; + } + case opentelemetry::logs::Severity::kFatal2: { + log_record_.set_severity_text("FATAL2"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_FATAL2); + break; + } + case opentelemetry::logs::Severity::kFatal3: { + log_record_.set_severity_text("FATAL3"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_FATAL3); + break; + } + case opentelemetry::logs::Severity::kFatal4: { + log_record_.set_severity_text("FATAL4"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_FATAL4); + break; + } + default: { + log_record_.set_severity_text("INVALID"); + log_record_.set_severity_number(proto::logs::v1::SEVERITY_NUMBER_UNSPECIFIED); + break; + } + } +} + +void OtlpLogRecordable::SetName(nostd::string_view name) noexcept +{ + log_record_.set_name(name.data(), name.size()); +} + +void OtlpLogRecordable::SetBody(nostd::string_view message) noexcept +{ + log_record_.mutable_body()->set_string_value(message.data(), message.size()); +} + +void OtlpLogRecordable::SetResource(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept +{ + OtlpRecordableUtils::PopulateAttribute(private_resource_.add_attributes(), key, value); +} + +void OtlpLogRecordable::SetAttribute(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept +{ + OtlpRecordableUtils::PopulateAttribute(log_record_.add_attributes(), key, value); +} + +void OtlpLogRecordable::SetTraceId(opentelemetry::trace::TraceId trace_id) noexcept +{ + log_record_.set_trace_id(reinterpret_cast(trace_id.Id().data()), + trace::TraceId::kSize); +} + +void OtlpLogRecordable::SetSpanId(opentelemetry::trace::SpanId span_id) noexcept +{ + log_record_.set_span_id(reinterpret_cast(span_id.Id().data()), + trace::SpanId::kSize); +} + +void OtlpLogRecordable::SetTraceFlags(opentelemetry::trace::TraceFlags trace_flags) noexcept +{ + log_record_.set_flags(trace_flags.flags()); +} + +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE + +#endif diff --git a/exporters/otlp/src/otlp_recordable.cc b/exporters/otlp/src/otlp_recordable.cc index b319a5ea9c..9b63140f4b 100644 --- a/exporters/otlp/src/otlp_recordable.cc +++ b/exporters/otlp/src/otlp_recordable.cc @@ -3,18 +3,16 @@ #include "opentelemetry/exporters/otlp/otlp_recordable.h" +#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h" + +namespace nostd = opentelemetry::nostd; + OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter { namespace otlp { -// -// See `attribute_value.h` for details. -// -const int kAttributeValueSize = 16; -const int kOwnedAttributeValueSize = 15; - void OtlpRecordable::SetIdentity(const opentelemetry::trace::SpanContext &span_context, opentelemetry::trace::SpanId parent_span_id) noexcept { @@ -27,211 +25,12 @@ void OtlpRecordable::SetIdentity(const opentelemetry::trace::SpanContext &span_c span_.set_trace_state(span_context.trace_state()->ToHeader()); } -void PopulateAttribute(opentelemetry::proto::common::v1::KeyValue *attribute, - nostd::string_view key, - const opentelemetry::common::AttributeValue &value) -{ - // Assert size of variant to ensure that this method gets updated if the variant - // definition changes - static_assert( - nostd::variant_size::value == kAttributeValueSize, - "AttributeValue contains unknown type"); - - attribute->set_key(key.data(), key.size()); - - if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_bool_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_double_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_string_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_string_value(nostd::get(value).data(), - nostd::get(value).size()); - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_bool_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_double_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_string_value(val.data(), - val.size()); - } - } -} - -/** Maps from C++ attribute into OTLP proto attribute. */ -void PopulateAttribute(opentelemetry::proto::common::v1::KeyValue *attribute, - nostd::string_view key, - const sdk::common::OwnedAttributeValue &value) -{ - // Assert size of variant to ensure that this method gets updated if the variant - // definition changes - static_assert( - nostd::variant_size::value == kOwnedAttributeValueSize, - "OwnedAttributeValue contains unknown type"); - - attribute->set_key(key.data(), key.size()); - - if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_bool_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_int_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_double_value(nostd::get(value)); - } - else if (nostd::holds_alternative(value)) - { - attribute->mutable_value()->set_string_value(nostd::get(value)); - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_bool_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_double_value(val); - } - } - else if (nostd::holds_alternative>(value)) - { - for (const auto &val : nostd::get>(value)) - { - attribute->mutable_value()->mutable_array_value()->add_values()->set_string_value(val); - } - } -} - proto::resource::v1::Resource OtlpRecordable::ProtoResource() const noexcept { proto::resource::v1::Resource proto; if (resource_) { - for (const auto &kv : resource_->GetAttributes()) - { - PopulateAttribute(proto.add_attributes(), kv.first, kv.second); - } + OtlpRecordableUtils::PopulateAttribute(&proto, *resource_); } return proto; @@ -280,7 +79,7 @@ void OtlpRecordable::SetAttribute(nostd::string_view key, const opentelemetry::common::AttributeValue &value) noexcept { auto *attribute = span_.add_attributes(); - PopulateAttribute(attribute, key, value); + OtlpRecordableUtils::PopulateAttribute(attribute, key, value); } void OtlpRecordable::AddEvent(nostd::string_view name, @@ -292,7 +91,7 @@ void OtlpRecordable::AddEvent(nostd::string_view name, event->set_time_unix_nano(timestamp.time_since_epoch().count()); attributes.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept { - PopulateAttribute(event->add_attributes(), key, value); + OtlpRecordableUtils::PopulateAttribute(event->add_attributes(), key, value); return true; }); } @@ -307,7 +106,7 @@ void OtlpRecordable::AddLink(const opentelemetry::trace::SpanContext &span_conte trace::SpanId::kSize); link->set_trace_state(span_context.trace_state()->ToHeader()); attributes.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept { - PopulateAttribute(link->add_attributes(), key, value); + OtlpRecordableUtils::PopulateAttribute(link->add_attributes(), key, value); return true; }); } diff --git a/exporters/otlp/src/otlp_recordable_utils.cc b/exporters/otlp/src/otlp_recordable_utils.cc new file mode 100644 index 0000000000..483f01f905 --- /dev/null +++ b/exporters/otlp/src/otlp_recordable_utils.cc @@ -0,0 +1,320 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h" + +#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + +#include "opentelemetry/proto/logs/v1/logs.pb.h" +#include "opentelemetry/proto/trace/v1/trace.pb.h" + +#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" + +#include "opentelemetry/exporters/otlp/otlp_log_recordable.h" +#include "opentelemetry/exporters/otlp/otlp_recordable.h" + +namespace nostd = opentelemetry::nostd; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ + +// +// See `attribute_value.h` for details. +// +const int kAttributeValueSize = 16; +const int kOwnedAttributeValueSize = 15; + +void OtlpRecordableUtils::PopulateAttribute( + opentelemetry::proto::common::v1::KeyValue *attribute, + nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept +{ + if (nullptr == attribute) + { + return; + } + + // Assert size of variant to ensure that this method gets updated if the variant + // definition changes + static_assert( + nostd::variant_size::value == kAttributeValueSize, + "AttributeValue contains unknown type"); + + attribute->set_key(key.data(), key.size()); + + if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_bool_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_double_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_string_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_string_value(nostd::get(value).data(), + nostd::get(value).size()); + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_bool_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_double_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_string_value(val.data(), + val.size()); + } + } +} + +/** Maps from C++ attribute into OTLP proto attribute. */ +void OtlpRecordableUtils::PopulateAttribute( + opentelemetry::proto::common::v1::KeyValue *attribute, + nostd::string_view key, + const opentelemetry::sdk::common::OwnedAttributeValue &value) noexcept +{ + if (nullptr == attribute) + { + return; + } + + // Assert size of variant to ensure that this method gets updated if the variant + // definition changes + static_assert(nostd::variant_size::value == + kOwnedAttributeValueSize, + "OwnedAttributeValue contains unknown type"); + + attribute->set_key(key.data(), key.size()); + + if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_bool_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_int_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_double_value(nostd::get(value)); + } + else if (nostd::holds_alternative(value)) + { + attribute->mutable_value()->set_string_value(nostd::get(value)); + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_bool_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_int_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_double_value(val); + } + } + else if (nostd::holds_alternative>(value)) + { + for (const auto &val : nostd::get>(value)) + { + attribute->mutable_value()->mutable_array_value()->add_values()->set_string_value(val); + } + } +} + +void OtlpRecordableUtils::PopulateAttribute( + opentelemetry::proto::resource::v1::Resource *proto, + const opentelemetry::sdk::resource::Resource &resource) noexcept +{ + if (nullptr == proto) + { + return; + } + + for (const auto &kv : resource.GetAttributes()) + { + OtlpRecordableUtils::PopulateAttribute(proto->add_attributes(), kv.first, kv.second); + } +} + +void OtlpRecordableUtils::PopulateRequest( + const nostd::span> &spans, + proto::collector::trace::v1::ExportTraceServiceRequest *request) noexcept +{ + if (nullptr == request) + { + return; + } + + auto resource_span = request->add_resource_spans(); + auto instrumentation_lib = resource_span->add_instrumentation_library_spans(); + bool first_pass = true; + + for (auto &recordable : spans) + { + auto rec = std::unique_ptr(static_cast(recordable.release())); + *instrumentation_lib->add_spans() = std::move(rec->span()); + *instrumentation_lib->mutable_instrumentation_library() = rec->GetProtoInstrumentationLibrary(); + + if (first_pass) + { + instrumentation_lib->set_schema_url(rec->GetInstrumentationLibrarySchemaURL()); + + *resource_span->mutable_resource() = rec->ProtoResource(); + resource_span->set_schema_url(rec->GetResourceSchemaURL()); + + first_pass = false; + } + } +} + +#ifdef ENABLE_LOGS_PREVIEW +void OtlpRecordableUtils::PopulateRequest( + const nostd::span> &logs, + proto::collector::logs::v1::ExportLogsServiceRequest *request) noexcept +{ + if (nullptr == request) + { + return; + } + + for (auto &recordable : logs) + { + auto resource_logs = request->add_resource_logs(); + auto instrumentation_lib = resource_logs->add_instrumentation_library_logs(); + + auto rec = + std::unique_ptr(static_cast(recordable.release())); + + // TODO schema url + *resource_logs->mutable_resource() = rec->ProtoResource(); + + // TODO schema url + // resource_logs->set_schema_url(rec->GetResourceSchemaURL()); + + *instrumentation_lib->add_logs() = std::move(rec->log_record()); + // TODO instrumentation_library + // *instrumentation_lib->mutable_instrumentation_library() = + // rec->GetProtoInstrumentationLibrary(); + // TODO schema data + // instrumentation_lib->set_schema_url(rec->GetInstrumentationLibrarySchemaURL()); + } +} +#endif + +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index 413bf7c1a1..9999511f50 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -50,6 +50,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test, public HTTP_SERVER_NS:: std::vector received_requests_json_; std::vector received_requests_binary_; + std::map received_requests_headers_; public: OtlpHttpExporterTestPeer() : is_setup_(false), is_running_(false){}; @@ -90,6 +91,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test, public HTTP_SERVER_NS:: request_content_type = &it->second; } } + received_requests_headers_ = request.headers; int response_status = 0; @@ -171,6 +173,8 @@ class OtlpHttpExporterTestPeer : public ::testing::Test, public HTTP_SERVER_NS:: opts.url = server_address_; opts.content_type = content_type; opts.console_debug = true; + opts.http_headers.insert( + std::make_pair("Custom-Header-Key", "Custom-Header-Value")); return std::unique_ptr(new OtlpHttpExporter(opts)); } @@ -239,6 +243,14 @@ TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTest) auto span = *instrumentation_library_span["spans"].begin(); auto received_trace_id = span["trace_id"].get(); EXPECT_EQ(received_trace_id, report_trace_id); + { + auto custom_header = received_requests_headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != received_requests_headers_.end()); + if (custom_header != received_requests_headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + } } // Create spans, let processor call Export() @@ -335,9 +347,8 @@ TEST_F(OtlpHttpExporterTestPeer, ConfigJsonBytesMappingTest) // Test exporter configuration options with use_ssl_credentials TEST_F(OtlpHttpExporterTestPeer, ConfigFromEnv) { - const std::string url = "http://localhost:9999/v1/traces"; - const std::string url_env = "OTEL_EXPORTER_OTLP_ENDPOINT=" + url; - putenv(const_cast(url_env.data())); + const std::string url = "http://localhost:9999/v1/traces"; + putenv("OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:9999"); putenv("OTEL_EXPORTER_OTLP_TIMEOUT=20s"); putenv("OTEL_EXPORTER_OTLP_HEADERS=k1=v1,k2=v2"); putenv("OTEL_EXPORTER_OTLP_TRACES_HEADERS=k1=v3,k1=v4"); @@ -379,6 +390,54 @@ TEST_F(OtlpHttpExporterTestPeer, ConfigFromEnv) unsetenv("OTEL_EXPORTER_OTLP_HEADERS"); unsetenv("OTEL_EXPORTER_OTLP_TRACES_HEADERS"); +# endif +} + +TEST_F(OtlpHttpExporterTestPeer, ConfigFromTracesEnv) +{ + const std::string url = "http://localhost:9999/v1/traces"; + putenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:9999/v1/traces"); + putenv("OTEL_EXPORTER_OTLP_TIMEOUT=20s"); + putenv("OTEL_EXPORTER_OTLP_HEADERS=k1=v1,k2=v2"); + putenv("OTEL_EXPORTER_OTLP_TRACES_HEADERS=k1=v3,k1=v4"); + + std::unique_ptr exporter(new OtlpHttpExporter()); + EXPECT_EQ(GetOptions(exporter).url, url); + EXPECT_EQ( + GetOptions(exporter).timeout.count(), + std::chrono::duration_cast(std::chrono::seconds{20}) + .count()); + EXPECT_EQ(GetOptions(exporter).http_headers.size(), 3); + { + // Test k2 + auto range = GetOptions(exporter).http_headers.equal_range("k2"); + EXPECT_TRUE(range.first != range.second); + EXPECT_EQ(range.first->second, std::string("v2")); + ++range.first; + EXPECT_TRUE(range.first == range.second); + } + { + // k1 + auto range = GetOptions(exporter).http_headers.equal_range("k1"); + EXPECT_TRUE(range.first != range.second); + EXPECT_EQ(range.first->second, std::string("v3")); + ++range.first; + EXPECT_EQ(range.first->second, std::string("v4")); + ++range.first; + EXPECT_TRUE(range.first == range.second); + } +# if defined(_MSC_VER) + putenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="); + putenv("OTEL_EXPORTER_OTLP_TIMEOUT="); + putenv("OTEL_EXPORTER_OTLP_HEADERS="); + putenv("OTEL_EXPORTER_OTLP_TRACES_HEADERS="); + +# else + unsetenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"); + unsetenv("OTEL_EXPORTER_OTLP_TIMEOUT"); + unsetenv("OTEL_EXPORTER_OTLP_HEADERS"); + unsetenv("OTEL_EXPORTER_OTLP_TRACES_HEADERS"); + # endif } # endif diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc new file mode 100644 index 0000000000..e27a4e434a --- /dev/null +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -0,0 +1,499 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef HAVE_CPP_STDLIB +# ifdef ENABLE_LOGS_PREVIEW + +# include "opentelemetry/exporters/otlp/otlp_http_log_exporter.h" + +# include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" + +# include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" + +# include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" + +# include "opentelemetry/common/key_value_iterable_view.h" +# include "opentelemetry/ext/http/server/http_server.h" +# include "opentelemetry/logs/provider.h" +# include "opentelemetry/sdk/logs/batch_log_processor.h" +# include "opentelemetry/sdk/logs/exporter.h" +# include "opentelemetry/sdk/logs/log_record.h" +# include "opentelemetry/sdk/logs/logger_provider.h" +# include "opentelemetry/sdk/resource/resource.h" + +# include + +# include "nlohmann/json.hpp" + +# if defined(_MSC_VER) +# define putenv _putenv +# endif + +using namespace testing; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace otlp +{ + +template +static nostd::span MakeSpan(T (&array)[N]) +{ + return nostd::span(array); +} + +class OtlpHttpLogExporterTestPeer : public ::testing::Test, + public HTTP_SERVER_NS::HttpRequestCallback +{ +protected: + HTTP_SERVER_NS::HttpServer server_; + std::string server_address_; + std::atomic is_setup_; + std::atomic is_running_; + std::mutex mtx_requests; + std::condition_variable cv_got_events; + std::vector received_requests_json_; + std::vector + received_requests_binary_; + std::map received_requests_headers_; + +public: + OtlpHttpLogExporterTestPeer() : is_setup_(false), is_running_(false){}; + + virtual void SetUp() override + { + if (is_setup_.exchange(true)) + { + return; + } + int port = server_.addListeningPort(14372); + std::ostringstream os; + os << "localhost:" << port; + server_address_ = "http://" + os.str() + "/v1/logs"; + server_.setServerName(os.str()); + server_.setKeepalive(false); + server_.addHandler("/v1/logs", *this); + server_.start(); + is_running_ = true; + } + + virtual void TearDown() override + { + if (!is_setup_.exchange(false)) + return; + server_.stop(); + is_running_ = false; + } + + virtual int onHttpRequest(HTTP_SERVER_NS::HttpRequest const &request, + HTTP_SERVER_NS::HttpResponse &response) override + { + const std::string *request_content_type = nullptr; + { + auto it = request.headers.find("Content-Type"); + if (it != request.headers.end()) + { + request_content_type = &it->second; + } + } + received_requests_headers_ = request.headers; + + int response_status = 0; + + if (request.uri == "/v1/logs") + { + response.headers["Content-Type"] = "application/json"; + std::unique_lock lk(mtx_requests); + if (nullptr != request_content_type && *request_content_type == kHttpBinaryContentType) + { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; + if (request_body.ParseFromArray(&request.content[0], + static_cast(request.content.size()))) + { + received_requests_binary_.push_back(request_body); + response.body = "{\"code\": 0, \"message\": \"success\"}"; + } + else + { + response.body = "{\"code\": 400, \"message\": \"Parse binary failed\"}"; + response_status = 400; + } + } + else if (nullptr != request_content_type && *request_content_type == kHttpJsonContentType) + { + auto json = nlohmann::json::parse(request.content, nullptr, false); + response.headers["Content-Type"] = "application/json"; + if (json.is_discarded()) + { + response.body = "{\"code\": 400, \"message\": \"Parse json failed\"}"; + response_status = 400; + } + else + { + received_requests_json_.push_back(json); + response.body = "{\"code\": 0, \"message\": \"success\"}"; + } + } + else + { + response.body = "{\"code\": 400, \"message\": \"Unsupported content type\"}"; + response_status = 400; + } + + response_status = 200; + } + else + { + std::unique_lock lk(mtx_requests); + response.headers["Content-Type"] = "text/plain"; + response.body = "404 Not Found"; + response_status = 200; + } + + cv_got_events.notify_one(); + + return response_status; + } + + bool waitForRequests(unsigned timeOutSec, size_t expected_count = 1) + { + std::unique_lock lk(mtx_requests); + if (cv_got_events.wait_for(lk, std::chrono::milliseconds(1000 * timeOutSec), + [&] { return getCurrentRequestCount() >= expected_count; })) + { + return true; + } + return false; + } + + size_t getCurrentRequestCount() const + { + return received_requests_json_.size() + received_requests_binary_.size(); + } + +public: + std::unique_ptr GetExporter(HttpRequestContentType content_type) + { + OtlpHttpLogExporterOptions opts; + opts.url = server_address_; + opts.content_type = content_type; + opts.console_debug = true; + opts.http_headers.insert( + std::make_pair("Custom-Header-Key", "Custom-Header-Value")); + return std::unique_ptr(new OtlpHttpLogExporter(opts)); + } + + // Get the options associated with the given exporter. + const OtlpHttpLogExporterOptions &GetOptions(std::unique_ptr &exporter) + { + return exporter->options_; + } +}; + +// Create log records, let processor call Export() +TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) +{ + size_t old_count = getCurrentRequestCount(); + auto exporter = GetExporter(HttpRequestContentType::kJson); + + bool resource_storage_bool_value[] = {true, false, true}; + int32_t resource_storage_int32_value[] = {1, 2}; + uint32_t resource_storage_uint32_value[] = {3, 4}; + int64_t resource_storage_int64_value[] = {5, 6}; + uint64_t resource_storage_uint64_value[] = {7, 8}; + double resource_storage_double_value[] = {3.2, 3.3}; + std::string resource_storage_string_value[] = {"vector", "string"}; + + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + provider->SetProcessor(std::unique_ptr( + new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1))); + + std::string report_trace_id; + std::string report_span_id; + { + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + char trace_id_hex[2 * opentelemetry::trace::TraceId::kSize] = {0}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + char span_id_hex[2 * opentelemetry::trace::SpanId::kSize] = {0}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + auto logger = provider->GetLogger("test"); + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", resource_storage_bool_value}, + {"vec_int32_value", resource_storage_int32_value}, + {"vec_uint32_value", resource_storage_uint32_value}, + {"vec_int64_value", resource_storage_int64_value}, + {"vec_uint64_value", resource_storage_uint64_value}, + {"vec_double_value", resource_storage_double_value}, + {"vec_string_value", resource_storage_string_value}}, + {{"log_attribute", "test_value"}}, trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + trace_id.ToLowerBase16(MakeSpan(trace_id_hex)); + report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); + + span_id.ToLowerBase16(MakeSpan(span_id_hex)); + report_span_id.assign(span_id_hex, sizeof(span_id_hex)); + } + + ASSERT_TRUE(waitForRequests(2, old_count + 1)); + auto check_json = received_requests_json_.back(); + auto resource_logs = *check_json["resource_logs"].begin(); + auto instrumentation_library_span = *resource_logs["instrumentation_library_logs"].begin(); + auto log = *instrumentation_library_span["logs"].begin(); + auto received_trace_id = log["trace_id"].get(); + auto received_span_id = log["span_id"].get(); + EXPECT_EQ(received_trace_id, report_trace_id); + EXPECT_EQ(received_span_id, report_span_id); + EXPECT_EQ("Log name", log["name"].get()); + EXPECT_EQ("Log message", log["body"]["string_value"].get()); + EXPECT_EQ("test_value", (*log["attributes"].begin())["value"]["string_value"].get()); + EXPECT_LE(15, resource_logs["resource"]["attributes"].size()); + bool check_service_name = false; + for (auto attribute : resource_logs["resource"]["attributes"]) + { + if ("service.name" == attribute["key"].get()) + { + check_service_name = true; + EXPECT_EQ("unit_test_service", attribute["value"]["string_value"].get()); + } + } + ASSERT_TRUE(check_service_name); + + { + auto custom_header = received_requests_headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != received_requests_headers_.end()); + if (custom_header != received_requests_headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + } +} + +// Create log records, let processor call Export() +TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) +{ + size_t old_count = getCurrentRequestCount(); + + auto exporter = GetExporter(HttpRequestContentType::kBinary); + + bool resource_storage_bool_value[] = {true, false, true}; + int32_t resource_storage_int32_value[] = {1, 2}; + uint32_t resource_storage_uint32_value[] = {3, 4}; + int64_t resource_storage_int64_value[] = {5, 6}; + uint64_t resource_storage_uint64_value[] = {7, 8}; + double resource_storage_double_value[] = {3.2, 3.3}; + std::string resource_storage_string_value[] = {"vector", "string"}; + + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + provider->SetProcessor(std::unique_ptr( + new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1))); + + std::string report_trace_id; + std::string report_span_id; + { + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + auto logger = provider->GetLogger("test"); + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", resource_storage_bool_value}, + {"vec_int32_value", resource_storage_int32_value}, + {"vec_uint32_value", resource_storage_uint32_value}, + {"vec_int64_value", resource_storage_int64_value}, + {"vec_uint64_value", resource_storage_uint64_value}, + {"vec_double_value", resource_storage_double_value}, + {"vec_string_value", resource_storage_string_value}}, + {{"log_attribute", "test_value"}}, trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + report_trace_id.assign(reinterpret_cast(trace_id_bin), sizeof(trace_id_bin)); + report_span_id.assign(reinterpret_cast(span_id_bin), sizeof(span_id_bin)); + } + + ASSERT_TRUE(waitForRequests(2, old_count + 1)); + auto received_log = + received_requests_binary_.back().resource_logs(0).instrumentation_library_logs(0).logs(0); + EXPECT_EQ(received_log.trace_id(), report_trace_id); + EXPECT_EQ(received_log.span_id(), report_span_id); + EXPECT_EQ("Log name", received_log.name()); + EXPECT_EQ("Log message", received_log.body().string_value()); + EXPECT_EQ("test_value", received_log.attributes(0).value().string_value()); + EXPECT_LE(15, received_requests_binary_.back().resource_logs(0).resource().attributes_size()); + bool check_service_name = false; + for (auto &attribute : received_requests_binary_.back().resource_logs(0).resource().attributes()) + { + if ("service.name" == attribute.key()) + { + check_service_name = true; + EXPECT_EQ("unit_test_service", attribute.value().string_value()); + } + } + ASSERT_TRUE(check_service_name); +} + +// Test exporter configuration options +TEST_F(OtlpHttpLogExporterTestPeer, ConfigTest) +{ + OtlpHttpLogExporterOptions opts; + opts.url = "http://localhost:45456/v1/logs"; + std::unique_ptr exporter(new OtlpHttpLogExporter(opts)); + EXPECT_EQ(GetOptions(exporter).url, "http://localhost:45456/v1/logs"); +} + +// Test exporter configuration options with use_json_name +TEST_F(OtlpHttpLogExporterTestPeer, ConfigUseJsonNameTest) +{ + OtlpHttpLogExporterOptions opts; + opts.use_json_name = true; + std::unique_ptr exporter(new OtlpHttpLogExporter(opts)); + EXPECT_EQ(GetOptions(exporter).use_json_name, true); +} + +// Test exporter configuration options with json_bytes_mapping=JsonBytesMappingKind::kHex +TEST_F(OtlpHttpLogExporterTestPeer, ConfigJsonBytesMappingTest) +{ + OtlpHttpLogExporterOptions opts; + opts.json_bytes_mapping = JsonBytesMappingKind::kHex; + std::unique_ptr exporter(new OtlpHttpLogExporter(opts)); + EXPECT_EQ(GetOptions(exporter).json_bytes_mapping, JsonBytesMappingKind::kHex); +} + +# ifndef NO_GETENV +// Test exporter configuration options with use_ssl_credentials +TEST_F(OtlpHttpLogExporterTestPeer, ConfigFromEnv) +{ + const std::string url = "http://localhost:9999/v1/logs"; + putenv("OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:9999"); + putenv("OTEL_EXPORTER_OTLP_TIMEOUT=20s"); + putenv("OTEL_EXPORTER_OTLP_HEADERS=k1=v1,k2=v2"); + putenv("OTEL_EXPORTER_OTLP_LOGS_HEADERS=k1=v3,k1=v4"); + + std::unique_ptr exporter(new OtlpHttpLogExporter()); + EXPECT_EQ(GetOptions(exporter).url, url); + EXPECT_EQ( + GetOptions(exporter).timeout.count(), + std::chrono::duration_cast(std::chrono::seconds{20}) + .count()); + EXPECT_EQ(GetOptions(exporter).http_headers.size(), 3); + { + // Test k2 + auto range = GetOptions(exporter).http_headers.equal_range("k2"); + EXPECT_TRUE(range.first != range.second); + EXPECT_EQ(range.first->second, std::string("v2")); + ++range.first; + EXPECT_TRUE(range.first == range.second); + } + { + // k1 + auto range = GetOptions(exporter).http_headers.equal_range("k1"); + EXPECT_TRUE(range.first != range.second); + EXPECT_EQ(range.first->second, std::string("v3")); + ++range.first; + EXPECT_EQ(range.first->second, std::string("v4")); + ++range.first; + EXPECT_TRUE(range.first == range.second); + } +# if defined(_MSC_VER) + putenv("OTEL_EXPORTER_OTLP_ENDPOINT="); + putenv("OTEL_EXPORTER_OTLP_TIMEOUT="); + putenv("OTEL_EXPORTER_OTLP_HEADERS="); + putenv("OTEL_EXPORTER_OTLP_LOGS_HEADERS="); + +# else + unsetenv("OTEL_EXPORTER_OTLP_ENDPOINT"); + unsetenv("OTEL_EXPORTER_OTLP_TIMEOUT"); + unsetenv("OTEL_EXPORTER_OTLP_HEADERS"); + unsetenv("OTEL_EXPORTER_OTLP_LOGS_HEADERS"); + +# endif +} + +TEST_F(OtlpHttpLogExporterTestPeer, ConfigFromLogsEnv) +{ + const std::string url = "http://localhost:9999/v1/logs"; + putenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=http://localhost:9999/v1/logs"); + putenv("OTEL_EXPORTER_OTLP_TIMEOUT=20s"); + putenv("OTEL_EXPORTER_OTLP_HEADERS=k1=v1,k2=v2"); + putenv("OTEL_EXPORTER_OTLP_LOGS_HEADERS=k1=v3,k1=v4"); + + std::unique_ptr exporter(new OtlpHttpLogExporter()); + EXPECT_EQ(GetOptions(exporter).url, url); + EXPECT_EQ( + GetOptions(exporter).timeout.count(), + std::chrono::duration_cast(std::chrono::seconds{20}) + .count()); + EXPECT_EQ(GetOptions(exporter).http_headers.size(), 3); + { + // Test k2 + auto range = GetOptions(exporter).http_headers.equal_range("k2"); + EXPECT_TRUE(range.first != range.second); + EXPECT_EQ(range.first->second, std::string("v2")); + ++range.first; + EXPECT_TRUE(range.first == range.second); + } + { + // k1 + auto range = GetOptions(exporter).http_headers.equal_range("k1"); + EXPECT_TRUE(range.first != range.second); + EXPECT_EQ(range.first->second, std::string("v3")); + ++range.first; + EXPECT_EQ(range.first->second, std::string("v4")); + ++range.first; + EXPECT_TRUE(range.first == range.second); + } +# if defined(_MSC_VER) + putenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT="); + putenv("OTEL_EXPORTER_OTLP_TIMEOUT="); + putenv("OTEL_EXPORTER_OTLP_HEADERS="); + putenv("OTEL_EXPORTER_OTLP_LOGS_HEADERS="); + +# else + unsetenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"); + unsetenv("OTEL_EXPORTER_OTLP_TIMEOUT"); + unsetenv("OTEL_EXPORTER_OTLP_HEADERS"); + unsetenv("OTEL_EXPORTER_OTLP_LOGS_HEADERS"); + +# endif +} + +TEST_F(OtlpHttpLogExporterTestPeer, DefaultEndpoint) +{ + EXPECT_EQ("http://localhost:4318/v1/logs", GetOtlpDefaultHttpLogEndpoint()); + EXPECT_EQ("http://localhost:4318/v1/traces", GetOtlpDefaultHttpEndpoint()); + EXPECT_EQ("http://localhost:4317", GetOtlpDefaultGrpcEndpoint()); +} + +# endif + +} // namespace otlp +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE +# endif +#endif diff --git a/sdk/test/trace/tracer_provider_test.cc b/sdk/test/trace/tracer_provider_test.cc index 9c1c922c2c..25304bc1ba 100644 --- a/sdk/test/trace/tracer_provider_test.cc +++ b/sdk/test/trace/tracer_provider_test.cc @@ -39,7 +39,11 @@ TEST(TracerProvider, GetTracer) ASSERT_NE(t3, t6); // Should be an sdk::trace::Tracer with the processor attached. +#ifdef RTTI_ENABLED auto sdkTracer1 = dynamic_cast(t1.get()); +#else + auto sdkTracer1 = static_cast(t1.get()); +#endif ASSERT_NE(nullptr, sdkTracer1); ASSERT_EQ("AlwaysOnSampler", sdkTracer1->GetSampler().GetDescription()); std::unique_ptr processor2(new SimpleSpanProcessor(nullptr)); @@ -49,7 +53,11 @@ TEST(TracerProvider, GetTracer) std::make_shared(std::move(processors2), Resource::Create({}), std::unique_ptr(new AlwaysOffSampler()), std::unique_ptr(new RandomIdGenerator))); +#ifdef RTTI_ENABLED auto sdkTracer2 = dynamic_cast(tp2.GetTracer("test").get()); +#else + auto sdkTracer2 = static_cast(tp2.GetTracer("test").get()); +#endif ASSERT_EQ("AlwaysOffSampler", sdkTracer2->GetSampler().GetDescription()); auto instrumentation_library1 = sdkTracer1->GetInstrumentationLibrary(); @@ -57,7 +65,11 @@ TEST(TracerProvider, GetTracer) ASSERT_EQ(instrumentation_library1.GetVersion(), ""); // Should be an sdk::trace::Tracer with the processor attached. - auto sdkTracer3 = dynamic_cast(t3.get()); +#ifdef RTTI_ENABLED + auto sdkTracer3 = dynamic_cast(t3.get()); +#else + auto sdkTracer3 = static_cast(t3.get()); +#endif auto instrumentation_library3 = sdkTracer3->GetInstrumentationLibrary(); ASSERT_EQ(instrumentation_library3.GetName(), "different"); ASSERT_EQ(instrumentation_library3.GetVersion(), "1.0.0"); diff --git a/sdk/test/trace/tracer_test.cc b/sdk/test/trace/tracer_test.cc index 94e57a4cd7..9d1029cd02 100644 --- a/sdk/test/trace/tracer_test.cc +++ b/sdk/test/trace/tracer_test.cc @@ -347,13 +347,21 @@ TEST(Tracer, GetSampler) // Create a Tracer with a default AlwaysOnSampler auto tracer_on = initTracer(nullptr); +#ifdef RTTI_ENABLED auto &t1 = std::dynamic_pointer_cast(tracer_on)->GetSampler(); +#else + auto &t1 = std::static_pointer_cast(tracer_on)->GetSampler(); +#endif ASSERT_EQ("AlwaysOnSampler", t1.GetDescription()); // Create a Tracer with a AlwaysOffSampler auto tracer_off = initTracer(nullptr, new AlwaysOffSampler()); +#ifdef RTTI_ENABLED auto &t2 = std::dynamic_pointer_cast(tracer_off)->GetSampler(); +#else + auto &t2 = std::static_pointer_cast(tracer_off)->GetSampler(); +#endif ASSERT_EQ("AlwaysOffSampler", t2.GetDescription()); } @@ -539,8 +547,10 @@ TEST(Tracer, TestParentBasedSampler) // so this sampler will work as an AlwaysOnSampler. std::unique_ptr exporter2(new InMemorySpanExporter()); std::shared_ptr span_data_parent_off = exporter2->GetData(); - auto tracer_parent_off = initTracer(std::move(exporter2), - new ParentBasedSampler(std::make_shared())); + auto tracer_parent_off = + initTracer(std::move(exporter2), + // Add this to avoid different results for old and new version of clang-format + new ParentBasedSampler(std::make_shared())); auto span_parent_off_1 = tracer_parent_off->StartSpan("span 1"); auto span_parent_off_2 = tracer_parent_off->StartSpan("span 2");