Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test_with_sanitizer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
env:
CC: clang
CXX: clang++
PAIMON_CI_LINK_JOBS: 1
run: ci/scripts/build_paimon.sh $(pwd) true
- name: Show ccache statistics
if: always()
Expand Down
14 changes: 12 additions & 2 deletions ci/scripts/build_paimon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ enable_sanitizer=${2:-false}
check_clang_tidy=${3:-false}
build_type=${4:-Debug}
build_dir="${source_dir}/build"
build_jobs=${PAIMON_CI_BUILD_JOBS:-$(nproc)}
test_jobs=${PAIMON_CI_TEST_JOBS:-$(nproc)}
link_jobs=${PAIMON_CI_LINK_JOBS:-}

# Display ccache status if available
if command -v ccache &> /dev/null; then
Expand Down Expand Up @@ -65,9 +68,16 @@ if [[ "${enable_sanitizer}" == "true" ]]; then
)
fi

if [[ -n "${link_jobs}" ]]; then
CMAKE_ARGS+=(
"-DCMAKE_JOB_POOLS=paimon_link_pool=${link_jobs}"
"-DCMAKE_JOB_POOL_LINK=paimon_link_pool"
)
fi

cmake "${CMAKE_ARGS[@]}" "${source_dir}"
cmake --build . -- -j "$(nproc)"
ctest --output-on-failure -j "$(nproc)"
cmake --build . -- -j "${build_jobs}"
ctest --output-on-failure -j "${test_jobs}"

if [[ "${check_clang_tidy}" == "true" ]]; then
cmake --build . --target check-clang-tidy
Expand Down
1 change: 1 addition & 0 deletions docs/source/user_guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ User Guide
user_guide/snapshot
user_guide/manifest
user_guide/manifest_cache
user_guide/manifest_entry_cache
user_guide/parquet_metadata_cache
user_guide/data_types
user_guide/primary_key_table
Expand Down
88 changes: 88 additions & 0 deletions docs/source/user_guide/manifest_entry_cache.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
.. Copyright 2026-present Alibaba Inc.

.. Licensed under the Apache License, Version 2.0 (the "License");
.. you may not use this file except in compliance with the License.
.. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, software
.. distributed under the License is distributed on an "AS IS" BASIS,
.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
.. See the License for the specific language governing permissions and
.. limitations under the License.

Manifest Entry Cache
====================

Overview
--------

Large tables may contain many manifest entries, while a scan may only need a
small subset after bucket, partition, and statistics pruning. The snapshot live
manifest entry cache reduces repeated manifest decoding cost for successive full
scans that target the same bucket.

The cache stores decoded and merged live manifest entries by table path, branch,
and bucket for ``ScanMode::ALL``. Each cache value can retain several snapshot
results for that bucket. Exact snapshot hits are served from the cache; cache
misses rebuild the target snapshot bucket from the target snapshot's data
manifests and store the rebuilt live entries.

Request-specific filters are not stored in the cache. Partition, level, and
predicate filters are still evaluated for each scan, so cached entries can be
reused safely across different scan predicates for the same bucket.

Configuration
-------------

Manifest entry caching reuses the cache instance provided by
``ScanContextBuilder::WithCache()`` and stores bucket-scoped snapshot entries
under
``CacheKind::SNAPSHOT_LIVE_MANIFEST``:

.. code-block:: cpp

auto cache = std::make_shared<LruCache>(128 * 1024 * 1024);
ScanContextBuilder context_builder(table_path);
PAIMON_ASSIGN_OR_RAISE(
std::unique_ptr<ScanContext> scan_context,
context_builder
.WithCache(cache)
.AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "3")
.Finish());

Cache entries are scoped by table path, branch, and bucket, so they can be
reused across newly created ``TableScan`` and ``FileStoreScan`` instances as
long as they share the same cache object and scan the same bucket.

``Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS`` controls how many snapshot
results are retained in each table/branch/bucket cache value. Older snapshots in
the same bucket are evicted first. The default value is ``0``, which disables
the cache path. Set it to a positive value to enable the cache when
``ScanContextBuilder::WithCache()`` is also configured. Physical cache eviction
is still controlled by the configured ``Cache`` implementation, for example the
capacity of ``LruCache``.

If no cache is provided through ``ScanContextBuilder::WithCache()``, this
optimization is skipped. The snapshot manifest entry cache shares the same
``Cache`` interface with raw manifest and data-file footer caches, but it uses a
dedicated ``CacheKind`` and a table/branch/bucket key instead of file byte
ranges.

Limitations
-----------

The cache is currently used only for ``ScanMode::ALL`` scans that can determine
a single target bucket. It is skipped for scans without a bucket filter because
reading or deserializing all buckets would be too expensive for selective
queries. It is also skipped for row-range scans because row-range pruning is
applied at manifest-meta level.

Metrics
-------

The scan metrics expose existing counters for the last scan:

- ``lastScannedManifests``: how many manifest files were loaded during this
scan before manifest entry decoding.
4 changes: 4 additions & 0 deletions include/paimon/cache/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ enum class CacheKind {
DEFAULT,
MANIFEST,
DATA_FILE_FOOTER,
SNAPSHOT_LIVE_MANIFEST,
};

class PAIMON_EXPORT CacheKey {
Expand All @@ -41,6 +42,9 @@ class PAIMON_EXPORT CacheKey {
int32_t length, bool is_index);
static std::shared_ptr<CacheKey> ForKind(const std::string& file_path, int64_t position,
int32_t length, CacheKind kind);
static std::shared_ptr<CacheKey> ForSnapshotLiveManifestEntries(const std::string& table_path,
const std::string& branch,
int32_t bucket);

public:
virtual ~CacheKey() = default;
Expand Down
5 changes: 5 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ struct PAIMON_EXPORT Options {
/// "latest-full", "latest", "from-snapshot", "from-snapshot-full". Default value is "default".
static const char SCAN_MODE[];

/// "scan.manifest-entry-cache.max-snapshots" - Maximum number of snapshot live manifest entry
/// results retained per table, branch, and bucket. Setting it to 0 disables manifest entry
/// cache. Default value is 0.
static const char SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS[];

/// "read.batch-size" - Read batch size for any file format if it supports.
/// The default value is 1024.
static const char READ_BATCH_SIZE[];
Expand Down
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ set(PAIMON_CORE_SRCS
core/operation/raw_file_split_read.cpp
core/operation/read_context.cpp
core/operation/scan_context.cpp
core/manifest/snapshot_live_manifest_entries.cpp
core/operation/write_context.cpp
core/operation/write_restore.cpp
core/postpone/postpone_bucket_writer.cpp
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const char Options::SOURCE_SPLIT_TARGET_SIZE[] = "source.split.target-size";
const char Options::SOURCE_SPLIT_OPEN_FILE_COST[] = "source.split.open-file-cost";
const char Options::SCAN_SNAPSHOT_ID[] = "scan.snapshot-id";
const char Options::SCAN_MODE[] = "scan.mode";
const char Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS[] =
"scan.manifest-entry-cache.max-snapshots";
const char Options::READ_BATCH_SIZE[] = "read.batch-size";
const char Options::WRITE_BATCH_SIZE[] = "write.batch-size";
const char Options::WRITE_BUFFER_SIZE[] = "write-buffer-size";
Expand Down
49 changes: 49 additions & 0 deletions src/paimon/common/io/cache/cache_key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,49 @@
#include "paimon/common/io/cache/cache_key.h"

namespace paimon {
namespace {

class SnapshotLiveManifestEntriesCacheKey : public CacheKey {
public:
SnapshotLiveManifestEntriesCacheKey(const std::string& table_path, const std::string& branch,
int32_t bucket)
: CacheKey(CacheKind::SNAPSHOT_LIVE_MANIFEST),
table_path_(table_path),
branch_(branch),
bucket_(bucket) {}

bool IsIndex() const override {
return false;
}

bool Equals(const CacheKey& other) const override {
const auto* rhs = dynamic_cast<const SnapshotLiveManifestEntriesCacheKey*>(&other);
if (!rhs) {
return false;
}
return table_path_ == rhs->table_path_ && branch_ == rhs->branch_ &&
bucket_ == rhs->bucket_ && GetKind() == rhs->GetKind();
}

size_t HashCode() const override {
size_t seed = 0;
seed ^= std::hash<std::string>{}(table_path_) + HASH_CONSTANT + (seed << 6) + (seed >> 2);
seed ^= std::hash<std::string>{}(branch_) + HASH_CONSTANT + (seed << 6) + (seed >> 2);
seed ^= std::hash<int32_t>{}(bucket_) + HASH_CONSTANT + (seed << 6) + (seed >> 2);
seed ^= std::hash<int32_t>{}(static_cast<int32_t>(GetKind())) + HASH_CONSTANT +
(seed << 6) + (seed >> 2);
return seed;
}

private:
static constexpr uint64_t HASH_CONSTANT = 0x9e3779b97f4a7c15ULL;

const std::string table_path_;
const std::string branch_;
const int32_t bucket_;
};

} // namespace

std::shared_ptr<CacheKey> CacheKey::ForPosition(const std::string& file_path, int64_t position,
int32_t length, bool is_index) {
Expand All @@ -31,6 +74,12 @@ std::shared_ptr<CacheKey> CacheKey::ForKind(const std::string& file_path, int64_
return key;
}

std::shared_ptr<CacheKey> CacheKey::ForSnapshotLiveManifestEntries(const std::string& table_path,
const std::string& branch,
int32_t bucket) {
return std::make_shared<SnapshotLiveManifestEntriesCacheKey>(table_path, branch, bucket);
}

bool PositionCacheKey::IsIndex() const {
return is_index_;
}
Expand Down
17 changes: 17 additions & 0 deletions src/paimon/common/io/cache/lru_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,23 @@ TEST_F(LruCacheTest, TestForKindSetsKeyKind) {
ASSERT_EQ(CacheKind::MANIFEST, put_key->GetKind());
}

TEST_F(LruCacheTest, TestForSnapshotLiveManifestEntries) {
auto main_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main", 0);
auto same_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main", 0);
auto branch_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "dev", 0);
auto table_key = CacheKey::ForSnapshotLiveManifestEntries("other_table_path", "main", 0);
auto bucket_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main", 1);
auto hash_in_path_key = CacheKey::ForSnapshotLiveManifestEntries("table#path", "main", 0);
auto hash_in_branch_key = CacheKey::ForSnapshotLiveManifestEntries("table", "path#main", 0);

ASSERT_EQ(CacheKind::SNAPSHOT_LIVE_MANIFEST, main_key->GetKind());
ASSERT_TRUE(CacheKeyEqual()(main_key, same_key));
ASSERT_FALSE(CacheKeyEqual()(main_key, branch_key));
ASSERT_FALSE(CacheKeyEqual()(main_key, table_key));
ASSERT_FALSE(CacheKeyEqual()(main_key, bucket_key));
ASSERT_FALSE(CacheKeyEqual()(hash_in_path_key, hash_in_branch_key));
}

/// Verifies that multiple evictions happen when a single large entry is inserted.
TEST_F(LruCacheTest, TestMultipleEvictions) {
LruCache cache(300);
Expand Down
7 changes: 6 additions & 1 deletion src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() {
Status PrefetchFileBatchReaderImpl::SetReadSchema(
::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
const std::optional<RoaringBitmap32>& selection_bitmap) {
PAIMON_RETURN_NOT_OK(CleanUp());
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> schema,
arrow::ImportSchema(read_schema));
for (const auto& reader : readers_) {
Expand All @@ -162,11 +163,15 @@ Status PrefetchFileBatchReaderImpl::SetReadSchema(
}
selection_bitmap_ = selection_bitmap;
predicate_ = predicate;
return RefreshReadRanges();
return RefreshReadRangesInternal();
}

Status PrefetchFileBatchReaderImpl::RefreshReadRanges() {
PAIMON_RETURN_NOT_OK(CleanUp());
return RefreshReadRangesInternal();
}

Status PrefetchFileBatchReaderImpl::RefreshReadRangesInternal() {
bool need_prefetch;
PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch));

Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
PrefetchCacheMode cache_mode);

Status CleanUp();
Status RefreshReadRangesInternal();
void Workloop();
void SetReadStatus(const Status& status);
Status GetReadStatus() const;
Expand Down
13 changes: 13 additions & 0 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ struct CoreOptions::Impl {
int32_t bucket = -1;

int32_t manifest_merge_min_count = 30;
int32_t scan_manifest_entry_cache_max_snapshots = 0;
int32_t read_batch_size = 1024;
int32_t write_batch_size = 1024;
int32_t local_sort_max_num_file_handles = 128;
Expand Down Expand Up @@ -717,6 +718,13 @@ struct CoreOptions::Impl {
}
// Parse scan.mode - scanning behavior of the source, default "default"
PAIMON_RETURN_NOT_OK(parser.ParseStartupMode(&startup_mode));
// Parse scan.manifest-entry-cache.max-snapshots - cached snapshots per bucket.
PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS,
&scan_manifest_entry_cache_max_snapshots));
if (scan_manifest_entry_cache_max_snapshots < 0) {
return Status::Invalid(fmt::format("{} must be non-negative",
Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS));
}
// Parse scan.fallback-branch - fallback branch when partition not found
PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_FALLBACK_BRANCH, &scan_fallback_branch));
// Parse branch - branch name, default "main"
Expand Down Expand Up @@ -968,6 +976,11 @@ std::optional<int64_t> CoreOptions::GetScanSnapshotId() const {
std::optional<int64_t> CoreOptions::GetScanTimestampMillis() const {
return impl_->scan_timestamp_millis;
}

int32_t CoreOptions::GetScanManifestEntryCacheMaxSnapshots() const {
return impl_->scan_manifest_entry_cache_max_snapshots;
}

int64_t CoreOptions::GetManifestTargetFileSize() const {
return impl_->manifest_target_file_size;
}
Expand Down
1 change: 1 addition & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class PAIMON_EXPORT CoreOptions {
int64_t GetSourceSplitOpenFileCost() const;
std::optional<int64_t> GetScanSnapshotId() const;
std::optional<int64_t> GetScanTimestampMillis() const;
int32_t GetScanManifestEntryCacheMaxSnapshots() const;

int64_t GetManifestTargetFileSize() const;
std::shared_ptr<Cache> GetCache() const;
Expand Down
6 changes: 6 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_EQ(8 * 1024 * 1024L, core_options.GetManifestTargetFileSize());
ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize());
ASSERT_EQ(30, core_options.GetManifestMergeMinCount());
ASSERT_EQ(0, core_options.GetScanManifestEntryCacheMaxSnapshots());
ASSERT_EQ(nullptr, core_options.GetCache());
ASSERT_EQ(128 * 1024 * 1024L, core_options.GetSourceSplitTargetSize());
ASSERT_EQ(4 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost());
Expand Down Expand Up @@ -186,6 +187,7 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::COMMIT_MAX_RETRIES, "20"},
{Options::SCAN_SNAPSHOT_ID, "5"},
{Options::SCAN_MODE, "from-snapshot-full"},
{Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "7"},
{Options::SNAPSHOT_NUM_RETAINED_MIN, "15"},
{Options::SNAPSHOT_NUM_RETAINED_MAX, "30"},
{Options::SNAPSHOT_EXPIRE_LIMIT, "20"},
Expand Down Expand Up @@ -308,6 +310,7 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_EQ(120 * 1000, core_options.GetCommitTimeout());
ASSERT_EQ(20, core_options.GetCommitMaxRetries());
ASSERT_EQ(5, core_options.GetScanSnapshotId().value_or(-1));
ASSERT_EQ(7, core_options.GetScanManifestEntryCacheMaxSnapshots());
ExpireConfig expire_config = core_options.GetExpireConfig();
ASSERT_EQ(15, expire_config.GetSnapshotRetainMin());
ASSERT_EQ(30, expire_config.GetSnapshotRetainMax());
Expand Down Expand Up @@ -437,6 +440,9 @@ TEST(CoreOptionsTest, TestInvalidCase) {
"invalid lookup mode: invalid");
ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::LOOKUP_COMPACT_MAX_INTERVAL, "invalid"}}),
"Invalid Config [lookup-compact.max-interval: invalid]");
ASSERT_NOK_WITH_MSG(
CoreOptions::FromMap({{Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "-1"}}),
"scan.manifest-entry-cache.max-snapshots must be non-negative");
ASSERT_NOK_WITH_MSG(
CoreOptions::FromMap({{Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "1.1"}}),
"The high priority pool ratio should in the range [0, 1), while input is 1.1");
Expand Down
Loading
Loading