spark: [BUG]: Applying UDFs to TimestampType causes occasional exception

Describe the bug

When applying multiple UDFs to a DataFrame, an error is thrown when at least one of the target columns of the UDFs is a TimestampType column.

To Reproduce

Below is a minimal reproduction. It includes a comment describing how to create the necessary parquet file, which must have 2 timestamp columns and enough rows (~2k) for the issue to occur. The program is launched via the following command:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2 --master local .\microsoft-spark-3-0_2.12-1.0.0.jar dotnet .\spark_cache_bug.dll

Reproduction program (spark_cache_bug.dll):

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

using static Microsoft.Spark.Sql.Functions;

namespace spark_cache_bug
{
    public static class SparkGenerator
    {
        // Returns a UDF that ignores the incoming row and produces a single fixed
        // string value; the caller casts the result back to the original column type.
        public static Func<Column, Column> Generate(StructType returnType, string column) =>
            Udf<Row>(row => new GenericRow(new string[] { "1970-01-02 00:00:00" }), returnType);
    }

    public static class Program
    {
        // Wraps the column in a struct, applies the UDF to it, and casts the UDF's
        // string result back to the column's original data type in expressionMap.
        private static void Generate(Dictionary<string, Column> expressionMap, string column, DataFrame df)
        {
            var typeMapper = new Dictionary<string, DataType>();
            foreach (var f in df.Schema().Fields)
            {
                typeMapper.Add(f.Name, f.DataType);
            }

            var returnType = new StructType(new[] {new StructField(column, new StringType())});

            var udf = SparkGenerator.Generate(returnType, column);

            var newCol = udf(Struct(expressionMap[column]));
            expressionMap[column] = newCol.GetField(column).Cast(typeMapper[column].SimpleString).Alias(column);
        }

        /*
         * To create the parquet file for testing.
         * spark-shell --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2   (assuming you are writing file to S3)
         * import java.sql.Timestamp
         * val df = sc.parallelize( Seq.fill(10000){(new Timestamp(0), new Timestamp(0))}).toDF("timestamp1","timestamp2")
         * df.write.parquet("...")
         *
         * This creates a parquet file with 2 columns, timestamp1 and timestamp2, both of type TimestampType.
         */

        private static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();
            spark.Conf().Set("fs.s3a.access.key", "<AWS ACCESS ID>");
            spark.Conf().Set("fs.s3a.secret.key", "<AWS ACCESS SECRET KEY");
            var sourcePath = "s3a://<PATH TO FILE>";
            var outputPath = "s3a://<DESIRED OUTPUT LOCATION>";

            var df = spark.Read().Parquet(sourcePath);

            var expressionMap = df.Schema().Fields.ToDictionary(f => f.Name, f => df.Col(f.Name));

            Generate(expressionMap, "timestamp1", df);
            Generate(expressionMap, "timestamp2", df);

            df = df.Select(df.Schema().Fields.Select(f => expressionMap[f.Name]).ToArray());
            df.Write().Mode("overwrite").Parquet(outputPath);
        }
    }
}

The exception thrown is:

Caused by: org.apache.spark.api.python.PythonException: System.InvalidCastException: Unable to cast object of type 'Microsoft.Spark.Sql.Types.Timestamp' to type 'System.Int32'.
   at Microsoft.Spark.Sql.Types.TimestampType.FromInternal(Object obj) in /_/src/csharp/Microsoft.Spark/Sql/Types/SimpleTypes.cs:line 116
   at Microsoft.Spark.Sql.Row.Convert() in /_/src/csharp/Microsoft.Spark/Sql/Row.cs:line 152
   at Microsoft.Spark.Sql.Row..ctor(Object[] values, StructType schema) in /_/src/csharp/Microsoft.Spark/Sql/Row.cs:line 36
   at Microsoft.Spark.Sql.RowConstructor.GetRow() in /_/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs:line 104
   at Microsoft.Spark.Sql.RowConstructor.GetRow() in /_/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs:line 100
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 146
   at Microsoft.Spark.Worker.Command.SqlCommandExecutor.Execute(Version version, Stream inputStream, Stream outputStream, PythonEvalType evalType, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 76
   at Microsoft.Spark.Worker.Command.CommandExecutor.Execute(Stream inputStream, Stream outputStream, Int32 splitIndex, CommandPayload commandPayload) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\CommandExecutor.cs:line 65
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 154
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
        ... 9 more

Our investigation

After investigating a bit, the cause appears to be that a given row's TimestampType column is sometimes passed to TimestampType.FromInternal multiple times (once per UDF call). The first time FromInternal is called, it properly creates a new Timestamp object. The second time, however, that same Timestamp object is passed back into the function, which is expecting an int or long. The culprit might be RowConstructor.GetRow, which appears to reuse _args. When I modify GetRow to first make a copy of _args and pass the copy to the Row constructor, the issue goes away.
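For illustration, here is a minimal sketch of the kind of change that makes the problem go away, assuming GetRow currently hands the shared _args buffer straight to the Row constructor. The method body and the _schema field name are assumptions for illustration, not the actual RowConstructor source:

// Hypothetical sketch of the workaround, not the actual RowConstructor.GetRow.
// Row's constructor converts values in place via each type's FromInternal, so
// if _args is reused for a later call, the already-converted Timestamp object
// gets fed back into FromInternal, which expects the raw int/long value.
internal Row GetRow()
{
    // Copy the shared buffer so the in-place conversion done by Row's
    // constructor cannot leak back into _args and be converted a second time.
    var values = new object[_args.Length];
    Array.Copy(_args, values, _args.Length);
    return new Row(values, _schema); // _schema: assumed field holding the row's StructType
}

The copy trades a small per-row allocation for correctness; an alternative fix would be to make the conversion not mutate the incoming values in place.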

Desktop (please complete the following information):

  • Windows 10
  • Chrome
  • .NET for Apache Spark v1.0.0 running Spark 3.0

Additional context

This error won't always occur, but it occurs with high probability when the DataFrame has at least ~2k rows.

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 26 (3 by maintainers)

Most upvoted comments

Sure. Can you reach out to hello@tonic.ai? I'll see the e-mail and respond.

@elvaliuliuliu Sorry, just saw you already asked for the AWS setup. I’ll get that to you in a few minutes!