iceberg: Structured streaming writes to a partitioned table fail when spark.sql.extensions is set to IcebergSparkSessionExtensions
Apache Iceberg version: 1.2.0 (latest release)
Query engine: Spark
Please describe the bug 🐞
According to the documentation, when using Iceberg, one should set spark.sql.extensions to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions, but setting this property seems to cause an exception to be thrown when trying to write to an Iceberg table using Spark structured streaming.
The exception that is thrown is:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]
Code to reproduce:
package com.example
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}
import java.nio.file.Files
import java.sql.Timestamp
case class Bla(ts: Timestamp, a: String, b: Double)
object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      // Removing this setting makes the streaming write succeed (see note below).
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Partitioning by the days(ts) transform is what the failing write trips over.
    spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

    implicit val sqlContext = spark.sqlContext
    implicit val encoder = Encoders.product[Bla]
    val memStream = MemoryStream[Bla]

    val now = System.currentTimeMillis()
    val day = 86400000
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
    ))

    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}
The code works as expected when the statement that configures spark.sql.extensions is commented out.
About this issue
- Original URL
- State: open
- Created a year ago
- Reactions: 9
- Comments: 19 (10 by maintainers)
Commits related to this issue
- Spark: Inject `DataSourceV2Relation` when missing When you start a structured streaming query using `.start()`, there will be no `DataSourceV2Relation` reference. When this is missing, we'll just cre... — committed to Fokko/incubator-iceberg by Fokko a year ago
Hello, my friends.
I'm writing to a partitioned table with Structured Streaming, and the table is partitioned using Iceberg's transforms.
When writing with .start() I get the same error, but switching to .toTable("iceberg_table") instead of .start() worked.
I am using Iceberg 1.3.0 and Spark 3.4.0.
With .start() it did not work; with .toTable() it did.
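A minimal sketch of the two write paths being contrasted (df, checkpointDir, and the table name are placeholders; this is an illustration, not the commenter's original code):

import org.apache.spark.sql.DataFrame

// Failing path: the target table is passed via the "path" option and the query is
// started with start().
def withStart(df: DataFrame, checkpointDir: String) =
  df.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", checkpointDir)
    .option("path", "iceberg_table")
    .start()

// Working path: the target table is resolved through the catalog via toTable().
def withToTable(df: DataFrame, checkpointDir: String) =
  df.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", checkpointDir)
    .toTable("iceberg_table")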
Either 3.3.1 or 3.3.2. IIRC it still happens in Spark 3.3.2 + Iceberg 1.2.1
This is a Spark issue, not an Iceberg issue (at least in 3.4). We may consider fixing Spark 3.3 and older but I am not so sure about 3.4. In Spark 3.4, we are relying on the function catalog API to resolve transforms.
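To make the function-catalog remark concrete, a minimal sketch (assuming Iceberg's transform functions are exposed under the system namespace of the configured catalog; this snippet is not from the issue):

// With an Iceberg catalog such as the spark_catalog configured in the repro, the same
// days() transform is also callable as a V2 catalog function, which is the mechanism
// Spark 3.4 relies on to resolve partition transforms.
spark.sql("SELECT spark_catalog.system.days(TIMESTAMP '2023-05-01 10:00:00')").show()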
@Marcus-Rosti, could you confirm toTable works in 3.4? I believe you tried 3.3 before.
@Fokko is right that start(), unlike toTable(), does not populate the catalog, hence we can't resolve the transforms. I believe the right solution would be to fix Spark to use SupportsCatalogOptions when loading Table from TableProvider.
same issue for me
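For reference, a hedged sketch of what the suggested Spark-side fix (SupportsCatalogOptions) would lean on: a connector whose TableProvider also implements SupportsCatalogOptions can tell Spark which catalog and identifier to load the table from, even when the query is started with start(). ExampleProvider and its option handling below are illustrative, not Iceberg's actual implementation:

import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions, Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ExampleProvider extends SupportsCatalogOptions {
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???
  override def getTable(schema: StructType, partitioning: Array[Transform],
                        properties: java.util.Map[String, String]): Table = ???

  // Map the writer's "path" option to a catalog identifier, e.g. default.test_iceberg_table.
  override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
    Identifier.of(Array("default"), options.get("path"))

  // Name the catalog that owns the table, so the plan is resolved against it.
  override def extractCatalog(options: CaseInsensitiveStringMap): String = "spark_catalog"
}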
Closing in. One workaround is to use toTable("test_iceberg_table") instead of start(). The stack trace:
Spark 3.2 uses a V2 data source.