kafka-node: HighLevelProducer does not embed a timestamp.
Bug Report
Messages sent with HighLevelProducer do not carry the timestamp set in the payload; the consumer sees CreateTime = -1.
Environment
- Node version: v6.1.0
- Kafka-node version: v2.2.2
- Kafka version: 0.10.0.2
For specific cases also provide
- Number of Brokers: 1
- Number partitions for topic: 10
Include Sample Code to reproduce producer behavior
// timestamp is set explicitly, yet the consumer reads CreateTime = -1
producer.send([
  {
    topic: topic,
    messages: messages,
    timestamp: Date.now()
  }
], cb);
Include Sample Code to reproduce consumer behavior
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// No TimestampExtractor is configured, so the default FailOnInvalidTimestamp (see the stack trace) applies.
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("newSignals");
KStream<Windowed<String>, Long> wordCounts = textLines
    .mapValues(textLine -> {
        System.out.println(textLine);
        return textLine.toLowerCase();
    })
    .groupBy((key, word) -> {
        System.out.println(word.split("\\W+")[2]);
        return word.split("\\W+")[2];
    })
    .count(TimeWindows.of(60000), "Counts")
    .toStream();
wordCounts.to(windowedSerde, Serdes.Long(), "test");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Include output with Debug turned on
Include output of Java consumer log
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = newSignals, partition = 0, offset = 0, CreateTime = -1, checksum = 2486091252, serialized key size = -1, serialized value size = 126, key = null, value = {"device_id":"12345","rssi":-21,"ts":1504859056569692,"square":"1234","shop":"1234"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:62)
at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:60)
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:86)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
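The exception suggests switching to a different TimestampExtractor. A minimal sketch of the fallback logic such an extractor could apply is below, written in plain Java so it runs without Kafka on the classpath. It assumes the payload's ts field holds microseconds since the epoch (its 16-digit value suggests this); TimestampFallback and resolve are hypothetical names, and the regex stands in for a real JSON parser. In a Streams application the same logic would live in an implementation of org.apache.kafka.streams.processor.TimestampExtractor, whose extract method receives the ConsumerRecord and the previous timestamp, registered via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: the fallback a custom TimestampExtractor could apply to records
// whose embedded timestamp is invalid (negative), as in the consumer log.
final class TimestampFallback {
    // Finds "ts":<digits> in the JSON value; a regex keeps the sketch free of a JSON library.
    private static final Pattern TS_FIELD = Pattern.compile("\"ts\"\\s*:\\s*(\\d+)");

    private TimestampFallback() {}

    // Keeps a valid record timestamp; otherwise reads "ts" (assumed to be microseconds
    // since the epoch) and converts it to milliseconds; otherwise reuses previousTimestamp.
    static long resolve(long recordTimestamp, String jsonValue, long previousTimestamp) {
        if (recordTimestamp >= 0) {
            return recordTimestamp;
        }
        if (jsonValue != null) {
            Matcher m = TS_FIELD.matcher(jsonValue);
            if (m.find()) {
                return Long.parseLong(m.group(1)) / 1_000L; // microseconds -> milliseconds
            }
        }
        return previousTimestamp;
    }

    public static void main(String[] args) {
        String value = "{\"device_id\":\"12345\",\"rssi\":-21,\"ts\":1504859056569692,"
                + "\"square\":\"1234\",\"shop\":\"1234\"}";
        // CreateTime = -1, as in the consumer log; prints 1504859056569
        System.out.println(resolve(-1L, value, -1L));
    }
}
```

If processing time is good enough, the WallclockTimestampExtractor bundled with Kafka Streams avoids custom code, at the cost of ignoring when the events actually happened.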
About this issue
- State: closed
- Created 7 years ago
- Reactions: 3
- Comments: 20 (4 by maintainers)
Setting message.timestamp.type at the topic level worked in my scenario.
By default, message.timestamp.type=CreateTime, which means the producer must add the timestamp. If it does not, the Streams application fails with the invalid (negative) timestamp exception.
Because we are using a third-party producer, we need to set message.timestamp.type=LogAppendTime. With LogAppendTime, the broker adds a timestamp to each message when it receives it.
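When a leader broker receives a message, it applies the topic's timestamp settings: message.timestamp.type and max.message.time.difference.ms are per-topic configurations (in released Kafka versions the latter is named message.timestamp.difference.max.ms).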
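To make the two settings concrete, here is a simplified model in plain Java of how the stored timestamp depends on message.timestamp.type when a producer sends no timestamp (-1). It is a hypothetical illustration, not the broker's code: BrokerTimestampModel and storedTimestamp are made-up names, maxDifferenceMs stands in for the per-topic difference limit, and IllegalArgumentException stands in for the broker rejecting the message. Under CreateTime the missing value passes through as -1, which is what the Streams application later rejects; under LogAppendTime the broker's clock replaces it.

```java
// Simplified, hypothetical model of how the stored timestamp is chosen per message.timestamp.type.
final class BrokerTimestampModel {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Kafka's "no timestamp" sentinel; consumers see it as CreateTime = -1.
    static final long NO_TIMESTAMP = -1L;

    private BrokerTimestampModel() {}

    static long storedTimestamp(TimestampType type, long producerTimestamp,
                                long brokerNowMs, long maxDifferenceMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // The broker overwrites whatever the producer sent with its own clock.
            return brokerNowMs;
        }
        // CreateTime: keep the producer's value. A missing timestamp passes through unchanged;
        // a present one that is too far from the broker's clock is rejected.
        if (producerTimestamp != NO_TIMESTAMP
                && Math.abs(brokerNowMs - producerTimestamp) > maxDifferenceMs) {
            throw new IllegalArgumentException("timestamp outside the allowed difference");
        }
        return producerTimestamp;
    }

    public static void main(String[] args) {
        long now = 1_504_859_056_569L;
        // Prints -1: CreateTime stores the missing timestamp as-is.
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, NO_TIMESTAMP, now, Long.MAX_VALUE));
        // Prints the broker time: LogAppendTime replaces it.
        System.out.println(storedTimestamp(TimestampType.LOG_APPEND_TIME, NO_TIMESTAMP, now, Long.MAX_VALUE));
    }
}
```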
Is there any fix for this issue?
I’ve seen the same problem with both HighLevelProducer and Producer, using Node 6.10.0 and Kafka 0.10.1.1.
I can inspect timestamps in Kafka with this command (change the topic name as needed):
./kafka-simple-consumer-shell.sh --broker-list localhost:9092 --partition 0 --property print.timestamp=true --offset -2 --topic CommandAndControl
But the timestamps are always -1, even if I explicitly set the timestamp field to a specific value.