Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.2.x] compression: correct endianness in snappy_java_compressor (Manual backport) #25112

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@

namespace compression::internal {
/// Compressor/decompressor for the snappy-java stream framing: a fixed magic
/// sequence, two 32-bit version fields, then the compressed payload.
struct snappy_java_compressor {
    /// Constants describing the snappy-java stream header.
    struct snappy_magic {
        /// Leading magic bytes that identify a snappy-java stream.
        static constexpr std::array<uint8_t, 8> java_magic{
          0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0};

        // Historical note: the two version fields below used to be emitted
        // in little-endian byte order by mistake. They are now written and
        // read as big-endian, but batches produced with the old, incorrect
        // encoding still exist and must remain decodable.
        static constexpr int32_t default_version = 1;
        static constexpr int32_t min_compatible_version = 1;

        /// Total header size in bytes: magic + version + min compatible
        /// version.
        static constexpr size_t header_len
          = java_magic.size() + sizeof(default_version)
            + sizeof(min_compatible_version);
    };

    /// Compresses the given buffer and returns the framed result.
    static iobuf compress(const iobuf&);
    /// Decompresses the given buffer and returns the original bytes.
    static iobuf uncompress(const iobuf&);
};
Expand Down
33 changes: 11 additions & 22 deletions src/v/compression/internal/snappy_java_compressor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,6 @@
#include <snappy.h>

namespace compression::internal {
/// Constants describing the snappy-java stream header.
struct snappy_magic {
    /// Leading magic bytes that identify a snappy-java stream.
    static constexpr std::array<uint8_t, 8> java_magic{
      0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0};
    /// Version written into the header.
    static constexpr int32_t default_version = 1;
    /// Minimum compatible version written into the header.
    static constexpr int32_t min_compatible_version = 1;
    /// Total header size in bytes: magic + version + min compatible version.
    static constexpr size_t header_len
      = java_magic.size() + sizeof(default_version)
        + sizeof(min_compatible_version);
};

size_t find_max_size_in_frags(const iobuf& x) {
size_t ret = 0;
Expand Down Expand Up @@ -58,8 +49,10 @@ iobuf snappy_java_compressor::compress(const iobuf& x) {
iobuf ret;
ret.append(
snappy_magic::java_magic.data(), snappy_magic::java_magic.size());
append_le(ret, snappy_magic::default_version);
append_le(ret, snappy_magic::min_compatible_version);
// versions in header are big-endian. See:
// https://github.com/xerial/snappy-java/blob/65e1ec3de1a0d447b137c6dd6393629aa3d75b8b/src/main/java/org/xerial/snappy/SnappyCodec.java#L78-L81
append_be(ret, snappy_magic::default_version);
append_be(ret, snappy_magic::min_compatible_version);
// staging buffer
ss::temporary_buffer<char> obuf(find_max_size_in_frags(x));
for (const auto& f : x) {
Expand All @@ -82,17 +75,13 @@ iobuf snappy_java_compressor::uncompress(const iobuf& x) {
if (unlikely(snappy_magic::java_magic != magic_compare)) {
return snappy_standard_compressor::uncompress(x);
}
// NOTE: version and min_version are LITTLE_ENDIAN!
const auto version = iter.consume_type<int32_t>();
const auto min_version = iter.consume_type<int32_t>();
if (unlikely(min_version < snappy_magic::min_compatible_version)) {
throw std::runtime_error(fmt_with_ctx(
fmt::format,
"version missmatch. iobuf: {} - version:{}, min_version:{}",
x,
version,
min_version));
}
// Previously, these version fields were erroneously written with
// little-endian encoding. They are now corrected to be written and decoded
// using big-endian. Additionally, there was previously a version check
// here. It has been removed due to incorrect implementation, and because
// most other snappy clients do not perform checks around these fields.
[[maybe_unused]] const auto version = iter.consume_be_type<int32_t>();
[[maybe_unused]] const auto min_version = iter.consume_be_type<int32_t>();
// stream decoder next
iobuf ret;
const size_t input_bytes = x.size_bytes();
Expand Down
26 changes: 26 additions & 0 deletions src/v/compression/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,32 @@ redpanda_cc_gtest(
],
)

# gtest target for the snappy compressors; the test sources live in
# snappy_tests.cc.
redpanda_cc_gtest(
name = "snappy_test",
timeout = "short",
srcs = [
"snappy_tests.cc",
],
cpu = 1,
# Fixture payloads loaded from disk by the little-endian backwards
# compatibility test.
data = [
"//src/v/compression/tests/snappy_payload:little_endian_compressed_data.snappy",
"//src/v/compression/tests/snappy_payload:uncompressed_data",
],
# The test reads SNAPPY_PAYLOAD_PATH via std::getenv and resolves the fixture
# file names relative to it.
env = {"SNAPPY_PAYLOAD_PATH": "src/v/compression/tests/snappy_payload"},
deps = [
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/bytes:iostream",
"//src/v/compression",
"//src/v/random:generators",
"//src/v/test_utils:gtest",
"//src/v/utils:file_io",
"@googletest//:gtest",
"@seastar",
"@snappy",
],
)

redpanda_cc_btest(
name = "zstd_test",
timeout = "short",
Expand Down
16 changes: 16 additions & 0 deletions src/v/compression/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,19 @@ rp_test(
LABELS compression
ARGS "-- -c 1"
)

# Directory holding the snappy fixture payloads used by snappy_tests.
set(SNAPPY_PAYLOAD_PATH "${CMAKE_CURRENT_SOURCE_DIR}/snappy_payload")

# gtest binary covering the snappy compressors.
rp_test(
UNIT_TEST
GTEST
BINARY_NAME snappy_tests
SOURCES snappy_tests.cc
LIBRARIES v::compression v::random v::gtest_main v::utils
LABELS compression
ARGS "-- -c 1"
# Fixture files the backwards-compatibility test loads from disk.
INPUT_FILES
"${SNAPPY_PAYLOAD_PATH}/uncompressed_data"
"${SNAPPY_PAYLOAD_PATH}/little_endian_compressed_data.snappy"
# NOTE(review): the environment variable is set to an empty value here, so the
# test ends up opening the fixture file names as relative paths. Presumably
# this relies on INPUT_FILES being staged into the test's working directory --
# confirm this is intentional.
ENV "SNAPPY_PAYLOAD_PATH="
)
4 changes: 4 additions & 0 deletions src/v/compression/tests/snappy_payload/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Export the snappy fixture files so BUILD rules in other packages can reference
# them by label (the snappy_test target in //src/v/compression/tests lists both
# as `data`).
exports_files([
"uncompressed_data",
"little_endian_compressed_data.snappy",
])
Binary file not shown.
1 change: 1 addition & 0 deletions src/v/compression/tests/snappy_payload/uncompressed_data
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
iXXf8ieqH0vIe65zxnKMN4578QHDxqFJgo38wPtJzp0AD1MvsLHUIggX2MGazSEznDhpUnUMVgVVs6ObsUKUWpwj4QNcPQUYLqYUWbDjvSv5SBPMeH2HtYNcvr4urkkfDh6qm1CA0lcOyUziWiIpMh5qLylBYDbPA44e1TlpKF3qy6psrtqgsEDS2gsGcZVluPUnfpKATW6aZ3BvUwYWeybCSfQrZRH0vVzoPuAgdmeWqXDNrkfZoLxxpAbgVcejU5AgG8n2wxrOjrMfi20AmnuaWIT1LECnMWFzbKkyGYI4v7Xhv4URTBv033EYKnVdqthPWZHLwQgVVyTxNjCT0drSS5W6KkcyYykyJxsH2CexbsDj7ZmVOYQ5FW4ShXBluHKD8eqhBzeYHTSFoNUqxfOl3IE48PfJyqb5t2fqIkZvcw4OSrhupxFFEEoAt8aaypTleU7Dg50sjMRVt1n7Wewiu1RlCignU74yKcyMSPwDYSeWwOHSfYxZaij8Pk7D
171 changes: 171 additions & 0 deletions src/v/compression/tests/snappy_tests.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "compression/internal/snappy_java_compressor.h"
#include "compression/snappy_standard_compressor.h"
#include "random/generators.h"
#include "utils/file_io.h"

#include <seastar/core/byteorder.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/util/short_streams.hh>

#include <gtest/gtest.h>

#include <cstdlib>
#include <snappy-sinksource.h>
#include <snappy.h>

// Round trip through the standard snappy compressor: compressing a random
// alphanumeric payload and then uncompressing it must give back the input.
TEST(SnappyTest, CompressAndDecompressSnappyStandardTest) {
    using standard_compressor = compression::snappy_standard_compressor;

    const auto payload = random_generators::gen_alphanum_string(512);

    iobuf original;
    original.append(payload.data(), payload.size());

    auto compressed = standard_compressor::compress(original);
    auto round_tripped = standard_compressor::uncompress(compressed);

    EXPECT_EQ(original, round_tripped);
}

// Round trip through the snappy-java compressor: compressing a random
// alphanumeric payload and then uncompressing it must give back the input.
TEST(SnappyTest, CompressAndDecompressSnappyJavaTest) {
    using java_compressor = compression::internal::snappy_java_compressor;

    const auto payload = random_generators::gen_alphanum_string(512);

    iobuf original;
    original.append(payload.data(), payload.size());

    auto compressed = java_compressor::compress(original);
    auto round_tripped = java_compressor::uncompress(compressed);

    EXPECT_EQ(original, round_tripped);
}

// Output of the standard snappy compressor must be accepted by the snappy
// library's own validity check.
TEST(SnappyTest, SnappyStandardIsValidCompressedTest) {
    const auto data = random_generators::gen_alphanum_string(512);

    iobuf buf;
    buf.append(data.data(), data.size());
    auto compressed_buf = compression::snappy_standard_compressor::compress(
      buf);

    // snappy::IsValidCompressedBuffer needs one contiguous byte range. Only
    // the first fragment's pointer is handed over, so make sure that fragment
    // really holds the whole compressed output; otherwise the call would read
    // past the end of the fragment.
    ASSERT_GT(compressed_buf.size_bytes(), 0U);
    auto first_frag = compressed_buf.begin();
    ASSERT_EQ(first_frag->size(), compressed_buf.size_bytes());

    EXPECT_TRUE(
      snappy::IsValidCompressedBuffer(first_frag->get(), first_frag->size()));
}

// Checks the byte layout produced by snappy_java_compressor::compress for a
// small payload: magic bytes, big-endian version, big-endian minimum
// compatible version, a big-endian chunk length, and finally a snappy block
// whose advertised uncompressed length equals the input size.
TEST(SnappyTest, CompressedVersionHeadersSnappyJavaTest) {
    // snappy-java uses big-endian format to encode headers. See:
    // https://github.com/xerial/snappy-java/blob/65e1ec3de1a0d447b137c6dd6393629aa3d75b8b/src/main/java/org/xerial/snappy/SnappyOutputStream.java#L343-L349
    // https://github.com/xerial/snappy-java/blob/65e1ec3de1a0d447b137c6dd6393629aa3d75b8b/src/main/java/org/xerial/snappy/SnappyCodec.java#L78-L81
    using snappy_magic
      = compression::internal::snappy_java_compressor::snappy_magic;

    const auto data = random_generators::gen_alphanum_string(512);

    iobuf buf;
    buf.append(data.data(), data.size());
    auto compressed_buf
      = compression::internal::snappy_java_compressor::compress(buf);
    auto compressed_frag = compressed_buf.begin();

    // Every check below reads directly out of the first fragment. Bail out
    // early if it cannot even hold the header plus the chunk-length prefix,
    // rather than letting memcmp/memcpy run off the end of the fragment.
    ASSERT_GE(
      compressed_frag->size(), snappy_magic::header_len + sizeof(int32_t));

    // Check the magic header
    EXPECT_EQ(
      std::memcmp(
        compressed_frag->get(),
        snappy_magic::java_magic.data(),
        snappy_magic::java_magic.size()),
      0);
    compressed_frag->trim_front(snappy_magic::java_magic.size());

    // Check the default version
    const auto be_default_version = ss::cpu_to_be(
      snappy_magic::default_version);
    EXPECT_EQ(
      std::memcmp(
        compressed_frag->get(),
        &be_default_version,
        sizeof(snappy_magic::default_version)),
      0);
    compressed_frag->trim_front(sizeof(snappy_magic::default_version));

    // Check the compat version
    const auto be_compat_version = ss::cpu_to_be(
      snappy_magic::min_compatible_version);
    EXPECT_EQ(
      std::memcmp(
        compressed_frag->get(),
        &be_compat_version,
        sizeof(snappy_magic::min_compatible_version)),
      0);
    compressed_frag->trim_front(sizeof(snappy_magic::min_compatible_version));

    // Check the size of the compressed payload
    int32_t be_compressed_size{};
    std::memcpy(
      &be_compressed_size, compressed_frag->get(), sizeof(be_compressed_size));
    const int32_t compressed_size = ss::be_to_cpu(be_compressed_size);
    compressed_frag->trim_front(sizeof(compressed_size));
    // These are hard requirements: the snappy source below is sized from this
    // value, so a bogus length must stop the test instead of being read.
    ASSERT_GE(compressed_size, 0);
    ASSERT_EQ(static_cast<size_t>(compressed_size), compressed_frag->size());

    // Get the size of the decompressed payload
    snappy::ByteArraySource compressed_source(
      compressed_frag->get(), static_cast<size_t>(compressed_size));
    uint32_t decompressed_size = 0;
    ASSERT_TRUE(
      snappy::GetUncompressedLength(&compressed_source, &decompressed_size));
    EXPECT_EQ(decompressed_size, data.size());
}

// Decodes a stored payload that was compressed with the old little-endian
// version headers and checks it still uncompresses to the original data.
TEST(SnappyTest, LittleEndianHeadersBackwardsCompatibilitySnappyJavaTest) {
    // Previously, version fields were erroneously written with
    // little-endian encoding. They are now corrected to be written and decoded
    // using big-endian, but we must retain backwards compatibility here with
    // the existing, improperly encoded batches (as version, min_version fields
    // with value 1 will decode to the value 16777216).
    // See: https://github.com/redpanda-data/redpanda/issues/25091
    const char* snappy_payload_path = std::getenv("SNAPPY_PAYLOAD_PATH");
    // Report a missing variable as a test failure rather than aborting the
    // whole test binary.
    ASSERT_TRUE(snappy_payload_path != nullptr)
      << "expected value for payload path";
    const auto root = std::filesystem::path(snappy_payload_path);

    // The original, uncompressed data.
    const auto expected_decompressed_file = root / "uncompressed_data";
    // Fixture presence is a precondition: stop here instead of carrying on
    // into read_fully with a file that does not exist.
    ASSERT_TRUE(ss::file_exists(expected_decompressed_file.c_str()).get())
      << "missing fixture: " << expected_decompressed_file.native();
    auto expected_decompressed_buffer
      = read_fully(expected_decompressed_file.native()).get();

    // A payload that was previously compressed by redpanda, with version
    // headers in little-endian encoding.
    const auto le_compressed_file = root
                                    / "little_endian_compressed_data.snappy";
    ASSERT_TRUE(ss::file_exists(le_compressed_file.c_str()).get())
      << "missing fixture: " << le_compressed_file.native();
    auto le_compressed_buffer = read_fully(le_compressed_file.native()).get();

    auto decompressed_buffer
      = compression::internal::snappy_java_compressor::uncompress(
        le_compressed_buffer);
    EXPECT_EQ(decompressed_buffer, expected_decompressed_buffer);
}
Loading