kafka-connect-mongodb: Fail to write to DB 'The $v update field is only recognized internally'
Hi,
I’m currently testing the Sink connector and trying to Sync 2 mongoDB clusters where the source cluster is using Debezium CDC and the destination MongoDB is using your MongoDB Sink connector.
my config is:
{
"name": "mongodb-dst-sink",
"config": {
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"topics": "dbserver1.inventory.customers",
"mongodb.connection.uri": "mongodb://debezium:dbz@mongodb-dst:27017/inventory",
"mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler",
"mongodb.delete.on.null.values": "false",
"mongodb.collections": "customers",
"mongodb.collection.dbserver1.inventory.customers": "customers"
}
}
Initially I was able to sync the clusters, however after I was trying to update a document in the source DB, I started getting the below exception.
The exception I get:
2019-07-09 15:04:36,175 ERROR || error on mongodb operation [at.grahsl.kafka.connect.mongodb.MongoDbSinkTask]
com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}].
at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:177)
at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:206)
at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:147)
at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:280)
at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
at java.util.HashMap.forEach(HashMap.java:1289)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-07-09 15:04:36,177 ERROR || writing 2 document(s) into collection [inventory.customers] failed -> remaining retries (2) [at.grahsl.kafka.connect.mongodb.MongoDbSinkTask]
2019-07-09 15:04:36,177 ERROR || WorkerSinkTask{id=mongodb-dst-sink-0} RetriableException from SinkTask: [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}].
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:170)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
at java.util.HashMap.forEach(HashMap.java:1289)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}].
at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:177)
at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:206)
at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:147)
at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:280)
at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
... 16 more
2019-07-09 15:04:36,665 INFO || 172.20.0.1 - - [09/Jul/2019:15:04:36 +0000] "GET /connectors/mongodb-dst-sink/status HTTP/1.1" 200 170 3 [org.apache.kafka.connect.runtime.rest.RestServer]
2019-07-09 15:04:41,181 ERROR || error on mongodb operation [at.grahsl.kafka.connect.mongodb.MongoDbSinkTask]
com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}].
at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:177)
at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:206)
at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:147)
at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:280)
at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
at java.util.HashMap.forEach(HashMap.java:1289)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
What is the problem and how can I overcome it?
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Comments: 21 (10 by maintainers)
Thank you, it works!
Thanks, Hans. Appreciate it.