feat: clustered segments pt.1#19460
Conversation
| @Override | ||
| public List<String> getColumnNames() | ||
| { | ||
| List<String> columns = new ArrayList<>(columnNames.size() + aggregators.length); |
| }; | ||
| } | ||
|
|
||
| private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, ClusteredValueGroupsBaseTableSchema clusterSummary) |
| } | ||
| final Object raw = parent.currentValue(idx); | ||
| final String stringValue = raw == null ? null : String.valueOf(raw); | ||
| cachedSelector = DimensionSelector.constant(stringValue, spec.getExtractionFn()); |
| final Object raw = parent.currentValue(idx); | ||
| final String stringValue = raw == null ? null : String.valueOf(raw); | ||
| final String afterExtraction = | ||
| spec.getExtractionFn() == null ? stringValue : spec.getExtractionFn().apply(stringValue); |
| final Object raw = parent.currentValue(idx); | ||
| final String stringValue = raw == null ? null : String.valueOf(raw); | ||
| final String afterExtraction = | ||
| spec.getExtractionFn() == null ? stringValue : spec.getExtractionFn().apply(stringValue); |
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 2 |
| P2 | 0 |
| P3 | 0 |
| Total | 2 |
Reviewed 25 of 25 changed files.
This is an automated review by Codex GPT-5.5
| ); | ||
| } | ||
|
|
||
| return new QueryableIndexCursorHolder( |
There was a problem hiding this comment.
[P1] Cluster-column filters are reapplied against sub-indexes that do not contain them
After pruneClusterGroups keeps a matching group, the original CursorBuildSpec is passed unchanged into a QueryableIndexCursorHolder for the group sub-index. That sub-index intentionally does not contain the clustering columns, so EqualityFilter.getBitmapColumnIndex sees a missing column and builds an exact all-false/all-unknown bitmap before the ClusteringColumnSelectorFactory wrapper can provide the constant value to a matcher. Queries such as tenant = 'acme' therefore prune to the acme group and then scan zero rows. The multi-group supplier path repeats the same pattern, so clustering predicates need to be stripped/residualized after pruning or the group holder needs a cluster-aware ColumnIndexSelector/filter-bundle path.
| if (clusteringValues[idx] == null) { | ||
| return DruidPredicateMatch.FALSE; | ||
| } | ||
| return DruidPredicateMatch.of(Objects.equals(clusteringValues[idx], eq.getMatchValue())); |
There was a problem hiding this comment.
[P1] Pruning ignores EqualityFilter type coercion semantics
The pruner compares the stored clustering value directly with eq.getMatchValue(), but EqualityFilter normal matcher casts the literal to the input column type. For example, a LONG clustering value 5 with an EqualityFilter typed as STRING value "5" would match when evaluated by the real matcher, but Objects.equals(5L, "5") returns false here and the only live cluster group is removed before scanning. TypedInFilter has the same risk below with direct Objects.equals over the raw sorted values. The pruning check should use the same predicate/cast semantics as the filter matcher, or stay UNKNOWN when the literal type does not exactly match the clustering column type.
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 2 |
| P2 | 1 |
| P3 | 0 |
| Total | 3 |
Reviewed 27 of 27 changed files.
This is an automated review by Codex GPT-5.5
|
|
||
| for (String column : summary.getGroupColumnNames()) { | ||
| final String smooshName = Projections.getClusterGroupSegmentInternalFileName(clusteringValueIds, column); | ||
| final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName); |
There was a problem hiding this comment.
[P1] Check descriptor before mapping sparse group column
The new clustered-group reader maps the smoosh file before checking whether the column descriptor exists. For sparse clustered groups, a logical column can be present in the summary but absent from an individual group, and the existing null-descriptor check suggests that case is expected. As written, mapFile(smooshName) fails before the code can skip the missing column, so loading such clustered segments throws instead of exposing the column as absent/null for that group.
| return new Cursor() | ||
| { | ||
| @Override | ||
| public ColumnSelectorFactory getColumnSelectorFactory() |
There was a problem hiding this comment.
[P1] Empty pruned cursor must expose selectors
When all cluster groups are pruned, this holder returns a done cursor whose getColumnSelectorFactory() always throws. Query engines commonly obtain selector factories immediately after creating a non-null cursor, even when the cursor is already done, so filters such as tenant = 'missing' can fail the query instead of returning an empty result. The empty cursor should either return null from asCursor() or provide a harmless selector factory.
|
|
||
| // canVectorize() is determined by the per-group holders. Probe the first one (lazily, `Suppliers.memoize` | ||
| // means this opens it once and is reused by ConcatenatingVectorCursor). | ||
| final boolean canVectorize = holderSuppliers.get(0).get().canVectorize(); |
There was a problem hiding this comment.
[P2] Vectorization decision only checks first group
The multi-group clustered holder reports canVectorize() based only on the first per-group holder, but each group gets a different rewritten filter. A first group whose filter folds to TrueFilter can vectorize while a later group's residual filter cannot; the engine will choose the vector path and ConcatenatingVectorCursor will later call asVectorCursor() on the non-vectorizable holder, which throws. This should require all surviving group holders to vectorize, or otherwise disable vectorization for the combined holder.
| } | ||
|
|
||
| /** | ||
| * Open the next group whose cursor has at least one row. Sets {@code currentCursor = null} when all groups are |
There was a problem hiding this comment.
nit: advance to the next group sounds slightly better and more consistent than open the next group.
should this reset delegate as well if all groups are exhausted?
| initializeIfNeeded(); | ||
| if (currentCursor == null) { | ||
| return; | ||
| } |
| initializeIfNeeded(); | ||
| if (currentCursor == null) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
same here, can reuse isDone()
| * concatenating cursor on each group transition. Selectors previously returned by this factory will, on their next | ||
| * invocation, observe the updated state; see the per-call indirection in the inner selector classes. | ||
| */ | ||
| public void setDelegate(ColumnSelectorFactory delegate, Object[] clusteringValues) |
There was a problem hiding this comment.
I find how setDelegate and generation usage hard to reason. It felt like we should have all CursorHolder in this class and let it manage the advance of cursor, generation, ColumnSelectorFactory, along with the ValueMatcher all together in this class. and ClusteringColumnSelectorFactory can implement Iterable<Cursor>, WDYT?
| // Cache the rewrite for every group (including pruned ones, where it's FalseFilter) so rewriteFor doesn't | ||
| // re-walk for either the cursor factory or callers that want to inspect a pruned group's outcome directly. | ||
| final List<TableClusterGroupSpec> kept = new ArrayList<>(groups.size()); | ||
| final IdentityHashMap<TableClusterGroupSpec, Filter> rewriteCache = new IdentityHashMap<>(); |
There was a problem hiding this comment.
it felt like ClusterGroupQueryPlan doesnt have much more than this hash map, is there any other reason that we want to have a class wrap it?
| final ClusteringColumnSelectorFactory wrapperFactory = new ClusteringColumnSelectorFactory( | ||
| UNINITIALIZED_DELEGATE, | ||
| clusteringColumns, | ||
| clusteringValuesByGroup.get(0) | ||
| ); |
There was a problem hiding this comment.
i find it confusing when the delegate is not initialized but clusteringValues are.... maybe we could set ClusteringColumnSelectorFactory as nullable in ConcatenatingCursor
| } | ||
|
|
||
| final List<Map<String, Supplier<BaseColumnHolder>>> clusterGroupColumnsList; | ||
| if (isClusteredSummary) { |
There was a problem hiding this comment.
nit: maybe we could just check null for clusteredBaseSummary, and it would not require this additional boolean variable
|
|
||
| @JsonIgnore | ||
| @Override | ||
| public List<String> getColumnNames() |
There was a problem hiding this comment.
the Override doesn't seem necessary? also this function name is very similar to getColumns() except this one has the metrics, maybe we should make them a bit more distinguishable?
| private final Granularity effectiveGranularity; | ||
|
|
||
| @JsonCreator | ||
| public ClusteredValueGroupsBaseTableSchema( |
There was a problem hiding this comment.
while reading this file, i find myself mumbling words like clustering/ group too much, some personal preference for naming just for inspiration:
clusteringColumns -> groupKeySchema
clusteringDictionaries → groupKeyDictionaries
clusterGroups -> groupSpecs
getGroupColumnNames -> getGroupInternalColumnNames
getGroupDimensionNames -> getGroupInternalDimensionNames
getGroupOrdering -> getGroupInternalOrdering
| final Map<String, Supplier<BaseColumnHolder>> baseColumns; | ||
| if (isClusteredSummary) { | ||
| clusteredBaseSummary = (ClusteredValueGroupsBaseTableSchema) baseSchema; | ||
| if (clusteredBaseSummary.getSharedColumns().isEmpty()) { |
There was a problem hiding this comment.
i dont see how shared columns could be used in this PR, maybe we dont include it?
Description
This PR builds on the foundations laid by projections (#17214) and the v10 segment format (#18880) to introduce 'clustered' segments, to give operators the option to push a
CLUSTERED BYclause inside of a segment, as a companion to partitioning data is distributed between segments in this manner. Internally, the 'base' table is decmposed into separate cluster groups, which are combined together to form the 'complete' view of all rows stored in the segment via concatenation. This optimizes for use cases where the typical queries are filtering down to a small subset of the cluster groups (ideally a single grouping), which like the effect from using aggregate projections, can greatly reduce the number of rows to be scanned. The expected use cases are things like multi-tenant-with-shared datasource clusters, metrics use cases which typically filter to a single type of service, etcThis PR contains only the read side, to get feedback on the internal segment metadata shapes and query engine integration. The write side (ingestion support) will come in a follow-up PR, so this PR uses some test fixtures to exercise the read paths until segment building is actually in place.
Since this is an experimental/mostly new feature, the most important part for reviewers is the new internal segment metadata,
ClusteredValueGroupsBaseTableSchemaand its internal new stuff likeTableClusterGroupSpecandClusteringDictionaries, so that we can ensure the metadata we will be storing in the segment is "good" since changing it after segments have been written is very hard.todo: elaborate on design