kafka-rust: UnknownTopicOrPartition upon "on the fly topic generation"
I’m playing with kafka for the first time, and I’m running into some issues? I’ve stared with both kafka 0.9.0.1 and 0.10.1 and I’m having the same problem.
I have auto creation of topics enabled on the cluster, and I’m doing:
match client.load_metadata(&vec!["events-raw"]) {
// ...
}
to try to ensure creation of the topic. That part is working fine, and the topic IS created in kafka. After that, I’m trying to send in records with:
let mut records = vec![];
records.push(Record {
topic: "events-raw",
partition: 0,
key: (),
value: txt,
});
match producer.send_all(&records) {
Ok(_) => {}
Err(e) => println!("Error writing event: {:?}", e),
};
and this is priting out the error mentioned in the title.
About this issue
- Original URL
- State: open
- Created 8 years ago
- Comments: 17
@ChrisMacNaughton i had a chance to reproduce your problem.
the following program works for me (i.e. successfully sends the message) against a local kafka broker configured with
auto.create.topics.enable = true:usage:
./target/debug/examples/example-produce localhost:9092 my-topic-2p localhost:9092 topic-1the trick is at the place marked with “HERE” in the above code - it checks whether there is any partition for the topic; if there isn’t the loop retries fetching the metadata. originally, i mistakenly proposed to you to try
client.topics().contains(topic). however, now i realized that this resulted for me in “…TopicMetadata { error: 5, …” which seems to be exactly what you were experiencing. i was wrong about how the “auto.create.topic” feature works in kafka, sorry for that.observing the log output of this program …
… i see, that upon the initial “get metadata request” which triggers the topic creation, kafka responds with a
LeaderNotAvailableerror code. refreshing the metadata after a pause - in the second loop iteration - the newly created topic finally gets a leader assigned andkafka-rustobtains metadata for a fully available topic with partitions and things start to work as expected.if we consider the above program trying to do merely a
client.load_metadata(&["my-topic"]).unwrap(); ... producer.send(..);where “my-topic” gets newly created (your initially reported scenario) …kafka-rustmust report theLeaderNotAvailableerror instead ofNoSuchTopicOrPartition(this is definitely a bug.)KafkaClient::topics()) that a topic is in a particular error state, without having to resort to the ugly hack demonstrated in the above code at the placed marked “HERE”.I’m re-opening this ticket so we get these two points implemented. Help wanted!
PS: ideally,
kafka-rustwould detect the error state on its own and reload the metadata at a later point in time automatically. the “leader-not-available” is an recoverable error. however, this will require some more thought/analysis and design changes tokafka-rustwhich is the scope of #81.Any fixes for this?
I am facing the issue even when I have created the topic via shell script shipped with Kafka. I printed the topics using
client.topics(), everything looks normal, however as soon as I produce a message, the code panicsError while producing: Error(Kafka(UnknownTopicOrPartition), State { next_error: None, backtrace: None })yes, this is a (known) problem of
kafka-rust; thanks for the ticket!from the top of my head: the problem is that when you send the
load_metadatarequest, kafka will respond with metadata which doesn’t contain the requested topic (since it doesn’t exist yet) and at the same time it will try to create it (if the corresponding option is enabled at the broker side.)kafka-rust, however, initialized itself from the previously returned metadata missing the newly created topic information; hence, when you then send data,kafka-rustwill tell you, there’s no such topic. 😦you can work around it at the moment by reacting yourself on the response code “no topic or partition” by trying to reload the metadata. and try sending the data again.
the proper solution would be for
kafka-rustto realize it was asked for metadata about a specific topic, which it got no response for, and then reload the metadata when asked for requests to that particular topic (while ensuring a certain time has passed between the first metadata request and its reload).