elasticsearch-hadoop: Unable to index JSON from HDFS using SchemaRDD.saveToEs()

Elasticsearch-hadoop: elasticsearch-hadoop-2.1.0.BUILD-20150224.023403-309.zip
Spark: 1.2.0

I’m attempting to take a very basic JSON document on HDFS and index it in ES using SchemaRDD.saveToEs(). According to the documentation under “writing existing JSON to elasticsearch”, it should be as easy as creating a SchemaRDD via SQLContext.jsonFile() and then indexing it with .saveToEs(), but I’m getting an error.

Replicating the problem:

  1. Create a JSON file on HDFS with the content:
{"key":"value"}
  2. Execute the following code in spark-shell:
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val input = sqlContext.jsonFile("hdfs://nameservice1/user/mshirley/test.json")
input.saveToEs("mshirley_spark_test/test")

Error:

 org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["value"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 34, 118, 97, 108, 117, 101, 34, 93, 10]]; ]];
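For what it’s worth, the byte dump in that error message decodes (as UTF-8) to the bulk request body that was sent; a quick check in the spark-shell:

// bytes copied verbatim from the error message above
val bytes = Array[Byte](123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 34, 118, 97, 108, 117, 101, 34, 93, 10)
println(new String(bytes, "UTF-8"))
// prints:
// {"index":{}}
// ["value"]

So the Row appears to have been serialized as the bare JSON array ["value"] rather than as {"key":"value"}, which is what Elasticsearch rejects.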

input object:

res1: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [key#0], MappedRDD[5] at map at JsonRDD.scala:47

input.printSchema():

root
 |-- key: string (nullable = true)

Expected result: I expect to be able to read a file from HDFS that contains a JSON document per line and submit that data to ES for indexing.
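For reference, a minimal sketch of one way to push newline-delimited JSON as-is, assuming the raw-JSON path (EsSpark.saveJsonToEs) is available in this build; the file path and index name below are the ones from the repro above:

import org.elasticsearch.spark.rdd.EsSpark

// read the file as plain text, one JSON document per line
val lines = sc.textFile("hdfs://nameservice1/user/mshirley/test.json")

// hand the pre-serialized JSON strings to Elasticsearch without re-serializing them
EsSpark.saveJsonToEs(lines, "mshirley_spark_test/test")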

About this issue

  • State: closed
  • Created 9 years ago
  • Comments: 26 (12 by maintainers)

Most upvoted comments

@costin @yanakad @mshirley

In my case the test was even simpler. To summarize:

Dependencies:

  • elasticsearch-spark_2.10, version 2.1.0.Beta3
  • spark-streaming_2.10, version 1.2.1

Running on: Yarn HDP cluster

  • Test #1: Save RDD[Map[String,Int]]. Result: pass.
  • Test #2: Save RDD[String] where the String is a JSON string. Result: fail.
  • Test #3: Save RDD[case class]. According to the documentation this should work, but it fails as well.

Here is the documentation I’m referring to: http://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html
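For context, a minimal reconstruction of what Tests #1 and #2 amount to in the spark-shell (the values and the index name below are placeholders of mine, not the actual test code):

import org.elasticsearch.spark._

// Test #1: RDD[Map[String, Int]] -- this shape is the one reported to work
val counts = sc.makeRDD(Seq(Map("departures" -> 10, "arrivals" -> 12)))
counts.saveToEs("spark/docs")

// Test #2: RDD[String] holding a JSON document -- this shape is the one reported to fail
val docs = sc.makeRDD(Seq("""{"departures": 10, "arrivals": 12}"""))
docs.saveToEs("spark/docs")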

Using the exact same example from that documentation page does not work. (The documentation also has a typo, naming the variable +RDD+.) Here is the example; again, it did not work and threw an exception.

import org.elasticsearch.spark.rdd.EsSpark

// define a case class
case class Trip(departure: String, arrival: String)

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
EsSpark.saveToEs(rdd, "spark/docs")

And the exception was:

org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["MUC","OTP"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=0, length=13): [91, 34, 77, 85, 67, 34, 44, 34, 79, 84, 80, 34, 93]]; ]]; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The example couldn’t really get any simpler than that. If I manually create a Map out of the values in the case class, it works. This example even takes the SQL context schema out of the equation.
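And a minimal sketch of the Map-based workaround mentioned above, using the rdd of Trip instances from the example (the field names are my choice):

import org.elasticsearch.spark._

// convert each case class instance into a Map before saving; this is the path that works
val tripMaps = rdd.map(t => Map("departure" -> t.departure, "arrival" -> t.arrival))
tripMaps.saveToEs("spark/docs")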

Thoughts?