[SPARK-54443][SS] Integrate PartitionKeyExtractor in Re-partition reader #53459
base: master
Conversation
Force-pushed from 4330018 to 996e27b.
val stateVarInfo: Option[TransformWithStateVariableInfo] = if (
    sourceOptions.internalOnlyReadAllColumnFamilies) {
  stateStoreReaderInfo.allColumnFamiliesReaderInfo.stateVariableInfos.headOption
Use the first schema in stateVariableInfos as a default schema to get partitionKeySchema in SchemaUtil.
Force-pushed from 996e27b to d9a88b3.
Reduce duplicate code; all tests pass.
Force-pushed from d9a88b3 to d234a97.
val stateVarInfo = stateVarInfoList.head
transformWithStateVariableInfoOpt = Some(stateVarInfo)
}
var stateVarInfoList = operatorProperties.stateVariables
This is the same as the previous version except for indentation. We can now assign transformWithStateVariableInfoOpt because stateVarName will always be a "valid" value after the line 323 change.
(resultSchema.keySchema, resultSchema.valueSchema)
}

val stateVarInfo = stateStoreReaderInfo.transformWithStateVariableInfoOpt
nit: why change to val? it is only used once below
storeMetadata: Array[StateMetadataTableEntry]): Option[Int] = {
  if (storeMetadata.nonEmpty &&
      storeMetadata.head.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
    Some(session.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION))
We should read this from the current batch's offset seq conf instead; buildStateStoreConf does something similar.
The session here doesn't include the confs written to the checkpoint, so it can return the wrong value.
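As a rough sketch of that suggestion (not code from this PR; the helper name and the checkpointConf map are assumptions for illustration), the version could be resolved from the conf map recorded in the checkpoint's offset seq metadata, falling back to the config entry's default:

    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical helper: resolve the join state format version from the conf
    // map written to the checkpoint's offset seq metadata, rather than from the
    // live session, falling back to the config entry's default value.
    def joinStateFormatVersionFromCheckpoint(
        checkpointConf: Map[String, String]): Option[Int] = {
      val entry = SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION
      checkpointConf.get(entry.key).map(_.toInt).orElse(entry.defaultValue)
    }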
| s"for state variable $stateVarName in operator ${sourceOptions.operatorId}") | ||
| val stateVarInfo = stateVarInfoList.head | ||
| transformWithStateVariableInfoOpt = Some(stateVarInfo) | ||
| } |
nit: new line
}
var stateVarInfoList = operatorProperties.stateVariables
  .filter(stateVar => stateVar.stateName == stateVarName)
if (stateVarInfoList.isEmpty &&
We don't need this anymore, right? Since it won't be empty.
stateSchemaProvider,
joinColFamilyOpt,
AllColumnFamiliesReaderInfo(stateStoreColFamilySchemas, stateVariableInfos)
AllColumnFamiliesReaderInfo(
Why are we always populating this, even when the allCF reader is off?
// Convert normal data to bytes
val normalAsBytes = normalDf.toSeq.map { row =>
// Convert normal data to (partitionKeyStr, keyBytes, valueBytes)
do you mean partitionKeyStruct instead?
// Extract raw bytes from bytes read data (no deserialization/reserialization)
val bytesAsBytes = filteredBytesData.map { row =>
// Extract (partitionKeyStr, keyBytes, valueBytes) from bytes read data
ditto, partitionKeyStruct?
| s"Size mismatch: normal has ${normalSorted.length}, bytes has ${bytesSorted.length}") | ||
|
|
||
| // Compare each pair | ||
| // Compare each tuple (partitionKeyStr, keyBytes, valueBytes) |
ditto
| "partition_id", | ||
| "STRUCT(key AS groupingKey, expiration_timestamp_ms AS key)", | ||
| "NULL AS value") | ||
| // Partition key should be just the grouping key, not the composite (key, timestamp) |
Move the comment so it is close to partitionKeyExtractor = below; same for the others below. It is best for comments to be close to what they are describing. It also helps when updating the code later to easily see that the comment needs an update.
| ("$count_listState", groupByKeySchema, countValueSchema) | ||
| val ttlColFamilyPartitionKeyExtractor: Option[Row => Row] = | ||
| Some(compositeKey => compositeKey.getStruct(1)) | ||
| val simpleColumnFamilies = Seq( |
nit: ttlColumnFamilies?
What changes were proposed in this pull request?
Integrate the PartitionKeyExtractor introduced in this PR into StatePartitionAllColumnFamiliesReader. Before this change, StatePartitionAllColumnFamiliesReader returns the entire key value in the partition_key field, and SchemaUtil returns keySchema as the partitionKey schema. After this change, StatePartitionAllColumnFamiliesReader returns the actual partition key, and SchemaUtil returns the actual partitionKey schema.
Why are the changes needed?
When creating a StatePartitionAllColumnFamiliesReader, we need to pass along the stateFormatVersion and operator name.
In SchemaUtil, we will create a getExtractor helper function. It's used when getSourceSchema is called (for the default column family), as well as when StatePartitionAllColumnFamiliesReader is initialized, as the reader uses the extractor to get the partitionKey for different column families in its iterator. We also added checks of the partitionKey in the reader suite.
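As an illustration of the extractor idea (hypothetical variable names, not the PR's actual getExtractor code), an extractor maps a full state key row to the row the store partitioning is based on:

    import org.apache.spark.sql.Row

    // Default column family: the key itself is the partition key.
    val defaultPartitionKeyExtractor: Row => Row = identity
    // TTL index column family with a composite key where the grouping key is the
    // struct at ordinal 1: project out just the grouping key.
    val ttlIndexPartitionKeyExtractor: Row => Row =
      compositeKey => compositeKey.getStruct(1)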
Does this PR introduce any user-facing change?
No
How was this patch tested?
See the updated StatePartitionAllColumnFamiliesSuite. We have hard-coded functions that extract the partition key for different column families from normalDF, then compare the extracted partition keys against the partition keys read from bytesDF.
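A minimal sketch of the comparison's shape (hypothetical case class and helper names; it assumes both sides have already been sorted into the same order):

    import org.apache.spark.sql.Row

    case class StateEntry(partitionKey: Row, keyBytes: Array[Byte], valueBytes: Array[Byte])

    def assertSameEntries(normalSorted: Seq[StateEntry], bytesSorted: Seq[StateEntry]): Unit = {
      assert(normalSorted.length == bytesSorted.length,
        s"Size mismatch: normal has ${normalSorted.length}, bytes has ${bytesSorted.length}")
      normalSorted.zip(bytesSorted).foreach { case (normal, bytes) =>
        // The partition key extracted from the normally-decoded row must match the
        // partition key surfaced by the all-column-families (bytes) reader.
        assert(normal.partitionKey == bytes.partitionKey)
        assert(java.util.Arrays.equals(normal.keyBytes, bytes.keyBytes))
        assert(java.util.Arrays.equals(normal.valueBytes, bytes.valueBytes))
      }
    }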
Was this patch authored or co-authored using generative AI tooling?
Yes. claude-4.5-opus