kafka-rust: UnknownTopicOrPartition upon "on the fly topic generation"

I’m playing with Kafka for the first time, and I’m running into some issues. I’ve started with both Kafka 0.9.0.1 and 0.10.1, and I’m having the same problem.

I have auto creation of topics enabled on the cluster, and I’m doing:

match client.load_metadata(&["events-raw"]) {
// ...
}

to try to ensure creation of the topic. That part is working fine, and the topic IS created in Kafka. After that, I’m trying to send in records with:

let mut records = vec![];
records.push(Record {
    topic: "events-raw",
    partition: 0,
    key: (),
    value: txt,
});
match producer.send_all(&records) {
    Ok(_) => {}
    Err(e) => println!("Error writing event: {:?}", e),
};

and this prints out the error mentioned in the title.


Most upvoted comments

@ChrisMacNaughton I had a chance to reproduce your problem.

The following program works for me (i.e. it successfully sends the message) against a local Kafka broker configured with auto.create.topics.enable = true:

extern crate kafka;
extern crate env_logger;

use std::env;
use std::time::Duration;
use std::thread;

use kafka::client::KafkaClient;
use kafka::producer::{Producer, Record, RequiredAcks};
use kafka::error::{Error as KafkaError, KafkaCode};

fn main() {
    env_logger::init().unwrap();

    let broker = env::args().nth(1).unwrap();
    let topic = env::args().nth(2).unwrap();

    let data = "hello, kafka".as_bytes();
    if let Err(e) = produce_message(data, &topic, vec![broker]) {
        println!("Failed producing messages: {}", e);
    }
}

fn produce_message(data: &[u8], topic: &str, brokers: Vec<String>)
                   -> Result<(), KafkaError>
{
    let mut client = KafkaClient::new(brokers);

    let mut attempt = 0;
    loop {
        attempt += 1;
        try!(client.load_metadata(&[topic]));
        if client.topics().partitions(topic).map(|p| p.len()).unwrap_or(0) > 0 { // <-- HERE
            break;
        } else if attempt > 2 { // try up to 3 times
            // return some error
            return Err(KafkaError::Kafka(KafkaCode::UnknownTopicOrPartition));
        }
        thread::sleep(Duration::from_secs(1));
    }

    let mut producer =
        try!(Producer::from_client(client)
             .with_ack_timeout(Duration::from_secs(1))
             .with_required_acks(RequiredAcks::One)
             .create());

    try!(producer.send(&Record{
        topic: topic,
        partition: -1,
        key: (),
        value: data,
    }));

    println!("Message sent! Yeah");

    Ok(())
}

usage: ./target/debug/examples/example-produce localhost:9092 my-topic-2p

The trick is at the place marked “HERE” in the code above: it checks whether there is any partition for the topic; if there isn’t, the loop retries fetching the metadata. Originally, I mistakenly proposed that you try client.topics().contains(topic); however, I now realize that this resulted for me in “…TopicMetadata { error: 5, …”, which seems to be exactly what you were experiencing. I was wrong about how the “auto.create.topics.enable” feature works in Kafka; sorry for that.
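The essence of the check at the place marked “HERE” can be distilled into a tiny predicate. This is only a sketch: the Option<&[i32]> parameter is a stand-in for what client.topics().partitions(topic) yields in kafka-rust, and the helper itself is not part of the library.

```rust
// A topic is ready for producing only once the fetched metadata lists at
// least one partition for it. `partitions` stands in for the result of
// client.topics().partitions(topic) in kafka-rust (an assumption for this
// sketch, not the library's actual types).
fn topic_ready(partitions: Option<&[i32]>) -> bool {
    partitions.map(|p| p.len()).unwrap_or(0) > 0
}
```

A freshly auto-created topic typically reports no partitions on the first metadata fetch, so this predicate is what the retry loop waits on before creating the producer.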

observing the log output of this program …

DEBUG:kafka::client: fetch_metadata: requesting metadata from localhost:9092
DEBUG:kafka::client::network: Established: KafkaConnection { id: 0, secured: false, host: "localhost:9092" }
TRACE:kafka::client::network: Sent 41 bytes to: KafkaConnection { id: 0, secured: false, host: "localhost:9092" } => Ok(41)
TRACE:kafka::client::network: Read 4 bytes from: KafkaConnection { id: 0, secured: false, host: "localhost:9092" } => Ok(())
TRACE:kafka::client::network: Read 67 bytes from: KafkaConnection { id: 0, secured: false, host: "localhost:9092" } => Ok(())
DEBUG:kafka::client::state: updating metadata from: MetadataResponse { header: HeaderResponse { correlation: 1 }, brokers: [BrokerMetadata { node_id: 0, host: "XYZ", port: 9092 }], topics: [TopicMetadata { error: 5, topic: "my-topic-2p", partitions: [] }] }
DEBUG:kafka::client: fetch_metadata: requesting metadata from localhost:9092
TRACE:kafka::client::network: Sent 41 bytes to: KafkaConnection { id: 0, secured: false, host: "localhost:9092" } => Ok(41)
TRACE:kafka::client::network: Read 4 bytes from: KafkaConnection { id: 0, secured: false, host: "localhost:9092" } => Ok(())
TRACE:kafka::client::network: Read 93 bytes from: KafkaConnection { id: 0, secured: false, host: "localhost:9092" } => Ok(())
DEBUG:kafka::client::state: updating metadata from: MetadataResponse { header: HeaderResponse { correlation: 2 }, brokers: [BrokerMetadata { node_id: 0, host: "XYZ", port: 9092 }], topics: [TopicMetadata { error: 0, topic: "my-topic-2p", partitions: [PartitionMetadata { error: 0, id: 0, leader: 0, replicas: [0], isr: [0] }] }] }
DEBUG:kafka::client::network: Established: KafkaConnection { id: 1, secured: false, host: "petrnnb.kancelar.seznam.cz:9092" }
TRACE:kafka::client::network: Sent 97 bytes to: KafkaConnection { id: 1, secured: false, host: "petrnnb.kancelar.seznam.cz:9092" } => Ok(97)
TRACE:kafka::client::network: Read 4 bytes from: KafkaConnection { id: 1, secured: false, host: "petrnnb.kancelar.seznam.cz:9092" } => Ok(())
TRACE:kafka::client::network: Read 39 bytes from: KafkaConnection { id: 1, secured: false, host: "petrnnb.kancelar.seznam.cz:9092" } => Ok(())
Message sent! Yeah

… I see that upon the initial “get metadata” request, which triggers the topic creation, Kafka responds with a LeaderNotAvailable error code (the error: 5 in the log above). When the metadata is refreshed after a pause, in the second loop iteration, the newly created topic finally has a leader assigned; kafka-rust obtains metadata for a fully available topic with partitions, and things start to work as expected.

If we consider the above program reduced to merely client.load_metadata(&["my-topic"]).unwrap(); ... producer.send(..); where “my-topic” gets newly created (your initially reported scenario), then:

  1. kafka-rust must report the LeaderNotAvailable error instead of UnknownTopicOrPartition (this is definitely a bug);
  2. it should be possible to detect through the loaded metadata (i.e. KafkaClient::topics()) that a topic is in a particular error state, without having to resort to the ugly hack demonstrated at the place marked “HERE” in the above code.

I’m re-opening this ticket so we get these two points implemented. Help wanted!

PS: ideally, kafka-rust would detect the error state on its own and automatically reload the metadata at a later point in time; “leader not available” is a recoverable error. However, this will require some more thought/analysis and design changes to kafka-rust, which is in the scope of #81.

Any fixes for this?

I am facing this issue even though I created the topic via the shell script shipped with Kafka. I printed the topics using client.topics() and everything looks normal; however, as soon as I produce a message, the code panics with: Error while producing: Error(Kafka(UnknownTopicOrPartition), State { next_error: None, backtrace: None })

yes, this is a (known) problem of kafka-rust; thanks for the ticket!

Off the top of my head: the problem is that when you send the load_metadata request, Kafka responds with metadata that doesn’t contain the requested topic (since it doesn’t exist yet) and, at the same time, tries to create it (if the corresponding option is enabled on the broker side). kafka-rust, however, initialized itself from that previously returned metadata, which is missing the newly created topic; hence, when you then send data, kafka-rust tells you there’s no such topic. 😦

For the moment, you can work around it by reacting to the “unknown topic or partition” response code yourself: reload the metadata and try sending the data again.
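As a sketch of that workaround in plain Rust (the send and reload_metadata closures are hypothetical stand-ins; with kafka-rust they would wrap producer.send(..) and client.load_metadata(..) respectively):

```rust
use std::thread;
use std::time::Duration;

// Generic sketch of the workaround: run `send`, and when it fails (e.g. with
// the "unknown topic or partition" condition), call `reload_metadata` and
// retry a bounded number of times. The closure names are hypothetical
// placeholders, not kafka-rust API.
fn send_with_retry<E, S, R>(mut send: S, mut reload_metadata: R, max_attempts: u32) -> Result<(), E>
where
    S: FnMut() -> Result<(), E>,
    R: FnMut(),
{
    let mut attempt = 0;
    loop {
        attempt += 1;
        match send() {
            Ok(()) => return Ok(()),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                // the topic may have just been auto-created; refresh the
                // client's view of the cluster and give the broker a moment
                reload_metadata();
                thread::sleep(Duration::from_millis(200));
            }
        }
    }
}
```

Bounding the attempts matters: if the topic genuinely does not exist and auto-creation is disabled, the loop must eventually surface the error instead of spinning forever.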

The proper solution would be for kafka-rust to realize that it was asked for metadata about a specific topic for which it got no response, and then to reload the metadata when requests for that particular topic come in (while ensuring a certain minimum time has passed between the first metadata request and its reload).
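That proposed behavior can be sketched as a small bookkeeping type. This is an illustration of the idea only; none of these names exist in kafka-rust.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Remember which topics a metadata response came back empty for, and on the
// next access to such a topic decide whether enough time has passed to
// warrant another metadata reload. Hypothetical sketch, not kafka-rust API.
struct MetadataTracker {
    // topic name -> time of the last metadata request that missed it
    pending: HashMap<String, Instant>,
    min_interval: Duration,
}

impl MetadataTracker {
    fn new(min_interval: Duration) -> Self {
        MetadataTracker { pending: HashMap::new(), min_interval }
    }

    // record that a metadata response did not contain `topic`
    fn mark_missing(&mut self, topic: &str) {
        self.pending.insert(topic.to_owned(), Instant::now());
    }

    // record that metadata for `topic` finally arrived
    fn mark_resolved(&mut self, topic: &str) {
        self.pending.remove(topic);
    }

    // should the client reload metadata before sending to `topic`?
    fn should_reload(&self, topic: &str) -> bool {
        match self.pending.get(topic) {
            Some(asked_at) => asked_at.elapsed() >= self.min_interval,
            None => false,
        }
    }
}
```

The min_interval throttle is what keeps the client from hammering the broker with back-to-back metadata requests while a newly auto-created topic is still electing a leader.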