Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,8 @@ class UnsafeRowDataEncoder(
decodeToUnsafeRow(bytes, reusedKeyRow)
case PrefixKeyScanStateEncoderSpec(_, numColsPrefixKey) =>
decodeToUnsafeRow(bytes, numFields = numColsPrefixKey)
case _: TimestampAsPrefixKeyStateEncoderSpec | _: TimestampAsPostfixKeyStateEncoderSpec =>
decodeToUnsafeRow(bytes, numFields = keySchema.length - 1)
case _ => throw unsupportedOperationForKeyStateEncoder("decodeKey")
}
}
Expand Down Expand Up @@ -748,8 +750,15 @@ class AvroStateEncoder(
)

// Avro schema used by the avro encoders
private lazy val keyAvroType: Schema = SchemaConverters.toAvroTypeWithDefaults(keySchema)
private lazy val keyProj = UnsafeProjection.create(keySchema)
// Key schema used to build the key Avro type and key projection. Timestamp specs carry the
// timestamp as the trailing column of their schema, so that column is stripped here; every
// other spec uses keySchema unchanged.
private lazy val effectiveKeySchema: StructType = {
  def withoutTrailingColumn(fullSchema: StructType): StructType =
    StructType(fullSchema.dropRight(1))

  keyStateEncoderSpec match {
    case TimestampAsPrefixKeyStateEncoderSpec(fullSchema) => withoutTrailingColumn(fullSchema)
    case TimestampAsPostfixKeyStateEncoderSpec(fullSchema) => withoutTrailingColumn(fullSchema)
    case _ => keySchema
  }
}
// Avro schema of the key, derived from effectiveKeySchema rather than the raw keySchema.
private lazy val keyAvroType: Schema =
SchemaConverters.toAvroTypeWithDefaults(effectiveKeySchema)
// UnsafeProjection built over the same effectiveKeySchema that keyAvroType is derived from.
private lazy val keyProj = UnsafeProjection.create(effectiveKeySchema)

// Avro schema and UnsafeProjection for the value, both built from valueSchema.
private lazy val valueAvroType: Schema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)
private lazy val valueProj = UnsafeProjection.create(valueSchema)
Expand Down Expand Up @@ -847,8 +856,10 @@ class AvroStateEncoder(
}
}
StructType(remainingSchema)
case _ =>
throw unsupportedOperationForKeyStateEncoder("createAvroEnc")
case TimestampAsPrefixKeyStateEncoderSpec(schema) =>
StructType(schema.dropRight(1))
case TimestampAsPostfixKeyStateEncoderSpec(schema) =>
StructType(schema.dropRight(1))
}

// Handle suffix key schema for prefix scan case
Expand Down Expand Up @@ -1005,6 +1016,11 @@ class AvroStateEncoder(
StateSchemaIdRow(currentKeySchemaId, avroRow))
case PrefixKeyScanStateEncoderSpec(_, _) =>
encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, prefixKeyAvroType, out)
case _: TimestampAsPrefixKeyStateEncoderSpec | _: TimestampAsPostfixKeyStateEncoderSpec =>
val avroRow =
encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, keyAvroType, out)
encodeWithStateSchemaId(
StateSchemaIdRow(currentKeySchemaId, avroRow))
case _ => throw unsupportedOperationForKeyStateEncoder("encodeKey")
}
prependVersionByte(keyBytes)
Expand Down Expand Up @@ -1179,6 +1195,10 @@ class AvroStateEncoder(
case PrefixKeyScanStateEncoderSpec(_, _) =>
decodeFromAvroToUnsafeRow(
bytes, avroEncoder.keyDeserializer, prefixKeyAvroType, prefixKeyProj)
case _: TimestampAsPrefixKeyStateEncoderSpec | _: TimestampAsPostfixKeyStateEncoderSpec =>
val schemaIdRow = decodeStateSchemaIdRow(bytes)
decodeFromAvroToUnsafeRow(
schemaIdRow.bytes, avroEncoder.keyDeserializer, keyAvroType, keyProj)
case _ => throw unsupportedOperationForKeyStateEncoder("decodeKey")
}
}
Expand Down Expand Up @@ -1782,9 +1802,7 @@ abstract class TimestampKeyStateEncoder(
rowBytes, Platform.BYTE_ARRAY_OFFSET,
rowBytesLength
)
// The encoded row does not include the timestamp (it's stored separately),
// so decode with keySchema.length - 1 fields.
dataEncoder.decodeToUnsafeRow(rowBytes, keySchema.length - 1)
dataEncoder.decodeKey(rowBytes)
}

// NOTE: We reuse the ByteBuffer to avoid allocating a new one for every encoding/decoding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,10 @@ case class RangeKeyScanStateEncoderSpec(
}
}

/** The encoder specification for [[TimestampAsPrefixKeyStateEncoder]]. */
/**
 * The encoder specification for [[TimestampAsPrefixKeyStateEncoder]].
 * The provided key schema must be laid out as [original key fields..., timestamp field]:
 * the timestamp is always the last column of the schema.
 */
case class TimestampAsPrefixKeyStateEncoderSpec(keySchema: StructType)
extends KeyStateEncoderSpec {

Expand All @@ -688,7 +691,10 @@ case class TimestampAsPrefixKeyStateEncoderSpec(keySchema: StructType)
}
}

/** The encoder specification for [[TimestampAsPostfixKeyStateEncoder]]. */
/**
 * The encoder specification for [[TimestampAsPostfixKeyStateEncoder]].
 * The provided key schema must be laid out as [original key fields..., timestamp field]:
 * the timestamp is always the last column of the schema.
 */
case class TimestampAsPostfixKeyStateEncoderSpec(keySchema: StructType)
extends KeyStateEncoderSpec {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession

// Returns the canonical path of a directory obtained from Utils.createTempDir().
private def newDir(): String = {
  val tempDir = Utils.createTempDir()
  tempDir.getCanonicalPath
}

// TODO: [SPARK-55145] Address the new state format with Avro and enable the test with Avro
// encoding
Seq("unsaferow").foreach { encoding =>
Seq("unsaferow", "avro").foreach { encoding =>
Seq("prefix", "postfix").foreach { encoderType =>
test(s"Event time as $encoderType: basic put and get operations (encoding = $encoding)") {
tryWithProviderResource(
Expand Down Expand Up @@ -223,9 +221,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
}
}

// TODO: [SPARK-55145] Address the new state format with Avro and enable the test with Avro
// encoding
Seq("unsaferow").foreach { encoding =>
Seq("unsaferow", "avro").foreach { encoding =>
test(s"Event time as prefix: iterator operations (encoding = $encoding)") {
tryWithProviderResource(
newStoreProviderWithTimestampEncoder(
Expand Down Expand Up @@ -558,9 +554,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
}
}

// TODO: [SPARK-55145] Address the new state format with Avro and enable the test with Avro
// encoding
Seq("unsaferow").foreach { encoding =>
Seq("unsaferow", "avro").foreach { encoding =>
Seq("prefix", "postfix").foreach { encoderType =>
Seq(false, true).foreach { useMultipleValuesPerKey =>
val multiValueSuffix = if (useMultipleValuesPerKey) " and multiple values" else ""
Expand Down