kafka-node: HighLevelProducer does not embed a timestamp.

Bug Report

HighLevelProducer does not embed a timestamp in produced messages.

Environment

  • Node version: v6.1.0
  • Kafka-node version: v2.2.2
  • Kafka version: 0.10.0.2

For specific cases also provide

  • Number of Brokers: 1
  • Number partitions for topic: 10

Include Sample Code to produce behavior

producer.send([
  {
    topic: topic,
    messages: messages,
    timestamp: Date.now()
  }
], cb);

Include Sample Code to consume behavior

import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;

public class SignalCounts {
  public static void main(String[] args) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("newSignals");

    // Group by the third token (split on runs of non-word characters)
    // and count occurrences in 60-second windows.
    KStream<Windowed<String>, Long> wordCounts = textLines
        .mapValues(textLine -> {
          System.out.println(textLine);
          return textLine.toLowerCase();
        })
        .groupBy((key, word) -> {
          System.out.println(word.split("\\W+")[2]);
          return word.split("\\W+")[2];
        })
        .count(TimeWindows.of(60000), "Counts")
        .toStream();
    wordCounts.to(windowedSerde, Serdes.Long(), "test");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
  }
}

Include output with Debug turned on


Include Java consumer log output

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = newSignals, partition = 0, offset = 0, CreateTime = -1, checksum = 2486091252, serialized key size = -1, serialized value size = 126, key = null, value = {"device_id":"12345","rssi":-21,"ts":1504859056569692,"square":"1234","shop":"1234"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
	at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:62)
	at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:60)
	at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46)
	at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:86)
	at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
	at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
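
The exception itself points at a consumer-side workaround: plug in a TimestampExtractor that does not fail on invalid timestamps. A minimal sketch against the pre-1.0 Streams API used above; WallclockTimestampExtractor ships with Kafka Streams (the versions carrying FailOnInvalidTimestamp also ship LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp as less drastic fallbacks). Note that the 60-second windows then reflect processing time rather than event time:

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    // Ignore the invalid record timestamps and use the wall clock instead,
    // so windows are based on arrival time at the Streams application.
    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        WallclockTimestampExtractor.class.getName());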

Thanks for your contribution!

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Reactions: 3
  • Comments: 20 (4 by maintainers)

Most upvoted comments

Setting message.timestamp.type at the topic level worked in my scenario.

By default, message.timestamp.type=CreateTime, which means the producer must embed a timestamp. If no timestamp is embedded, the Streams application fails with the exception shown above.

Since we are using a third-party producer, we need to set message.timestamp.type=LogAppendTime instead. With LogAppendTime, the broker adds a timestamp to each message when it receives it (see the sketch after the list below).

When a leader broker receives a message:

  1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
  • If the message is a compressed message, the timestamp in the wrapper message will be updated to the current server time, the broker will set the timestamp-type bit in the wrapper message to 1, and the inner message timestamps will be ignored. The server time is written only to the wrapper, rather than to each inner message, to avoid the recompression penalty when people are using LogAppendTime.
  • If the message is a non-compressed message, the timestamp in the message will be overwritten with the current server time.
  2. If message.timestamp.type=CreateTime:
  • If the time difference is within the configurable threshold max.message.time.difference.ms, the server will accept it and append it to the log. For compressed messages, the server will update the timestamp in the wrapper message to the largest timestamp of the inner messages.
  • If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.

message.timestamp.type and max.message.time.difference.ms are per-topic configurations.
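
For reference, one way to flip that topic setting: a minimal sketch using Kafka's Java AdminClient (available in clients 0.11+; the broker address and topic name below are placeholders, and on older clusters the kafka-configs.sh tool does the same job). Be aware that the legacy alterConfigs call replaces the topic's full set of config overrides, not just the one entry.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UseLogAppendTime {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // Have the broker stamp every record on arrival, so messages from
      // producers that do not embed a timestamp still get a valid one.
      ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "newSignals");
      Config config = new Config(Collections.singleton(
          new ConfigEntry("message.timestamp.type", "LogAppendTime")));
      admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
    }
  }
}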

Any fix for this issue?

I’ve seen the same problem with both the HighLevelProducer and the Producer, using Node 6.10.0 and Kafka 0.10.1.1.

I’m able to inspect timestamps on Kafka using this command line (obviously changing the name of the topic):

./kafka-simple-consumer-shell.sh --broker-list localhost:9092 --partition 0 --property print.timestamp=true --offset -2 --topic CommandAndControl

But the timestamps are always -1 even if I explicitly set the timestamp field to a specific value.
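
If it helps anyone else, the same check can be done programmatically. A minimal sketch with the plain Java consumer, using the broker address and topic name from the command above as placeholders (poll(long) matches the 0.10.x client API in this thread):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimestampCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-check");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("CommandAndControl"));
      ConsumerRecords<String, String> records = consumer.poll(5000);
      for (ConsumerRecord<String, String> record : records) {
        // timestamp() == -1 means the producer did not embed a timestamp
        System.out.printf("offset=%d type=%s timestamp=%d%n",
            record.offset(), record.timestampType(), record.timestamp());
      }
    }
  }
}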