iceberg: Structured Streaming writes to a partitioned table fail when spark.sql.extensions is set to IcebergSparkSessionExtensions

Apache Iceberg version

1.2.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

According to the documentation, when using Iceberg one should set spark.sql.extensions to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions. Setting this property, however, causes an exception to be thrown when writing to a partitioned Iceberg table with Spark Structured Streaming.

The exception that is thrown is:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]

Code to reproduce:

package com.example

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

import java.nio.file.Files
import java.sql.Timestamp

case class Bla(ts: Timestamp, a: String, b: Double)

object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString
    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

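    // The table is partitioned with the Iceberg transform days(ts)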
    spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

    implicit val sqlContext = spark.sqlContext
    implicit val encoder = Encoders.product[Bla]
    val memStream = MemoryStream[Bla]
    val now = System.currentTimeMillis()
    val day = 86400000
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
    ))

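    // Streaming write via .start() with a "path" option; this is the write
    // that fails when the Iceberg extensions are enabled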
    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}

The code works as expected when the statement that configures spark.sql.extensions is commented out.

About this issue

  • State: open
  • Created a year ago
  • Reactions: 9
  • Comments: 19 (10 by maintainers)

Most upvoted comments

Hello, my friends.

I'm writing to a partitioned table with Structured Streaming, and I'm partitioning with Iceberg's transforms.

When I write with .start() I get the same error, but when I use toTable('iceberg_table') instead of start(), it works.

I am using Iceberg 1.3.0 and Spark 3.4.0.

This way it didn’t work

(
    df.writeStream.format("iceberg")
    .outputMode("append")
    .trigger(once=True)
    .option("path", iceberg_table)
    .option("fanout-enabled", "true")
    .option("checkpointLocation", checkpoint_location)
    .start()
    .awaitTermination()
)

This way it worked:

(
    df.writeStream.format("iceberg")
    .outputMode("append")
    .trigger(once=True)
    .option("path", iceberg_table)
    .option("fanout-enabled", "true")
    .option("checkpointLocation", checkpoint_location)
    .toTable(iceberg_table)
    .awaitTermination()
)

@adigerber I know it has been a while, but which version of Spark are you using?

Either 3.3.1 or 3.3.2. IIRC it still happens in Spark 3.3.2 + Iceberg 1.2.1

This is a Spark issue, not an Iceberg issue (at least in 3.4). We may consider fixing Spark 3.3 and older but I am not so sure about 3.4. In Spark 3.4, we are relying on the function catalog API to resolve transforms.

@Marcus-Rosti, could you confirm toTable works in 3.4? I believe you tried 3.3 before.

@Fokko is right that start(), unlike toTable(), does not populate the catalog, hence we can’t resolve the transforms. I believe the right solution would be to fix Spark to use SupportsCatalogOptions when loading Table from TableProvider.
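
For reference, the same workaround applied to the Scala reproduction above would look roughly like the sketch below (assuming the memStream, checkpointDir, and test_iceberg_table from the repro; DataStreamWriter.toTable is available from Spark 3.1 onward):

// Sketch: route the streaming write through toTable() so the session catalog
// is used to load the table and the days(ts) transform can be resolved,
// instead of .option("path", ...).start()
memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("fanout-enabled", true)
  .option("checkpointLocation", checkpointDir)
  .trigger(Trigger.Once())
  .toTable("test_iceberg_table")
  .awaitTermination()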

same issue for me

Closing in. One workaround is to use toTable('test_iceberg_table') instead of start().

The stack trace:

23/06/19 17:45:53 ERROR MicroBatchExecution: Query [id = 2507e68d-0fb2-4aa2-98e4-427bfa1326c5, runId = 308060c0-6572-4328-9588-ece2b00c054a] terminated with error
org.apache.spark.sql.AnalysisException: days(ts) is not currently supported
	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
	at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

Spark 3.2 uses a V2 data source.