kafka-connect-mongodb: Fails to write to DB: 'The $v update field is only recognized internally'

Hi,

I’m currently testing the sink connector and trying to sync two MongoDB clusters, where the source cluster uses Debezium CDC and the destination cluster uses your MongoDB sink connector.

My config is:

{
  "name": "mongodb-dst-sink",
  "config": {
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "topics": "dbserver1.inventory.customers",
    "mongodb.connection.uri": "mongodb://debezium:dbz@mongodb-dst:27017/inventory",
    "mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler",
    "mongodb.delete.on.null.values": "false",
    "mongodb.collections": "customers",
    "mongodb.collection.dbserver1.inventory.customers": "customers"
  }
}

Initially I was able to sync the clusters; however, after I tried to update a document in the source DB, I started getting the exception below.

The exception I get:

2019-07-09 15:04:36,175 ERROR  ||  error on mongodb operation   [at.grahsl.kafka.connect.mongodb.MongoDbSinkTask]
com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}]. 
	at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:177)
	at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:206)
	at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:147)
	at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
	at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:280)
	at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
	at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
	at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
	at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
	at java.util.HashMap.forEach(HashMap.java:1289)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-07-09 15:04:36,177 ERROR  ||  writing 2 document(s) into collection [inventory.customers] failed -> remaining retries (2)   [at.grahsl.kafka.connect.mongodb.MongoDbSinkTask]
2019-07-09 15:04:36,177 ERROR  ||  WorkerSinkTask{id=mongodb-dst-sink-0} RetriableException from SinkTask:   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}]. 
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:170)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
	at java.util.HashMap.forEach(HashMap.java:1289)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}]. 
	at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:177)
	at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:206)
	at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:147)
	at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
	at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:280)
	at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
	at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
	at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
	at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
	... 16 more
2019-07-09 15:04:36,665 INFO   ||  172.20.0.1 - - [09/Jul/2019:15:04:36 +0000] "GET /connectors/mongodb-dst-sink/status HTTP/1.1" 200 170  3   [org.apache.kafka.connect.runtime.rest.RestServer]
2019-07-09 15:04:41,181 ERROR  ||  error on mongodb operation   [at.grahsl.kafka.connect.mongodb.MongoDbSinkTask]
com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongodb-dst:27017. Write errors: [BulkWriteError{index=0, code=9, message='The $v update field is only recognized internally', details={}}]. 
	at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:177)
	at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:206)
	at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:147)
	at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
	at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:280)
	at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
	at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
	at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
	at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
	at java.util.HashMap.forEach(HashMap.java:1289)
	at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

What is the problem and how can I overcome it?

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 21 (10 by maintainers)

Most upvoted comments

Thank you, it works!

Thanks, Hans. Appreciate it.