Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,9 @@ class CodegenContext extends Logging {
case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)"
case BinaryType => s"org.apache.spark.unsafe.types.ByteArray.compareBinary($c1, $c2)"
case CalendarIntervalType => s"$c1.compareTo($c2)"
// TimestampNanosVal exposes only `compareTo`; the AtomicType fallback below emits
// `$c1.compare($c2)`, which would not resolve as a Java method call.
case _: TimestampNTZNanosType | _: TimestampLTZNanosType => s"$c1.compareTo($c2)"
Comment thread
stevomitric marked this conversation as resolved.
case NullType => "0"
case array: ArrayType =>
val elementType = array.elementType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,11 @@ case object PhysicalCalendarIntervalType extends PhysicalCalendarIntervalType
* Storage layout is identical to [[PhysicalTimestampLTZNanosType]]; both types exist so the
* NTZ/LTZ distinction propagates through the physical-type system to consumers that need it.
*
* Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue.
* Hash is not implemented yet and will be added in a follow-up issue.
*/
class PhysicalTimestampNTZNanosType() extends PhysicalDataType {
override private[sql] def ordering =
throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError(
"PhysicalTimestampNTZNanosType")
override private[sql] type InternalType = TimestampNanosVal
override private[sql] val ordering = implicitly[Ordering[InternalType]]
@transient private[sql] lazy val tag = typeTag[InternalType]
}
case object PhysicalTimestampNTZNanosType extends PhysicalTimestampNTZNanosType
Expand All @@ -197,13 +195,11 @@ case object PhysicalTimestampNTZNanosType extends PhysicalTimestampNTZNanosType
* Storage layout is identical to [[PhysicalTimestampNTZNanosType]]; both types exist so the
* NTZ/LTZ distinction propagates through the physical-type system to consumers that need it.
*
* Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue.
* Hash is not implemented yet and will be added in a follow-up issue.
*/
class PhysicalTimestampLTZNanosType() extends PhysicalDataType {
override private[sql] def ordering =
throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError(
"PhysicalTimestampLTZNanosType")
override private[sql] type InternalType = TimestampNanosVal
override private[sql] val ordering = implicitly[Ordering[InternalType]]
@transient private[sql] lazy val tag = typeTag[InternalType]
}
case object PhysicalTimestampLTZNanosType extends PhysicalTimestampLTZNanosType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, GenerateOrdering, LazilyGeneratedOrdering}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.TimestampNanosVal
import org.apache.spark.util.ArrayImplicits._

class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
Expand Down Expand Up @@ -141,6 +142,68 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
GenerateOrdering.generate(Array.fill(5000)(sortOrder).toImmutableArraySeq)
}

// SPARK-57103: ordering for nanosecond timestamp types. Not driven by the generic
// `atomicTypes` loop above because `RandomDataGenerator` does not yet support the new
// types (tracked separately in SPARK-57034); we hand-roll edge cases here instead.
/**
 * Registers a test that compares two single-column rows holding `a` and `b`, typed as
 * `dataType`, under both ascending and descending sort orders, using both
 * [[InterpretedOrdering]] and [[LazilyGeneratedOrdering]].
 *
 * @param dataType type of the single bound column
 * @param a        value placed in the first row (may be null)
 * @param b        value placed in the second row (may be null)
 * @param expected expected result of comparing `a` to `b` in ascending order; only its sign
 *                 is used, and the sign is negated for the descending pass
 */
private def compareNanos(
    dataType: AtomicType,
    a: TimestampNanosVal,
    b: TimestampNanosVal,
    expected: Int): Unit = {
  test(s"compare two $dataType values: a = $a, b = $b") {
    val left = InternalRow(a)
    val right = InternalRow(b)
    val column = BoundReference(0, dataType, nullable = true)
    val ascendingSign = signum(expected)

    // Pair every sort direction with the comparison sign it must produce for (a, b).
    val directionCases = Seq(
      (column.asc, ascendingSign),
      (column.desc, -1 * ascendingSign))

    for ((sortOrder, expectedSign) <- directionCases) {
      val orderings = Seq(
        new InterpretedOrdering(sortOrder :: Nil),
        new LazilyGeneratedOrdering(sortOrder :: Nil))

      for (ordering <- orderings) {
        // Each row must compare equal to itself.
        assert(ordering.compare(left, left) === 0)
        assert(ordering.compare(right, right) === 0)
        // Swapping the operands must flip the sign of the result.
        assert(signum(ordering.compare(left, right)) === expectedSign)
        assert(signum(ordering.compare(right, left)) === -1 * expectedSign)
      }
    }
  }
}

Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9)).foreach { dt =>
Comment thread
stevomitric marked this conversation as resolved.
// equal values
compareNanos(dt,
TimestampNanosVal.fromParts(1000L, 100.toShort),
TimestampNanosVal.fromParts(1000L, 100.toShort), 0)
// primary key (epochMicros) decides
compareNanos(dt,
TimestampNanosVal.fromParts(1000L, 999.toShort),
TimestampNanosVal.fromParts(1001L, 0.toShort), -1)
// tie-breaker (nanosWithinMicro) within the same micro
compareNanos(dt,
TimestampNanosVal.fromParts(1000L, 100.toShort),
TimestampNanosVal.fromParts(1000L, 101.toShort), -1)
// Long boundary: plain subtraction would overflow; Ordering must use Long.compare.
compareNanos(dt,
TimestampNanosVal.fromParts(Long.MinValue, 0.toShort),
TimestampNanosVal.fromParts(Long.MaxValue, 0.toShort), -1)
// pre-epoch sorts before epoch regardless of nanos
compareNanos(dt,
TimestampNanosVal.fromParts(-1L, 999.toShort),
TimestampNanosVal.fromParts(0L, 0.toShort), -1)
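// null sorts before any value in ascending order (default NullsFirst); compareNanos negates
// the expectation for the descending pass, where null therefore sorts after any value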
compareNanos(dt, null, TimestampNanosVal.fromParts(0L, 0.toShort), -1)
}

// Ordering is precision-independent. One case at p = 7 documents that intent.
compareNanos(TimestampNTZNanosType(7),
TimestampNanosVal.fromParts(0L, 0.toShort),
TimestampNanosVal.fromParts(0L, 1.toShort), -1)

test("SPARK-21344: BinaryType comparison does signed byte array comparison") {
val data = Seq(
(Array[Byte](1), Array[Byte](-1)),
Expand Down