Skip to content

Commit

Permalink
Introduce the block version ref key
Browse files Browse the repository at this point in the history
  • Loading branch information
poodlewars committed Oct 31, 2024
1 parent 9d073c6 commit 619af2a
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 100 deletions.
13 changes: 8 additions & 5 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ set(arcticdb_srcs
stream/protobuf_mappings.cpp
toolbox/library_tool.cpp
util/allocator.cpp
util/buffer_holder.cpp
util/buffer_pool.cpp
util/configs_map.cpp
util/decimal.cpp
Expand All @@ -501,17 +502,17 @@ set(arcticdb_srcs
util/timer.cpp
util/trace.cpp
util/type_handler.cpp
version/block_key.hpp
version/block_key.cpp
version/local_versioned_engine.cpp
version/op_log.cpp
version/snapshot.cpp
version/symbol_list.cpp
version/version_core.cpp
version/version_store_api.cpp
version/version_utils.cpp
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
util/buffer_holder.cpp)
)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down Expand Up @@ -946,17 +947,19 @@ if(${TEST})
util/test/test_string_utils.cpp
util/test/test_tracing_allocator.cpp
version/test/test_append.cpp
version/test/test_block_key.cpp
version/test/test_sort_index.cpp
version/test/test_sorting_info_state_machine.cpp
version/test/test_sparse.cpp
version/test/test_stream_version_data.cpp
version/test/test_symbol_list.cpp
version/test/test_version_map.cpp
version/test/test_version_map_batch.cpp
version/test/test_version_store.cpp
version/test/test_sorting_info_state_machine.cpp
version/test/version_map_model.hpp
python/python_handlers.cpp
storage/test/common.hpp
version/test/test_sort_index.cpp)
)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

Expand Down
6 changes: 6 additions & 0 deletions cpp/arcticdb/entity/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ KeyData get_key_data(KeyType key_type) {
STRING_REF(KeyType::LOCK, lref, 'x')
STRING_REF(KeyType::SNAPSHOT_TOMBSTONE, ttomb, 'X')
STRING_KEY(KeyType::APPEND_DATA, app, 'b')
STRING_REF(KeyType::BLOCK_VERSION_REF, bvref, 'R')
// Unused
STRING_KEY(KeyType::PARTITION, pref, 'p')
STRING_KEY(KeyType::REPLICATION_FAIL_INFO, rfail, 'F')
Expand Down Expand Up @@ -96,4 +97,9 @@ bool is_ref_key_class(KeyType key_type){
return key_class_from_key_type(key_type) == KeyClass::REF_KEY;
}

bool is_block_ref_key_class(KeyType k) {
// Only block version ref currently implemented
return k == KeyType::BLOCK_VERSION_REF;
}

}
8 changes: 8 additions & 0 deletions cpp/arcticdb/entity/key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ enum class KeyType : int {
* Used for storing the ids of storages that failed to sync
*/
REPLICATION_FAIL_INFO = 26,

/*
* A reference key storing many versions, used to track state within some of our background jobs.
*/
BLOCK_VERSION_REF = 27,

UNDEFINED
};

Expand Down Expand Up @@ -248,6 +254,8 @@ bool is_string_key_type(KeyType k);

bool is_ref_key_class(KeyType k);

bool is_block_ref_key_class(KeyType k);

inline KeyType get_key_type_for_data_stream(const StreamId &) {
return KeyType::TABLE_DATA;
}
Expand Down
123 changes: 123 additions & 0 deletions cpp/arcticdb/version/block_key.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/* Copyright 2024 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#include <arcticdb/version/block_key.hpp>
#include <arcticdb/stream/index.hpp>
#include <arcticdb/stream/index_aggregator.hpp>
#include <arcticdb/stream/stream_utils.hpp>
#include <utility>

namespace arcticdb::version_store {

BlockKey::BlockKey(KeyType key_type, StreamId id, SegmentInMemory &&segment) {
util::check(is_block_ref_key_class(key_type), "Expected block ref key but type was {}", key_type);
expected_key_type_of_contents_ = expected_key_type_of_contents(key_type);
block_key_type_ = key_type;
id_ = std::move(id);
keys_ = map_from_segment(std::move(segment));
}

BlockKey::BlockKey(KeyType key_type, StreamId id)
: BlockKey(key_type, std::move(id), SegmentInMemory()) {
}

BlockKey::BlockKey(KeyType key_type, StreamId id, std::unordered_map<StreamId, AtomKey> keys)
: keys_(std::move(keys)), block_key_type_(key_type), id_(std::move(id)) {
expected_key_type_of_contents_ = expected_key_type_of_contents(key_type);
}

BlockKey BlockKey::block_with_same_keys(StreamId new_id) {
return {key_type(), std::move(new_id), keys_};
}

void BlockKey::upsert(AtomKey&& key) {
util::check(valid_, "Attempt to use BlockKey after release_segment_in_memory");
util::check(key.type() == expected_key_type_of_contents_, "Unexpected key_type, was {} expected {}", key.type(),
expected_key_type_of_contents_);
keys_[key.id()] = std::move(key);
}

bool BlockKey::remove(const StreamId &id) {
util::check(valid_, "Attempt to use BlockKey after release_segment_in_memory");
return keys_.erase(id) == 1;
}

std::optional<AtomKey> BlockKey::read(const StreamId &id) const {
util::check(valid_, "Attempt to use BlockKey after release_segment_in_memory");
auto it = keys_.find(id);
if (it == keys_.end()) {
return std::nullopt;
} else {
return it->second;
}
}

SegmentInMemory BlockKey::release_segment_in_memory() {
util::check(valid_, "Attempt to release_segment_in_memory on a BlockKey twice");
valid_ = false;
SegmentInMemory result;
stream::IndexAggregator<stream::RowCountIndex> agg(id_, [&result](SegmentInMemory&& segment) {
result = std::move(segment);
});

for (auto&& [_, k] : keys_) {
agg.add_key(k);
}

agg.finalize();
keys_.clear();
return result;
}

KeyType BlockKey::key_type() const {
return block_key_type_;
}

StreamId BlockKey::id() const {
return id_;
}

std::unordered_map<StreamId, AtomKey> BlockKey::map_from_segment(SegmentInMemory &&segment) {
std::unordered_map<StreamId, AtomKey> result;
for (size_t idx = 0; idx < segment.row_count(); idx++) {
auto id = stream::stream_id_from_segment<pipelines::index::Fields>(segment, idx);
auto row_key = stream::read_key_row_into_builder<pipelines::index::Fields>(segment, idx)
.build(id, expected_key_type_of_contents_);
result.insert({id, row_key});
}
return result;
}

KeyType BlockKey::expected_key_type_of_contents(const KeyType &key_type) {
switch (key_type) {
case KeyType::BLOCK_VERSION_REF:
return KeyType::VERSION;
default:
util::raise_rte("Unsupported key type {}", key_type);
}
}

void write_block_key(Store *store, BlockKey &&key) {
store->write_sync(
key.key_type(),
key.id(),
key.release_segment_in_memory()
);
}

BlockKey read_block_key(Store *store, const KeyType key_type, const StreamId &id) {
util::check(is_block_ref_key_class(key_type), "Expected block ref key but type was {}", key_type);
auto opts = storage::ReadKeyOpts{};
opts.dont_warn_about_missing_key = true;
try {
SegmentInMemory segment = store->read_sync(RefKey{id, key_type}, opts).second;
return BlockKey{key_type, id, std::move(segment)};
} catch (storage::KeyNotFoundException& ex) {

Check failure on line 118 in cpp/arcticdb/version/block_key.cpp

View workflow job for this annotation

GitHub Actions / Windows C++ Tests / compile (windows, windows-cl, win_amd64, C:/cpp_build, C:/vcpkg_packages, *.pdb, *.lib, *.ilk, *....

the following warning is treated as an error

Check warning on line 118 in cpp/arcticdb/version/block_key.cpp

View workflow job for this annotation

GitHub Actions / Windows C++ Tests / compile (windows, windows-cl, win_amd64, C:/cpp_build, C:/vcpkg_packages, *.pdb, *.lib, *.ilk, *....

'ex': unreferenced local variable
return BlockKey{key_type, id};
}
}

} // namespace arcticdb::version_store
75 changes: 75 additions & 0 deletions cpp/arcticdb/version/block_key.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/* Copyright 2024 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#pragma once

#include <utility>
#include <arcticdb/storage/store.hpp>
#include <arcticdb/entity/key.hpp>

namespace arcticdb::version_store {

/**
* A key whose segment stores many atom keys (all of the same type).
*/
class BlockKey {
public:

/**
* Loaded from an existing block key.
*/
BlockKey(KeyType key_type, StreamId id, SegmentInMemory&& segment);

/**
* A new block key.
*/
BlockKey(KeyType key_type, StreamId id);

BlockKey(KeyType key_type, StreamId id, std::unordered_map<StreamId, AtomKey> keys);

BlockKey block_with_same_keys(StreamId new_id);

void upsert(AtomKey&& key);

/**
* Returns true iff the id was removed. False indicates that the id was not present.
*/
bool remove(const StreamId& id);

/**
* nullopt indicates that the id was not present in this block
*/
std::optional<AtomKey> read(const StreamId& id) const;

SegmentInMemory release_segment_in_memory();

KeyType key_type() const;

StreamId id() const;

private:

static KeyType expected_key_type_of_contents(const KeyType &key_type);
std::unordered_map<StreamId, AtomKey> map_from_segment(SegmentInMemory&& segment);

std::unordered_map<StreamId, AtomKey> keys_;
bool valid_{true};
KeyType block_key_type_;
StreamId id_;
KeyType expected_key_type_of_contents_;
};

/**
* Write the key to storage. Invalidates the in-memory key.
*/
void write_block_key(Store* store, BlockKey&& key);

/**
* Read the block key from storage. If the key does not exist in the storage, returns an empty BlockKey.
*/
BlockKey read_block_key(Store* store, KeyType key_type, const StreamId& id);

}
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void iterate_snapshots(const std::shared_ptr<Store>& store, folly::Function<void
std::unordered_set<SnapshotId> seen;

store->iterate_type(KeyType::SNAPSHOT_REF, [&snap_variant_keys, &seen](VariantKey &&vk) {
util::check(std::holds_alternative<RefKey>(vk), "Expected shapshot ref to be reference type, got {}", variant_key_view(vk));
util::check(std::holds_alternative<RefKey>(vk), "Expected snapshot ref to be reference type, got {}", variant_key_view(vk));
auto ref_key = std::get<RefKey>(std::move(vk));
seen.insert(ref_key.id());
snap_variant_keys.emplace_back(ref_key);
Expand Down
Loading

0 comments on commit 619af2a

Please # to comment.