Skip to content

Conversation

@zifeif2
Copy link
Contributor

@zifeif2 zifeif2 commented Dec 12, 2025

What changes were proposed in this pull request?

Integrate the PartitionKeyExtractor introduced in this PR to StatePartitionAllColumnFamiliesReader. Before this change, StatePartitionAllColumnFamiliesReader returns the entire key value in partition_key field, and SchemaUtil will return keySchema as the partitionKey schema. After this change, StatePartitionAllColumnFamiliesReader will return the actual partition key, and SchemaUtil returns the actual partitionKey schema

Why are the changes needed?

When creating a StatePartitionAllColumnFamiliesReader, we need to pass along the stateFormatVersion and operator name.
In SchemaUtil, we will create a getExtractor helper function. It's used when getSourceSchema is called (for default column family), as well as when StatePartitionAllColumnFamiliesReader is initialized, as the reader will use the extractor to get partitionKey for different column families in iterator
We also added checks of partitionKey in reader suite

Does this PR introduce any user-facing change?

No

How was this patch tested?

See updated StatePartitionAllColumnFamiliesSuite. We have hard-coded function that extract partition key for different column families from normalDF, then we'll compare the extracted partition key against the partition key read from bytesDF

Was this patch authored or co-authored using generative AI tooling?

Yes. claude-4.5-opus


val stateVarInfo: Option[TransformWithStateVariableInfo] = if (
sourceOptions.internalOnlyReadAllColumnFamilies) {
stateStoreReaderInfo.allColumnFamiliesReaderInfo.stateVariableInfos.headOption
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use first schema in stateVariableInfos as a default schema to get partitionKeySchema in SchemaUtil

@zifeif2 zifeif2 force-pushed the integrate-key-extraction branch from 996e27b to d9a88b3 Compare December 16, 2025 00:20
@zifeif2 zifeif2 marked this pull request as ready for review December 16, 2025 00:28
reduce duplicate code

all tests pass
@zifeif2 zifeif2 force-pushed the integrate-key-extraction branch from d9a88b3 to d234a97 Compare December 17, 2025 04:56
val stateVarInfo = stateVarInfoList.head
transformWithStateVariableInfoOpt = Some(stateVarInfo)
}
var stateVarInfoList = operatorProperties.stateVariables
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as previous version exception for indentation. We can now assign a transformWithStateVariableInfoOpt because stateVarName will always be a "valid" value after line 323 change

(resultSchema.keySchema, resultSchema.valueSchema)
}

val stateVarInfo = stateStoreReaderInfo.transformWithStateVariableInfoOpt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why change to val? it is only used once below

storeMetadata: Array[StateMetadataTableEntry]): Option[Int] = {
if (storeMetadata.nonEmpty &&
storeMetadata.head.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
Some(session.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should read this from the current batch offset seq conf instead. buildStateStoreConf does similar.

The session here doesn't include the confs written in checkpoint, so can return wrong value

s"for state variable $stateVarName in operator ${sourceOptions.operatorId}")
val stateVarInfo = stateVarInfoList.head
transformWithStateVariableInfoOpt = Some(stateVarInfo)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: new line

}
var stateVarInfoList = operatorProperties.stateVariables
.filter(stateVar => stateVar.stateName == stateVarName)
if (stateVarInfoList.isEmpty &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this anymore right. Since it won't be empty

stateSchemaProvider,
joinColFamilyOpt,
AllColumnFamiliesReaderInfo(stateStoreColFamilySchemas, stateVariableInfos)
AllColumnFamiliesReaderInfo(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we always populating this? even when allCF reader is off


// Convert normal data to bytes
val normalAsBytes = normalDf.toSeq.map { row =>
// Convert normal data to (partitionKeyStr, keyBytes, valueBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean partitionKeyStruct instead?


// Extract raw bytes from bytes read data (no deserialization/reserialization)
val bytesAsBytes = filteredBytesData.map { row =>
// Extract (partitionKeyStr, keyBytes, valueBytes) from bytes read data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, partitionKeyStruct?

s"Size mismatch: normal has ${normalSorted.length}, bytes has ${bytesSorted.length}")

// Compare each pair
// Compare each tuple (partitionKeyStr, keyBytes, valueBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

"partition_id",
"STRUCT(key AS groupingKey, expiration_timestamp_ms AS key)",
"NULL AS value")
// Partition key should be just the grouping key, not the composite (key, timestamp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move the comment to be close to partitionKeyExtractor = below. Same for others below. It is best for comments to be close to what they are describing. Also helps when updating the code later to easily see that the comment needs update.

("$count_listState", groupByKeySchema, countValueSchema)
val ttlColFamilyPartitionKeyExtractor: Option[Row => Row] =
Some(compositeKey => compositeKey.getStruct(1))
val simpleColumnFamilies = Seq(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ttlColumnFamilies?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants