benthos: Kafka Input only gets the topic messages, does not get keys

Currently the Kafka input only reads the message value from a given topic. Is there any plan to also expose the message keys?

My use case is simple: I need to back up the _schemas topic, and in order to restore it I need to be able to produce those messages with the same keys they were originally produced with.

Would you accept a PR for this?

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 16 (7 by maintainers)

Most upvoted comments

Hey @luck02, yeah, you can delete individual metadata fields with meta foo = deleted(), and you can remove every field that shares a prefix with something like meta = meta().filter(!this.key.has_prefix("kafka_")), which removes all metadata with the kafka_ prefix.

These tips are missing from the docs so I’ll make sure they get added to https://www.benthos.dev/docs/configuration/metadata

Here’s a version that copies kafka_key to source_key and deletes all s3_ and kafka_ headers. That way you’re only adding one extra header from your backup.

input:
  type: s3
  s3:
    bucket: "super-secret"
    delete_objects: false
    download_manager:
      enabled: false
    force_path_style_urls: false
    max_batch_count: 1
    prefix: "benthos/_schemas"
    region: eu-west-1
    retries: 3
    timeout: 5s
pipeline:
  processors:
  - metadata:
      operator: set
      key: source_key
      value: ${!metadata:kafka_key}
  - metadata:
      operator: delete_prefix
      key: s3_
  - metadata:
      operator: delete_prefix
      key: kafka_
output:
  type: kafka
  kafka:
    addresses:
    - b0:9092
    client_id: benthos-schemas-restore-1
    compression: none
    key: "${!metadata:source_key}"
    topic: _schemas
    target_version: 2.1.0
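
For comparison, the three metadata processors above could probably be collapsed into one Bloblang mapping, using the meta = meta().filter(...) form mentioned earlier in this thread. This is only a sketch: it assumes a Benthos version whose bloblang processor supports assigning to meta as a whole, and that a mapping which never assigns to root leaves the message payload unchanged. It has not been tested against the setup in this issue.

```yaml
pipeline:
  processors:
  - bloblang: |
      # Keep every metadata field except those prefixed with s3_ or kafka_.
      # Assumption: meta() with no arguments returns all metadata as an object.
      meta = meta().filter(!this.key.has_prefix("s3_") && !this.key.has_prefix("kafka_"))
      # Copy the original Kafka key (read from the incoming message's metadata)
      # into source_key so the kafka output can reference it.
      meta source_key = meta("kafka_key")
```

The kafka output would still read the key from the source_key metadata field, as in the config above.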

@Jeffail Really appreciate your help this morning.

It turned out that the Topic Operator I am using was falsely reporting that it had set max.message.bytes on the topic I was restoring to, so it was exactly what you said: the messages were simply too big!