kafka-connect-mongodb: JsonParseException with a string key

Hi,

I am trying to sink a Kafka topic to MongoDB. The topic has a plain string key (simple text) and raw JSON as the message value, and I get an exception as soon as I post my connector config to Kafka Connect. (The topic already has data in it, so the connector starts processing immediately after the POST.) Any advice would be appreciated. Note that I’m not running the Confluent Platform, just plain Kafka. In this example, the keys of all my Kafka messages are “SZ0”.

The library version tested is v1.0.0.

Exception:

[2018-05-04 13:58:39,898] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        ... 10 more
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-05-04 13:58:39,902] INFO stopping MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:182)                       
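
From the stack trace, it looks like the connector's JsonRawStringRecordConverter passes the string key straight to BsonDocument.parse, and a bare string like "SZ0" is not a JSON document. A minimal sketch outside of Connect that seems to reproduce the same parse error (class name is just for illustration):

import org.bson.BsonDocument;

public class KeyParseRepro {
    public static void main(String[] args) {
        // A real JSON document parses fine:
        BsonDocument doc = BsonDocument.parse("{\"sensor\": \"SZ0\", \"reading\": 42}");
        System.out.println(doc.toJson());

        // A plain string key like "SZ0" is not a JSON document, so this throws
        // org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
        BsonDocument key = BsonDocument.parse("SZ0");
    }
}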

Here is the config I used:

{
    "name": "bare-mongodbsink.json",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
          
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": false,
          
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
          
        "topics": "sensed-metrics",

        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy",
        
        "mongodb.connection.uri": "mongodb://localhost:27017/testdb?w=1&journal=true",
        "mongodb.collection": "sensedMetricsTest"
    }
}
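
For completeness, I post this config to the Connect REST API roughly like this (the worker's REST port is the default 8083 here, and the file name is just where I saved the JSON above):

curl -X POST -H "Content-Type: application/json" --data @bare-mongodbsink.json http://localhost:8083/connectors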

And here is the Kafka Connect worker properties file:

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000



plugin.path=/opt/kafka-connectors

Thanks

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 30 (10 by maintainers)

Most upvoted comments

I got it working using an SMT (Single Message Transform):

transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=EVENT,SOURCEID

mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy
mongodb.key.projection.type=whitelist
mongodb.key.projection.list=EVENT,SOURCEID
mongodb.replace.one.strategy=at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy
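
If you configure the connector through the REST API as in the question, the same settings go into the "config" block as JSON, roughly like this (EVENT and SOURCEID are just the fields from my example):

"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "EVENT,SOURCEID",

"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
"mongodb.key.projection.type": "whitelist",
"mongodb.key.projection.list": "EVENT,SOURCEID",
"mongodb.replace.one.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy"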

Yep, I forgot something. You also have to set

mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInKeyStrategy

so that it actually uses the _id from the created key document.

Hi, we also use this connector and ran into the same problem. We use Kafka Connect transformations to wrap the key, like this:

# The list of transformations to apply (comma separated, applied in order)
transforms=WrapKey
# The configuration of your transformations
transforms.WrapKey.type=org.apache.kafka.connect.transforms.HoistField$Key
transforms.WrapKey.field=_id

The Confluent documentation on transformations is light, but you can find more details in the Apache Kafka documentation: https://kafka.apache.org/documentation/#connect
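
Combined with the ProvidedInKeyStrategy from the comment above, a sketch of the relevant part of the connector config JSON from the question (only the added/changed keys shown):

"transforms": "WrapKey",
"transforms.WrapKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.WrapKey.field": "_id",

"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInKeyStrategy"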

@jzuijlek glad to hear you got it working for your use case. Just wondering if it would also work with the ReplaceOneDefaultFilterStrategy, because it also supports compound keys. The custom ReplaceOneBusinessKeyFilterStrategy is for a very specific use case, namely when you absolutely need a MongoDB ObjectId to be created and at the same time need upsert semantics based on your business key.