Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.types.{TimestampLTZNanosType, TimestampNTZNanosType}
import org.apache.spark.unsafe.types.TimestampNanosVal

/**
* Client-side (spark-api) operations shared by the nanosecond timestamp types
* (TimestampNTZNanosType and TimestampLTZNanosType).
*
* Internal values are [[org.apache.spark.unsafe.types.TimestampNanosVal]] (epoch micros + nanos
* within the micro). The two concrete subclasses differ only in their DataType and SQL-literal
* prefix; storage and formatting are identical.
*
* SCOPE (SPARK-57101): this issue wires physical representation, literals, row accessors, and
* codegen class selection. String formatting here is an interim implementation until dedicated
* fractional-second formatters land in a follow-up issue; Dataset encoders are out of scope
* (SPARK-57033 and related), so getEncoder reports the type as unsupported, matching the legacy
* RowEncoder behavior.
*
* @since 4.3.0
*/
abstract class TimestampNanosTypeApiOps extends TypeApiOps with DataTypeErrorsBase {

/** SQL literal prefix for this type, e.g. "TIMESTAMP_NTZ" or "TIMESTAMP_LTZ". */
protected def sqlTypeName: String

// ==================== String Formatting (interim) ====================

/**
 * Interim string rendering: casts the value to TimestampNanosVal and returns its `toString`.
 * The cast fails with a ClassCastException when `v` is of any other type.
 */
override def format(v: Any): String = {
  val nanosVal = v.asInstanceOf[TimestampNanosVal]
  nanosVal.toString
}

/** Builds the SQL literal text: the type prefix, a space, then the formatted value quoted. */
override def toSQLValue(v: Any): String = {
  val prefix = sqlTypeName
  val rendered = format(v)
  s"$prefix '$rendered'"
}

// ==================== Row Encoding ====================

// Encoders are handled in a follow-up issue (SPARK-57033). Until then, report the type as
// unsupported with the same error as the legacy RowEncoder fallback to preserve parity.
// NOTE(review): per the PR discussion, this placeholder will be replaced once PR #56158 is
// merged.
/**
 * Always fails: no encoder is provided for this type here.
 *
 * @throws org.apache.spark.sql.AnalysisException
 *   with error class UNSUPPORTED_DATA_TYPE_FOR_ENCODER, carrying the SQL name of `dataType`
 */
override def getEncoder: AgnosticEncoder[_] =
  throw new AnalysisException(
    errorClass = "UNSUPPORTED_DATA_TYPE_FOR_ENCODER",
    messageParameters = Map("dataType" -> toSQLType(dataType)))
}

/**
 * Concrete [[TimestampNanosTypeApiOps]] bound to
 * [[org.apache.spark.sql.types.TimestampNTZNanosType]].
 *
 * Supplies only the data type and the `TIMESTAMP_NTZ` SQL-literal prefix; everything else is
 * inherited from the shared base class.
 *
 * @param t
 *   The TimestampNTZNanosType with precision information
 * @since 4.3.0
 */
class TimestampNTZNanosTypeApiOps(val t: TimestampNTZNanosType) extends TimestampNanosTypeApiOps {

  /** Prefix used when rendering values of this type as SQL literals. */
  override protected def sqlTypeName: String = "TIMESTAMP_NTZ"

  /** The data type instance these operations were created for. */
  override def dataType: TimestampNTZNanosType = t
}

/**
 * Concrete [[TimestampNanosTypeApiOps]] bound to
 * [[org.apache.spark.sql.types.TimestampLTZNanosType]].
 *
 * Supplies only the data type and the `TIMESTAMP_LTZ` SQL-literal prefix; everything else is
 * inherited from the shared base class.
 *
 * @param t
 *   The TimestampLTZNanosType with precision information
 * @since 4.3.0
 */
class TimestampLTZNanosTypeApiOps(val t: TimestampLTZNanosType) extends TimestampNanosTypeApiOps {

  /** Prefix used when rendering values of this type as SQL literals. */
  override protected def sqlTypeName: String = "TIMESTAMP_LTZ"

  /** The data type instance these operations were created for. */
  override def dataType: TimestampLTZNanosType = t
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types.{DataType, TimeType}
import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType, TimeType}
import org.apache.spark.unsafe.types.UTF8String

/**
Expand Down Expand Up @@ -159,6 +159,8 @@ object TypeApiOps {
if (!SqlApiConf.get.typesFrameworkEnabled) return None
dt match {
case tt: TimeType => Some(new TimeTypeApiOps(tt))
case t: TimestampNTZNanosType => Some(new TimestampNTZNanosTypeApiOps(t))
case t: TimestampLTZNanosType => Some(new TimestampLTZNanosTypeApiOps(t))
// Add new types here - single registration point
case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,28 +145,32 @@ object InternalRow {
def getAccessor(dt: DataType, nullable: Boolean = true): (SpecializedGetters, Int) => Any = {
val getValueNullSafe: (SpecializedGetters, Int) => Any = dt match {
case u: UserDefinedType[_] => getAccessor(u.sqlType, nullable)
case _ => PhysicalDataType(dt) match {
case PhysicalBooleanType => (input, ordinal) => input.getBoolean(ordinal)
case PhysicalByteType => (input, ordinal) => input.getByte(ordinal)
case PhysicalShortType => (input, ordinal) => input.getShort(ordinal)
case PhysicalIntegerType => (input, ordinal) => input.getInt(ordinal)
case PhysicalLongType => (input, ordinal) => input.getLong(ordinal)
case PhysicalFloatType => (input, ordinal) => input.getFloat(ordinal)
case PhysicalDoubleType => (input, ordinal) => input.getDouble(ordinal)
case _: PhysicalStringType => (input, ordinal) => input.getUTF8String(ordinal)
case PhysicalBinaryType => (input, ordinal) => input.getBinary(ordinal)
case PhysicalCalendarIntervalType => (input, ordinal) => input.getInterval(ordinal)
case PhysicalTimestampNTZNanosType => (input, ordinal) =>
input.getTimestampNTZNanos(ordinal)
case PhysicalTimestampLTZNanosType => (input, ordinal) =>
input.getTimestampLTZNanos(ordinal)
case t: PhysicalDecimalType => (input, ordinal) =>
input.getDecimal(ordinal, t.precision, t.scale)
case t: PhysicalStructType => (input, ordinal) => input.getStruct(ordinal, t.fields.length)
case _: PhysicalArrayType => (input, ordinal) => input.getArray(ordinal)
case _: PhysicalMapType => (input, ordinal) => input.getMap(ordinal)
case _ => (input, ordinal) => input.get(ordinal, dt)
}
case _ =>
TypeOps(dt).map(_.getScalaAccessor).getOrElse {
PhysicalDataType(dt) match {
case PhysicalBooleanType => (input, ordinal) => input.getBoolean(ordinal)
case PhysicalByteType => (input, ordinal) => input.getByte(ordinal)
case PhysicalShortType => (input, ordinal) => input.getShort(ordinal)
case PhysicalIntegerType => (input, ordinal) => input.getInt(ordinal)
case PhysicalLongType => (input, ordinal) => input.getLong(ordinal)
case PhysicalFloatType => (input, ordinal) => input.getFloat(ordinal)
case PhysicalDoubleType => (input, ordinal) => input.getDouble(ordinal)
case _: PhysicalStringType => (input, ordinal) => input.getUTF8String(ordinal)
case PhysicalBinaryType => (input, ordinal) => input.getBinary(ordinal)
case PhysicalCalendarIntervalType => (input, ordinal) => input.getInterval(ordinal)
case PhysicalTimestampNTZNanosType => (input, ordinal) =>
input.getTimestampNTZNanos(ordinal)
case PhysicalTimestampLTZNanosType => (input, ordinal) =>
input.getTimestampLTZNanos(ordinal)
case t: PhysicalDecimalType => (input, ordinal) =>
input.getDecimal(ordinal, t.precision, t.scale)
case t: PhysicalStructType =>
(input, ordinal) => input.getStruct(ordinal, t.fields.length)
case _: PhysicalArrayType => (input, ordinal) => input.getArray(ordinal)
case _: PhysicalMapType => (input, ordinal) => input.getMap(ordinal)
case _ => (input, ordinal) => input.get(ordinal, dt)
}
}
}

if (nullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.TimestampNanosVal

/**
* A parent class for mutable container objects that are reused when the values are changed,
Expand Down Expand Up @@ -186,6 +187,21 @@ final class MutableAny extends MutableValue {
}
}

/**
 * Mutable, reusable holder for a single [[TimestampNanosVal]] column value.
 */
final class MutableTimestampNanos extends MutableValue {
  var value: TimestampNanosVal = _

  /** Returns the held value, or null when this holder is marked as null. */
  override def boxed: Any = {
    if (isNull) {
      null
    } else {
      value
    }
  }

  /** Stores `v` after casting it to TimestampNanosVal and clears the null flag. */
  override def update(v: Any): Unit = {
    // The null flag is cleared before the cast, exactly as the flag/value pair is written
    // elsewhere in this holder; keep this order.
    isNull = false
    value = v.asInstanceOf[TimestampNanosVal]
  }

  /** Creates a new holder with the same null flag and the same value reference. */
  override def copy(): MutableTimestampNanos = {
    val duplicate = new MutableTimestampNanos
    duplicate.value = value
    duplicate.isNull = isNull
    duplicate
  }
}

/**
* A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
* based on the dataTypes of each column. The intent is to decrease garbage when modifying the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1689,23 +1689,27 @@ object CodeGenerator extends Logging {
val jt = javaType(dataType)
dataType match {
case udt: UserDefinedType[_] => getValue(input, udt.sqlType, ordinal)
case _ if isPrimitiveType(jt) => s"$input.get${primitiveTypeName(jt)}($ordinal)"
case _ => PhysicalDataType(dataType) match {
case _: PhysicalArrayType => s"$input.getArray($ordinal)"
case PhysicalBinaryType => s"$input.getBinary($ordinal)"
case _: PhysicalGeographyType => s"$input.getGeography($ordinal)"
case _: PhysicalGeometryType => s"$input.getGeometry($ordinal)"
case PhysicalCalendarIntervalType => s"$input.getInterval($ordinal)"
case PhysicalTimestampNTZNanosType => s"$input.getTimestampNTZNanos($ordinal)"
case PhysicalTimestampLTZNanosType => s"$input.getTimestampLTZNanos($ordinal)"
case t: PhysicalDecimalType => s"$input.getDecimal($ordinal, ${t.precision}, ${t.scale})"
case _: PhysicalMapType => s"$input.getMap($ordinal)"
case PhysicalNullType => "null"
case _: PhysicalStringType => s"$input.getUTF8String($ordinal)"
case t: PhysicalStructType => s"$input.getStruct($ordinal, ${t.fields.length})"
case PhysicalVariantType => s"$input.getVariant($ordinal)"
case _ => s"($jt)$input.get($ordinal, null)"
}
case _ =>
TypeOps(dataType).map(_.getCodegenGetter(input, ordinal)).getOrElse {
if (isPrimitiveType(jt)) s"$input.get${primitiveTypeName(jt)}($ordinal)"
else PhysicalDataType(dataType) match {
case _: PhysicalArrayType => s"$input.getArray($ordinal)"
case PhysicalBinaryType => s"$input.getBinary($ordinal)"
case _: PhysicalGeographyType => s"$input.getGeography($ordinal)"
case _: PhysicalGeometryType => s"$input.getGeometry($ordinal)"
case PhysicalCalendarIntervalType => s"$input.getInterval($ordinal)"
case PhysicalTimestampNTZNanosType => s"$input.getTimestampNTZNanos($ordinal)"
case PhysicalTimestampLTZNanosType => s"$input.getTimestampLTZNanos($ordinal)"
case t: PhysicalDecimalType =>
s"$input.getDecimal($ordinal, ${t.precision}, ${t.scale})"
case _: PhysicalMapType => s"$input.getMap($ordinal)"
case PhysicalNullType => "null"
case _: PhysicalStringType => s"$input.getUTF8String($ordinal)"
case t: PhysicalStructType => s"$input.getStruct($ordinal, ${t.fields.length})"
case PhysicalVariantType => s"$input.getVariant($ordinal)"
case _ => s"($jt)$input.get($ordinal, null)"
}
}
}
}

Expand Down Expand Up @@ -1769,20 +1773,28 @@ object CodeGenerator extends Logging {
def setColumn(row: String, dataType: DataType, ordinal: Int, value: String): String = {
val jt = javaType(dataType)
dataType match {
case _ if isPrimitiveType(jt) => s"$row.set${primitiveTypeName(jt)}($ordinal, $value)"
case udt: UserDefinedType[_] => setColumn(row, udt.sqlType, ordinal, value)
case CalendarIntervalType => s"$row.setInterval($ordinal, $value)"
case _: TimestampNTZNanosType => s"$row.setTimestampNTZNanos($ordinal, $value)"
case _: TimestampLTZNanosType => s"$row.setTimestampLTZNanos($ordinal, $value)"
case t: DecimalType => s"$row.setDecimal($ordinal, $value, ${t.precision})"
case udt: UserDefinedType[_] => setColumn(row, udt.sqlType, ordinal, value)
// The UTF8String, InternalRow, ArrayData and MapData may come from UnsafeRow, we should copy
// it to avoid keeping a "pointer" to a memory region which may get updated afterwards.
case _: StringType | _: StructType | _: ArrayType | _: MapType =>
s"$row.update($ordinal, $value.copy())"
case _ => s"$row.update($ordinal, $value)"
case _ =>
TypeOps(dataType).map(_.getCodegenSetter(row, ordinal, value)).getOrElse {
if (isPrimitiveType(jt)) s"$row.set${primitiveTypeName(jt)}($ordinal, $value)"
else setColumnDefault(row, dataType, ordinal, value)
}
}
}

/**
 * Generates the Java statement fragment that stores `value` at `ordinal` of `row`.
 *
 * The two nanosecond timestamp types use their dedicated setters; every other type goes
 * through the generic `update` call.
 */
private def setColumnDefault(
    row: String, dataType: DataType, ordinal: Int, value: String): String = {
  // Pick the setter method name first, then emit a single call expression.
  val setterName = dataType match {
    case _: TimestampNTZNanosType => "setTimestampNTZNanos"
    case _: TimestampLTZNanosType => "setTimestampLTZNanos"
    case _ => "update"
  }
  s"$row.$setterName($ordinal, $value)"
}

/**
* Update a column in MutableRow from ExprCode.
*
Expand Down Expand Up @@ -1982,26 +1994,29 @@ object CodeGenerator extends Logging {
case udt: UserDefinedType[_] => javaType(udt.sqlType)
case ObjectType(cls) if cls.isArray => s"${javaType(ObjectType(cls.getComponentType))}[]"
case ObjectType(cls) => cls.getName
case _ => PhysicalDataType(dt) match {
case _: PhysicalArrayType => "ArrayData"
case PhysicalBinaryType => "byte[]"
case PhysicalBooleanType => JAVA_BOOLEAN
case PhysicalByteType => JAVA_BYTE
case PhysicalCalendarIntervalType => "CalendarInterval"
case PhysicalTimestampNTZNanosType => "TimestampNanosVal"
case PhysicalTimestampLTZNanosType => "TimestampNanosVal"
case PhysicalIntegerType => JAVA_INT
case _: PhysicalDecimalType => "Decimal"
case PhysicalDoubleType => JAVA_DOUBLE
case PhysicalFloatType => JAVA_FLOAT
case PhysicalLongType => JAVA_LONG
case _: PhysicalMapType => "MapData"
case PhysicalShortType => JAVA_SHORT
case _: PhysicalStringType => "UTF8String"
case _: PhysicalStructType => "InternalRow"
case _: PhysicalVariantType => "VariantVal"
case _ => "Object"
}
case _ =>
TypeOps(dt).map(_.getJavaClass.getSimpleName).getOrElse {
PhysicalDataType(dt) match {
case _: PhysicalArrayType => "ArrayData"
case PhysicalBinaryType => "byte[]"
case PhysicalBooleanType => JAVA_BOOLEAN
case PhysicalByteType => JAVA_BYTE
case PhysicalCalendarIntervalType => "CalendarInterval"
case PhysicalTimestampNTZNanosType => "TimestampNanosVal"
case PhysicalTimestampLTZNanosType => "TimestampNanosVal"
case PhysicalIntegerType => JAVA_INT
case _: PhysicalDecimalType => "Decimal"
case PhysicalDoubleType => JAVA_DOUBLE
case PhysicalFloatType => JAVA_FLOAT
case PhysicalLongType => JAVA_LONG
case _: PhysicalMapType => "MapData"
case PhysicalShortType => JAVA_SHORT
case _: PhysicalStringType => "UTF8String"
case _: PhysicalStructType => "InternalRow"
case _: PhysicalVariantType => "VariantVal"
case _ => "Object"
}
}
}

def javaClass(dt: DataType): Class[_] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -113,9 +114,16 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
// Can't call setNullAt() for DecimalType with precision larger than 18.
s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
case CalendarIntervalType => s"$rowWriter.write($index, (CalendarInterval) null);"
case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
s"$rowWriter.write($index, (TimestampNanosVal) null);"
case _ => s"$rowWriter.setNullAt($index);"
case _ =>
TypeOps(dt)
.flatMap(_.getCodegenNullWrite(rowWriter, index.toString))
.getOrElse {
dt match {
case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
s"$rowWriter.write($index, (TimestampNanosVal) null);"
case _ => s"$rowWriter.setNullAt($index);"
}
}
}

val writeField = writeElement(ctx, input.value, index.toString, dt, rowWriter)
Expand Down Expand Up @@ -183,9 +191,16 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
s"$arrayWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
case CalendarIntervalType => s"$arrayWriter.write($index, (CalendarInterval) null);"
case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
s"$arrayWriter.write($index, (TimestampNanosVal) null);"
case _ => s"$arrayWriter.setNull${elementOrOffsetSize}Bytes($index);"
case _ =>
TypeOps(et)
.flatMap(_.getCodegenNullWrite(arrayWriter, index))
.getOrElse {
et match {
case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
s"$arrayWriter.write($index, (TimestampNanosVal) null);"
case _ => s"$arrayWriter.setNull${elementOrOffsetSize}Bytes($index);"
}
}
}

val elementAssignment = if (containsNull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,11 +540,15 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
}
case ByteType | ShortType =>
ExprCode.forNonNullValue(JavaCode.expression(s"($javaType)$value", dataType))
case TimestampType | TimestampNTZType | LongType | _: DayTimeIntervalType | _: TimeType =>
case TimestampType | TimestampNTZType | LongType | _: DayTimeIntervalType =>
toExprCode(s"${value}L")
case _ =>
val constRef = ctx.addReferenceObj("literal", value, javaType)
ExprCode.forNonNullValue(JavaCode.global(constRef, dataType))
TypeOps(dataType)
.map(ops => toExprCode(ops.getJavaLiteral(value)))
.getOrElse {
val constRef = ctx.addReferenceObj("literal", value, javaType)
ExprCode.forNonNullValue(JavaCode.global(constRef, dataType))
}
}
}
}
Expand Down
Loading