DataflowTemplates: PubSub to Elasticsearch Error

I’m trying to ingest data into Elasticsearch (ES) using GCP Dataflow and the Pub/Sub to Elasticsearch template. So far I’ve tried a couple of different deployments, all hosted on GCP and created with the trial option on elastic.co.

The data I’m trying to ingest are metrics from our devices in a simple JSON format. The Dataflow job is configured with the Cloud ID of my deployment and a custom UDF that formats the data coming from Pub/Sub. I leave the data type at its default, so I’m not using the audit option that some guides suggest.

At first everything seems to go well and some metrics are ingested, but then the job fails with the following error:

Error message from worker: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)

This Dataflow job uses the default configuration, so I’d really expect it to work out of the box.

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 24

Most upvoted comments

@GustafKisi For me it’s working now with Elasticsearch 8.x!

They are working on Elasticsearch 8.x support here: https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/369. It should be available in the UI soon.

Hey @GustafKisi, I’m not an expert on Dataflow cost optimization, but Streaming Engine can certainly help. You can also look into limiting the number of worker nodes (max workers), using an E2 machine type, and reducing the disk size (diskSizeGb). Of course, reducing the volume of logs through your log sink configuration or exclusion filters will help as well.
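A minimal sketch of those settings expressed as Dataflow pipeline options, assuming you build and launch the template yourself from source (for example with Maven) rather than through the console UI. The class name and the numbers (2 workers, `e2-standard-2`, 30 GB) are placeholders chosen for illustration, not recommendations.

```java
import java.util.List;

/**
 * Illustrative only: Dataflow pipeline options for a Java pipeline that you
 * build and launch yourself. Class name and values are hypothetical placeholders.
 */
class CostSavingFlags {

  static List<String> flags(int maxWorkers, String machineType, int diskSizeGb) {
    return List.of(
        "--enableStreamingEngine",             // move streaming shuffle/state off the worker VMs
        "--maxNumWorkers=" + maxWorkers,       // cap autoscaling
        "--workerMachineType=" + machineType,  // e.g. an E2 machine type
        "--diskSizeGb=" + diskSizeGb);         // smaller persistent disk per worker
  }

  public static void main(String[] args) {
    // Prints the flags space-separated so they can be appended to a launch command:
    // --enableStreamingEngine --maxNumWorkers=2 --workerMachineType=e2-standard-2 --diskSizeGb=30
    System.out.println(String.join(" ", flags(2, "e2-standard-2", 30)));
  }
}
```

Tune the values to your own throughput; cutting workers or disk too far can make the job fall behind on the Pub/Sub backlog.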

Yes, absolutely, but do note that it’s not an official Google version (though the changes are pretty transparent).

Yeah, it’ll take around 20-30 minutes.

@GustafKisi I suggest Cloud Shell, as it seems to have most of the required tools preinstalled. Otherwise you’ll need to install a Java JDK and Maven yourself.

I did some investigation on this. I believe the cause is that Elasticsearch deprecated mapping types in ES 7.0+, so a warning like this one is emitted on every bulk flush: request [POST https://xxx.europe-west4.gcp.elastic-cloud.com:443/logs-gcp.pubsub-default/_doc/_bulk] returned 1 warnings: [299 Elasticsearch-7.16.3-4e6e4eab2297e949ec994e688dad46290d018022 "[types removal] Specifying types in bulk requests is deprecated."].
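To make that diagnosis concrete, here is a minimal Java sketch that parses a single warning value like the one inside the brackets above, using the HTTP `Warning` header layout (warn-code, warn-agent, quoted warn-text), and flags the types-removal deprecation. It also pulls the cluster version out of the agent string. `EsWarning` and its methods are hypothetical names, not part of the template or the Elasticsearch client.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of the template): parses one Warning header
 * value of the form: warn-code SP warn-agent SP "warn-text".
 */
class EsWarning {
  private static final Pattern WARNING =
      Pattern.compile("^(\\d{3}) (\\S+) \"((?:[^\"\\\\]|\\\\.)*)\"");
  private static final Pattern ES_AGENT =
      Pattern.compile("^Elasticsearch-(\\d+\\.\\d+\\.\\d+)");

  final int code;
  final String agent;
  final String text;

  private EsWarning(int code, String agent, String text) {
    this.code = code;
    this.agent = agent;
    this.text = text;
  }

  /** Returns empty if the value does not look like a Warning header value. */
  static Optional<EsWarning> parse(String value) {
    Matcher m = WARNING.matcher(value.trim());
    if (!m.find()) {
      return Optional.empty();
    }
    return Optional.of(new EsWarning(Integer.parseInt(m.group(1)), m.group(2), m.group(3)));
  }

  /** 299 is the "miscellaneous persistent warning" code seen in the log above. */
  boolean isTypesRemoval() {
    return code == 299 && text.startsWith("[types removal]");
  }

  /** Extracts e.g. "7.16.3" from an agent like "Elasticsearch-7.16.3-<build hash>". */
  Optional<String> esVersion() {
    Matcher m = ES_AGENT.matcher(agent);
    return m.find() ? Optional.of(m.group(1)) : Optional.empty();
  }

  public static void main(String[] args) {
    String value = "299 Elasticsearch-7.16.3-4e6e4eab2297e949ec994e688dad46290d018022 "
        + "\"[types removal] Specifying types in bulk requests is deprecated.\"";
    EsWarning w = parse(value).get();
    System.out.println(w.isTypesRemoval());              // true
    System.out.println(w.esVersion().orElse("unknown")); // 7.16.3
  }
}
```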

I suppose the right fix is to either a) make it configurable, b) add ES version detection, c) just support ES 7+, or, last but not least, d) ignore and suppress the warning in checkForErrors() (the code in question is here).
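A rough sketch of what options b) through d) could look like, assuming the version string comes from the cluster's root endpoint (`GET /` reports it as `version.number`, e.g. `7.16.3`) and that the warnings reach the check as plain strings. `TypelessBulkSupport` and all of its methods are hypothetical; this is not how the template is actually implemented.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of options b)-d); none of these helpers exist in the
 * template, and the names and signatures are illustrative only.
 */
class TypelessBulkSupport {

  // Option b): only the major component of "version.number" matters here.
  static int majorVersion(String versionNumber) {
    return Integer.parseInt(versionNumber.split("\\.")[0]);
  }

  // Options b)/c): from 7.0 on, use the typeless /{index}/_bulk endpoint so the
  // "[types removal]" deprecation warning is never triggered.
  static String bulkPath(String index, String type, int esMajorVersion) {
    if (esMajorVersion >= 7 || type == null || type.isEmpty()) {
      return "/" + index + "/_bulk";
    }
    // Older clusters keep the typed endpoint.
    return "/" + index + "/" + type + "/_bulk";
  }

  // Option d): drop only the types-removal warnings and keep any others,
  // so genuine problems are still reported.
  static List<String> withoutTypesRemovalWarnings(List<String> warnings) {
    return warnings.stream()
        .filter(w -> !w.contains("[types removal]"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    String index = "logs-gcp.pubsub-default";
    System.out.println(bulkPath(index, "_doc", majorVersion("7.16.3"))); // /logs-gcp.pubsub-default/_bulk
    System.out.println(bulkPath(index, "_doc", 6));                      // /logs-gcp.pubsub-default/_doc/_bulk
    System.out.println(withoutTypesRemovalWarnings(List.of(
        "299 Elasticsearch-7.16.3-4e6e4eab2297e949ec994e688dad46290d018022 "
            + "\"[types removal] Specifying types in bulk requests is deprecated.\""))); // []
  }
}
```

Option c) amounts to always taking the typeless branch; b) keeps older clusters working at the cost of an extra request to discover the version.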