feat: clustered segments pt.1#19460

Open
clintropolis wants to merge 4 commits into
apache:masterfrom
clintropolis:clustered-segments

@clintropolis
Member

Description

This PR builds on the foundations laid by projections (#17214) and the v10 segment format (#18880) to introduce 'clustered' segments, which give operators the option to push a CLUSTERED BY clause inside a segment, as a companion to the way partitioning already distributes data between segments in this manner. Internally, the 'base' table is decomposed into separate cluster groups, which are concatenated to form the 'complete' view of all rows stored in the segment. This optimizes for use cases where typical queries filter down to a small subset of the cluster groups (ideally a single group), which, like aggregate projections, can greatly reduce the number of rows to be scanned. The expected use cases include multi-tenant clusters with a shared datasource, metrics workloads that typically filter to a single type of service, etc.
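
To make the decomposition concrete, here is a minimal sketch of the idea, not the classes in this PR. `ClusterGroupSketch`, `Group`, `scanAll`, and `scanTenant` are hypothetical names, and a row is simplified to a single string. The full view is the concatenation of all groups, while a filter on the clustering value can skip whole groups without reading their rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; these are not Druid's actual classes.
final class ClusterGroupSketch
{
  static final class Group
  {
    final String tenant;      // clustering value, stored once for the whole group
    final List<String> rows;  // remaining column data, simplified to one string per row

    Group(String tenant, List<String> rows)
    {
      this.tenant = tenant;
      this.rows = rows;
    }
  }

  // Complete view of the segment: every group's rows, concatenated in group order.
  static List<String> scanAll(List<Group> groups)
  {
    final List<String> out = new ArrayList<>();
    for (Group group : groups) {
      for (String row : group.rows) {
        out.add(group.tenant + ":" + row);
      }
    }
    return out;
  }

  // Filtered view: groups whose clustering value cannot match are skipped entirely.
  static List<String> scanTenant(List<Group> groups, String tenant)
  {
    final List<String> out = new ArrayList<>();
    for (Group group : groups) {
      if (!group.tenant.equals(tenant)) {
        continue; // pruned without touching any of its rows
      }
      for (String row : group.rows) {
        out.add(group.tenant + ":" + row);
      }
    }
    return out;
  }
}
```

With groups for `acme` and `globex`, `scanTenant(groups, "globex")` only iterates the rows of the second group.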

This PR contains only the read side, to get feedback on the internal segment metadata shapes and query engine integration. The write side (ingestion support) will come in a follow-up PR, so this PR uses some test fixtures to exercise the read paths until segment building is actually in place.

Since this is an experimental, mostly new feature, the most important part for reviewers is the new internal segment metadata: ClusteredValueGroupsBaseTableSchema and its new supporting types such as TableClusterGroupSpec and ClusteringDictionaries. We want to ensure the metadata stored in the segment is "good", since changing it after segments have been written is very hard.

todo: elaborate on design

@Override
public List<String> getColumnNames()
{
List<String> columns = new ArrayList<>(columnNames.size() + aggregators.length);
};
}

private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, ClusteredValueGroupsBaseTableSchema clusterSummary)
}
final Object raw = parent.currentValue(idx);
final String stringValue = raw == null ? null : String.valueOf(raw);
cachedSelector = DimensionSelector.constant(stringValue, spec.getExtractionFn());
final Object raw = parent.currentValue(idx);
final String stringValue = raw == null ? null : String.valueOf(raw);
final String afterExtraction =
spec.getExtractionFn() == null ? stringValue : spec.getExtractionFn().apply(stringValue);
Member

@FrankChen021 left a comment

| Severity | Findings |
| --- | --- |
| P0 | 0 |
| P1 | 2 |
| P2 | 0 |
| P3 | 0 |
| Total | 2 |

Reviewed 25 of 25 changed files.


This is an automated review by Codex GPT-5.5

);
}

return new QueryableIndexCursorHolder(
Member

[P1] Cluster-column filters are reapplied against sub-indexes that do not contain them

After pruneClusterGroups keeps a matching group, the original CursorBuildSpec is passed unchanged into a QueryableIndexCursorHolder for the group sub-index. That sub-index intentionally does not contain the clustering columns, so EqualityFilter.getBitmapColumnIndex sees a missing column and builds an exact all-false/all-unknown bitmap before the ClusteringColumnSelectorFactory wrapper can provide the constant value to a matcher. Queries such as tenant = 'acme' therefore prune to the acme group and then scan zero rows. The multi-group supplier path repeats the same pattern, so clustering predicates need to be stripped/residualized after pruning or the group holder needs a cluster-aware ColumnIndexSelector/filter-bundle path.
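
One way to picture the "strip/residualize after pruning" option is the sketch below. It uses a hypothetical filter model (`F`, `Eq`, `And`, `rewrite`), not Druid's `Filter` classes, and it assumes the literal already has the clustering column's type. Equality predicates on clustering columns fold to constants per group, so only the residual filter reaches the group's sub-index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical filter model for illustration; these are not Druid's Filter classes.
final class ResidualFilterSketch
{
  interface F {}

  static final F TRUE = new F() {};
  static final F FALSE = new F() {};

  static final class Eq implements F
  {
    final String column;
    final Object value;

    Eq(String column, Object value)
    {
      this.column = column;
      this.value = value;
    }
  }

  static final class And implements F
  {
    final List<F> children;

    And(List<F> children)
    {
      this.children = children;
    }
  }

  // Folds predicates on clustering columns into constants for one group; everything else is kept as-is.
  static F rewrite(F filter, Map<String, Object> clusteringValues)
  {
    if (filter instanceof Eq) {
      final Eq eq = (Eq) filter;
      if (!clusteringValues.containsKey(eq.column)) {
        return eq; // ordinary column, evaluated against the group sub-index
      }
      // Assumes the literal already has the clustering column's type.
      return Objects.equals(clusteringValues.get(eq.column), eq.value) ? TRUE : FALSE;
    }
    if (filter instanceof And) {
      final List<F> kept = new ArrayList<>();
      for (F child : ((And) filter).children) {
        final F rewritten = rewrite(child, clusteringValues);
        if (rewritten == FALSE) {
          return FALSE; // the whole group can be pruned
        }
        if (rewritten != TRUE) {
          kept.add(rewritten);
        }
      }
      if (kept.isEmpty()) {
        return TRUE;
      }
      return kept.size() == 1 ? kept.get(0) : new And(kept);
    }
    return filter;
  }
}
```

For the `acme` group, `tenant = 'acme' AND country = 'US'` would reduce to `country = 'US'`, so the sub-index never sees the missing `tenant` column.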

if (clusteringValues[idx] == null) {
return DruidPredicateMatch.FALSE;
}
return DruidPredicateMatch.of(Objects.equals(clusteringValues[idx], eq.getMatchValue()));
Member

[P1] Pruning ignores EqualityFilter type coercion semantics

The pruner compares the stored clustering value directly with eq.getMatchValue(), but EqualityFilter's normal matcher casts the literal to the input column type. For example, a LONG clustering value 5 with an EqualityFilter typed as STRING with value "5" would match when evaluated by the real matcher, but Objects.equals(5L, "5") returns false here, so the only live cluster group is removed before scanning. TypedInFilter has the same risk below, with direct Objects.equals over the raw sorted values. The pruning check should use the same predicate/cast semantics as the filter matcher, or stay UNKNOWN when the literal type does not exactly match the clustering column type.
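
A minimal sketch of the second option (stay UNKNOWN on a type mismatch), under stated assumptions: `Match` is a hypothetical stand-in for the three-valued result, the match value is assumed non-null, and Java runtime classes stand in for column types.

```java
import java.util.Objects;

// Hypothetical sketch; Match stands in for a three-valued pruning result.
final class CoercionAwarePruneSketch
{
  enum Match { TRUE, FALSE, UNKNOWN }

  static Match matchEquality(Object clusteringValue, Object matchValue)
  {
    if (clusteringValue == null) {
      return Match.FALSE; // mirrors the null handling in the quoted snippet
    }
    if (!clusteringValue.getClass().equals(matchValue.getClass())) {
      // Types differ, so the real matcher might still match after casting: keep the group.
      return Match.UNKNOWN;
    }
    return Objects.equals(clusteringValue, matchValue) ? Match.TRUE : Match.FALSE;
  }
}
```

Here `matchEquality(5L, "5")` yields `UNKNOWN` instead of pruning the group, leaving the decision to the filter's own matcher.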

Member

@FrankChen021 left a comment

| Severity | Findings |
| --- | --- |
| P0 | 0 |
| P1 | 2 |
| P2 | 1 |
| P3 | 0 |
| Total | 3 |

Reviewed 27 of 27 changed files.


This is an automated review by Codex GPT-5.5


for (String column : summary.getGroupColumnNames()) {
final String smooshName = Projections.getClusterGroupSegmentInternalFileName(clusteringValueIds, column);
final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName);
Member

[P1] Check descriptor before mapping sparse group column

The new clustered-group reader maps the smoosh file before checking whether the column descriptor exists. For sparse clustered groups, a logical column can be present in the summary but absent from an individual group, and the existing null-descriptor check suggests that case is expected. As written, mapFile(smooshName) fails before the code can skip the missing column, so loading such clustered segments throws instead of exposing the column as absent/null for that group.
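
The ordering issue can be sketched with plain maps standing in for the real structures. Everything here is hypothetical: `descriptors` plays the role of the per-group column descriptors, `files` the smoosh contents, and `mapFile` a mapper that throws on a missing internal file.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the segment file mapper and column descriptors.
final class SparseGroupLoadSketch
{
  // Like a segment file mapper, this throws when the internal file does not exist.
  static ByteBuffer mapFile(Map<String, ByteBuffer> files, String name)
  {
    final ByteBuffer buffer = files.get(name);
    if (buffer == null) {
      throw new IllegalStateException("missing internal file: " + name);
    }
    return buffer;
  }

  static Map<String, ByteBuffer> loadGroup(
      List<String> summaryColumns,
      Map<String, String> descriptors,
      Map<String, ByteBuffer> files
  )
  {
    final Map<String, ByteBuffer> loaded = new LinkedHashMap<>();
    for (String column : summaryColumns) {
      // Check the descriptor first: a sparse group may simply not contain this column.
      if (descriptors.get(column) == null) {
        continue;
      }
      loaded.put(column, mapFile(files, column));
    }
    return loaded;
  }
}
```

With the check moved ahead of `mapFile`, a column listed in the summary but absent from one group is skipped rather than failing the load.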

return new Cursor()
{
@Override
public ColumnSelectorFactory getColumnSelectorFactory()
Member

[P1] Empty pruned cursor must expose selectors

When all cluster groups are pruned, this holder returns a done cursor whose getColumnSelectorFactory() always throws. Query engines commonly obtain selector factories immediately after creating a non-null cursor, even when the cursor is already done, so filters such as tenant = 'missing' can fail the query instead of returning an empty result. The empty cursor should either return null from asCursor() or provide a harmless selector factory.
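
The "harmless selector factory" option could look roughly like this. The `Cursor` and `SelectorFactory` interfaces below are simplified, hypothetical stand-ins, not Druid's `Cursor` and `ColumnSelectorFactory`.

```java
import java.util.function.Supplier;

// Simplified, hypothetical stand-ins for the cursor and selector factory interfaces.
final class EmptyCursorSketch
{
  interface SelectorFactory
  {
    Supplier<Object> makeSelector(String column);
  }

  interface Cursor
  {
    boolean isDone();

    SelectorFactory getColumnSelectorFactory();
  }

  // Every column resolves to a selector that always yields null.
  static final SelectorFactory NIL_FACTORY = column -> () -> null;

  static Cursor emptyCursor()
  {
    return new Cursor()
    {
      @Override
      public boolean isDone()
      {
        return true; // no rows: all cluster groups were pruned
      }

      @Override
      public SelectorFactory getColumnSelectorFactory()
      {
        return NIL_FACTORY; // safe to call even though the cursor is already done
      }
    };
  }
}
```

An engine that builds selectors before checking `isDone()` then sees an empty result rather than an exception.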


// canVectorize() is determined by the per-group holders. Probe the first one (lazily, `Suppliers.memoize`
// means this opens it once and is reused by ConcatenatingVectorCursor).
final boolean canVectorize = holderSuppliers.get(0).get().canVectorize();
Member

[P2] Vectorization decision only checks first group

The multi-group clustered holder reports canVectorize() based only on the first per-group holder, but each group gets a different rewritten filter. A first group whose filter folds to TrueFilter can vectorize while a later group's residual filter cannot; the engine will choose the vector path and ConcatenatingVectorCursor will later call asVectorCursor() on the non-vectorizable holder, which throws. This should require all surviving group holders to vectorize, or otherwise disable vectorization for the combined holder.
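
A minimal sketch of the "require all surviving holders to vectorize" option, with a hypothetical `Holder` interface in place of the real per-group cursor holder:

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the per-group cursor holder.
final class VectorizeAllGroupsSketch
{
  interface Holder
  {
    boolean canVectorize();
  }

  // The combined holder may vectorize only if every surviving group holder can.
  static boolean canVectorizeAll(List<Supplier<Holder>> holderSuppliers)
  {
    for (Supplier<Holder> supplier : holderSuppliers) {
      if (!supplier.get().canVectorize()) {
        return false;
      }
    }
    return true;
  }
}
```

The trade-off is that this opens every group holder up front instead of only the first, so the lazy opening described in the code comment above would be lost unless the decision can be made from the rewritten filters alone.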

}

/**
* Open the next group whose cursor has at least one row. Sets {@code currentCursor = null} when all groups are
Contributor

nit: "advance to the next group" sounds slightly better and more consistent than "open the next group".
Should this reset the delegate as well if all groups are exhausted?

Comment on lines +109 to +112
initializeIfNeeded();
if (currentCursor == null) {
return;
}
Contributor

nit: can reuse isDone()

Comment on lines +122 to +125
initializeIfNeeded();
if (currentCursor == null) {
return;
}
Contributor

same here, can reuse isDone()

* concatenating cursor on each group transition. Selectors previously returned by this factory will, on their next
* invocation, observe the updated state; see the per-call indirection in the inner selector classes.
*/
public void setDelegate(ColumnSelectorFactory delegate, Object[] clusteringValues)
Contributor

I find the setDelegate and generation usage hard to reason about. It feels like we should have all the CursorHolders in this class and let it manage advancing the cursor, the generation, the ColumnSelectorFactory, and the ValueMatcher together. ClusteringColumnSelectorFactory could then implement Iterable<Cursor>. WDYT?

// Cache the rewrite for every group (including pruned ones, where it's FalseFilter) so rewriteFor doesn't
// re-walk for either the cursor factory or callers that want to inspect a pruned group's outcome directly.
final List<TableClusterGroupSpec> kept = new ArrayList<>(groups.size());
final IdentityHashMap<TableClusterGroupSpec, Filter> rewriteCache = new IdentityHashMap<>();
Contributor

It feels like ClusterGroupQueryPlan doesn't have much more than this hash map. Is there any other reason we want a class to wrap it?

Comment on lines +284 to +288
final ClusteringColumnSelectorFactory wrapperFactory = new ClusteringColumnSelectorFactory(
UNINITIALIZED_DELEGATE,
clusteringColumns,
clusteringValuesByGroup.get(0)
);
Contributor

I find it confusing that the delegate is not initialized but clusteringValues are. Maybe we could make ClusteringColumnSelectorFactory nullable in ConcatenatingCursor?

}

final List<Map<String, Supplier<BaseColumnHolder>>> clusterGroupColumnsList;
if (isClusteredSummary) {
Contributor

nit: maybe we could just null-check clusteredBaseSummary, which would remove the need for this additional boolean variable.


@JsonIgnore
@Override
public List<String> getColumnNames()
Contributor

The @Override doesn't seem necessary? Also, this function name is very similar to getColumns(), except this one includes the metrics; maybe we should make them a bit more distinguishable?

private final Granularity effectiveGranularity;

@JsonCreator
public ClusteredValueGroupsBaseTableSchema(
Contributor

While reading this file, I found myself mumbling words like clustering/group too much. Some personal naming preferences, just for inspiration:

clusteringColumns -> groupKeySchema
clusteringDictionaries -> groupKeyDictionaries
clusterGroups -> groupSpecs
getGroupColumnNames -> getGroupInternalColumnNames
getGroupDimensionNames -> getGroupInternalDimensionNames
getGroupOrdering -> getGroupInternalOrdering

final Map<String, Supplier<BaseColumnHolder>> baseColumns;
if (isClusteredSummary) {
clusteredBaseSummary = (ClusteredValueGroupsBaseTableSchema) baseSchema;
if (clusteredBaseSummary.getSharedColumns().isEmpty()) {
Contributor

I don't see how shared columns could be used in this PR. Maybe we don't include them?
