[core] Support arbitrary time granularity for chain table delta computation#8185
[core] Support arbitrary time granularity for chain table delta computation#8185juntaozhang wants to merge 11 commits into
Conversation
| String fingerprint = DateTimeFormatter.ofPattern(formatter).format(FINGERPRINT); | ||
|
|
||
| checkArgument( | ||
| fingerprint.length() == formatter.length(), |
There was a problem hiding this comment.
This rejects valid partition.timestamp-formatter values that use DateTimeFormatter quoting, for example yyyyMMdd'T'HHmm. PartitionTimeExtractor and calPartValues can handle a pattern such as $dtT$hm with that formatter, but this check compares the formatted output length (20260609T1150) with the raw formatter pattern length including quote characters, so chain reads fail before delta partitions are enumerated. Please parse formatter fields without relying on raw pattern positions, or validate/reject these formatter patterns consistently before they can be used.
There was a problem hiding this comment.
Thanks for the review. Fixed and refactored by following steps:
- Parse pattern and formatter into tokens
- Match pattern tokens to format tokens recursively
- Compute each variable's start/end position in the formatted output
- Extract partition values by slicing the formatted datetime string
There was a problem hiding this comment.
ChainTableUtils.calPartValues relied on regex extraction which couldn't handle patterns like
$dtT$hm with quoted literals in the formatter (e.g., yyyyMMdd'T'HHmm).
The new ChainPartitionPatternResolver.calPartValues establishes mapping between pattern variables and formatter tokens, so $dt maps to yyyyMMdd and $hm maps to HHmm directly.
|
|
||
| private static final LocalDateTime FINGERPRINT = LocalDateTime.of(2026, 6, 9, 11, 50, 58); | ||
| private static final String TIME_UNIT_CHARS = "yMdHhmsS"; | ||
| private static final Pattern VARIABLE = Pattern.compile("\\$[a-zA-Z_]+"); |
There was a problem hiding this comment.
This variable regex stops before digits, so a valid partition column like dt1 is parsed as variable $dt plus a constant 1. For partition.timestamp-pattern = "$dt1" and formatter yyyyMMdd, the extractor then treats the day digit as a fixed constant and returns a monthly step, causing getDeltaPartitions to generate only month boundaries and miss daily delta partitions. Please include the same identifier characters that partition fields can use, e.g. digits after the first character, when splitting timestamp-pattern variables.
There was a problem hiding this comment.
Thanks for the review. This is already fixed — parsePattern() matches the full partition column name from partitionColumns, so $dt1 is correctly parsed as a single variable.
| public class ChainPartitionStepExtractor { | ||
|
|
||
| private static final LocalDateTime FINGERPRINT = LocalDateTime.of(2026, 6, 9, 11, 50, 58); | ||
| private static final String TIME_UNIT_CHARS = "yMdHhmsS"; |
There was a problem hiding this comment.
S is advertised as a time unit here, but resolveField cannot map fractional seconds: the fingerprint has zero nanos, so a formatter containing SSS produces 000, and Long.parseLong("000") does not match any year/month/day/hour/minute/second value. A valid Java formatter such as yyyyMMddHHmmssSSS will therefore throw Unknown time unit value: 000 during chain delta planning. Please either implement a fractional-second step or remove/reject S consistently instead of accepting it in TIME_UNIT_CHARS.
There was a problem hiding this comment.
Fixed. S has been removed from supported time unit chars.
| .collectAsList().stream() | ||
| .map(Row::toString) | ||
| .collect(Collectors.toList())) | ||
| .containsExactlyInAnyOrder( |
There was a problem hiding this comment.
This new minute-level scenario currently fails locally with mvn -pl paimon-spark/paimon-spark-ut -am -Pfast-build -DfailIfNoTests=false -Dtest=SparkChainTableITCase#testChainTableWithMinuteLevelPartitions test: the query for dt=20250810 and hr_min=03:40 also returns [7,1,7,20250810,03:40] from the 03:45 delta partition. That means the chain planner is still reading delta partitions after the requested chain partition, so this feature can expose future delta rows when a partition filter is present. Please tighten the delta partition selection to (anchor, requested] for the actual requested partition.
There was a problem hiding this comment.
Thanks for the detailed explanation. After refactored, I rebased to master and the UT passed on my local:
[INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ paimon-spark-ut_2.12 ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-jar-plugin:3.4.2:test-jar (prepare-test-jar) @ paimon-spark-ut_2.12 ---
[INFO] Building jar: /Users/juntao/src/github.com/apache/paimon/paimon-spark/paimon-spark-ut/target/paimon-spark-ut_2.12-1.5-SNAPSHOT-tests.jar
[INFO]
[INFO] --- maven-surefire-plugin:3.0.0-M5:test (default-test) @ paimon-spark-ut_2.12 ---
[INFO]
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.paimon.spark.SparkChainTableITCase
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 22.931 s - in org.apache.paimon.spark.SparkChainTableITCase
b11e9c2 to
156d471
Compare
|
Hi @JingsongLi, Could you please help review another change. I've refactored |
| // pattern token | ||
| int maxLen = formatTokens.size() - formatIdx - (patternTokens.size() - patternIdx - 1); | ||
|
|
||
| for (int len = 1; len <= maxLen; len++) { |
There was a problem hiding this comment.
This accepts adjacent variables but the matching is shortest-first, so undelimited patterns are split incorrectly when resolving values back from a formatted timestamp. For example, with partition columns [dt, hour], pattern $dt$hour, and formatter yyyyMMddHH, the first variable is mapped only to yyyy and the second to MMddHH. resolvePartitionValues(2025-08-10T03:00) would produce dt=2025, hour=081003 instead of dt=20250810, hour=03, so getDeltaPartitions generates partitions that do not exist and chain reads can miss delta data. Please either make adjacent-variable mapping unambiguous/correct or reject such patterns up front.
There was a problem hiding this comment.
Thanks for the valuable feedback. Although for (int len = maxLen; len > 0; len--) can easily handle cases like
yyyyMMddHH, I don't think it's wise to let users configure an ambiguous pattern.
5623189 to
1d2a7cd
Compare
1d2a7cd to
2a7f021
Compare
| partCursor < partitionColumns.size(), | ||
| "Extra variable in pattern, exceed partitionColumns count"); | ||
| String part = curr + partitionColumns.get(partCursor); | ||
| checkArgument(pattern.substring(cursor).startsWith(part)); |
There was a problem hiding this comment.
This makes the resolver require variables in partition.timestamp-pattern to appear in exactly the same order as partitionColumns. That is stricter than the existing PartitionTimeExtractor, which replaces variables by name, and chain-table schema validation does not reject a reordered but valid pattern. For example, partition keys (dt, hour) with pattern $hour $dt and formatter HH yyyyMMdd used to parse to 2025-08-10T05:00, but new PartitionTimeResolver(Arrays.asList("dt", "hour"), "$hour $dt", "HH yyyyMMdd") now fails here before chain delta planning starts. Please parse the variable name from the pattern instead of consuming partitionColumns positionally, or reject this order explicitly during table validation.
There was a problem hiding this comment.
Thanks for the guidance.
-
This is already supported. The new
parsePattern()matches variables by name rather than by position,
andparsePartitionValues()resolves values from a column-name map, so a pattern like$hour $dtworks regardless of the order in partitionColumns. -
I still use
partitionColumnshere because regex cannot resolve the ambiguity in cases like$dt01, where it is impossible to tell whetherdt01is a single column name ordtfollowed by literal01. -
Other cases from
PartitionTimeExtractor, such as default pattern/formatter handling, will be fully migrated in the next PR.
| tokens.add(new LiteralToken(str.replace("''", "'"), start, pos + 1)); | ||
| } | ||
| } else { | ||
| tokens.add(new LiteralToken(String.valueOf(c), pos, pos + 1)); |
There was a problem hiding this comment.
This still silently accepts unsupported DateTimeFormatter pattern letters as literals. For example, after removing S from FIELD_MAP, a formatter such as yyyyMMddHHmmssSSS is tokenized with three literal S tokens here, but DateTimeFormatter will format them as fractional-second digits. If $dt consumes the whole formatter, extractMinStep() only sees seconds and chain delta planning advances by one second while generating values like ...000, so millisecond partitions are skipped instead of being rejected. Please reject unquoted formatter pattern letters that are not in FIELD_MAP (or implement their step semantics), rather than treating them as literal text.
| .collectAsList().stream() | ||
| .map(Row::toString) | ||
| .collect(Collectors.toList())) | ||
| .containsExactlyInAnyOrder( |
There was a problem hiding this comment.
Blocking: this new test is currently failing on the PR head. Running mvn -pl paimon-spark/paimon-spark-ut -am -Pfast-build -DfailIfNoTests=false -Dtest=SparkChainTableITCase#testChainTableWithMinuteLevelPartitions test still returns an extra [7,1,7,20250810,03:40] row from the 03:45 delta partition for the 03:40 query. That means chain planning can still read future delta partitions after the requested chain partition, so please tighten the selected delta partitions to the actual (anchor, requested] range before merging.
| Map<FormatToken, Integer> startPositions = new LinkedHashMap<>(); | ||
| for (FormatToken token : formatTokens) { | ||
| startPositions.put(token, pos); | ||
| pos += token.getLength(); |
There was a problem hiding this comment.
Blocking: this offset calculation assumes every formatter token emits exactly token.getLength() characters, but Java DateTimeFormatter allows variable-width fields such as yyyy-M-d and H:m:s. On this PR head, new PartitionTimeResolver(Arrays.asList("dt"), "$dt", "yyyy-M-d").resolvePartitionValues(LocalDateTime.of(2026, 12, 31, 0, 0)) returns {dt=2026-12-}, truncating the day. Please either compute spans from the actual formatted output or reject variable-width formatter patterns during table validation.
be21acf to
867f26e
Compare
Purpose
#8184
Tests