kafka-connect-mongodb: JsonParseException with a string key
Hi,
I am trying to sink data to MongoDB from a Kafka topic that has a string key (simple text) and raw JSON as the message value, and I get an exception when I post my connector config to Kafka Connect. (There is already data in the topic, so the connector starts processing as soon as I post the config.) Any advice would be appreciated. Note that I'm not running the Confluent Platform, just plain Kafka. In this example, the key of every Kafka message is "SZ0".
The library version tested is v1.0.0.
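For reference, each record in the topic looks roughly like this (the value payload below is made up for illustration; only the key "SZ0" is from my actual data):

key:   SZ0
value: {"device":"SZ0","temperature":21.5,"ts":1525442319}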
Exception:
[2018-05-04 13:58:39,898] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
... 10 more
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-05-04 13:58:39,902] INFO stopping MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:182)
Here is the config I used:
{
  "name": "bare-mongodbsink.json",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "sensed-metrics",
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy",
    "mongodb.connection.uri": "mongodb://localhost:27017/testdb?w=1&journal=true",
    "mongodb.collection": "sensedMetricsTest"
  }
}
And here is the Kafka Connect worker properties file:
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka-connectors
Thanks
I got it working using an SMT (Single Message Transform):
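(The SMT snippet is missing from this comment as quoted; a minimal sketch of the idea, using Kafka Connect's built-in HoistField transform to wrap the plain string key into a document, might look like this — the transform alias WrapKey and the field name _id are illustrative choices, not from the original:)

"transforms": "WrapKey",
"transforms.WrapKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.WrapKey.field": "_id"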
Yep, I forgot something. You also have to set the document id strategy (sketched below), so it actually uses the _id from the created key document.
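(The setting itself is missing from the quote; presumably it was the connector's document id strategy, something like:)

"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInKeyStrategy"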
Hi, we also use this connector and ran into the same problem. We use Kafka Connect transformations to wrap the key and do it like this:
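(This snippet is also missing as quoted; in properties form, the same HoistField approach would read roughly as follows, with the alias and field name again being illustrative:)

transforms=WrapKey
transforms.WrapKey.type=org.apache.kafka.connect.transforms.HoistField$Key
transforms.WrapKey.field=_id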
The Confluent documentation on transformations is light, but you can find more details in the Apache Kafka documentation: https://kafka.apache.org/documentation/#connect
@jzuijlek glad to hear you got it working for your use case. Just wondering if it would also work with the ReplaceOneDefaultFilterStrategy, because it also supports compound keys. The custom ReplaceOneBusinessKeyFilterStrategy is for very specific usage, namely if you absolutely need a MongoDB ObjectId to be created and at the same time need to have upsert semantics based on your business key.
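(For context: in later versions of this connector the write model strategy is selected via the mongodb.writemodel.strategy property; assuming that option is available, picking the business-key variant would look roughly like this, though the exact class names may differ from the ones mentioned above:)

"mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy"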