confluent-kafka-dotnet: High Performance Producer and General Error Handling
Description
I have a question regarding high-performance producing. We currently implement the Producer as a single shared instance within a web service (ASP.NET Web API). We initially just had a simple call to ProduceAsync with a continuation that logged an error when the task faulted. However, we noticed that we would constantly get `Local: Message timed out` errors, with no other information, and only some messages would get through to the Kafka topics they were being sent to.
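The original pattern looked roughly like the following. This is a minimal sketch, not the exact code; `_producer`, `_topicName`, `_errLog`, and `data` are placeholder names, and the behavior described in the comments assumes Confluent.Kafka 0.11.x:

```csharp
// Sketch of the original fire-and-forget pattern (placeholder names).
// With Confluent.Kafka 0.11.x, ProduceAsync returns a Task<Message<Null, string>>.
// When the broker cannot be reached, or a send exceeds message.timeout.ms,
// the failure surfaces as a "Local: Message timed out" error.
_producer.ProduceAsync(_topicName, null, data)
    .ContinueWith(task =>
    {
        if (task.IsFaulted)
        {
            // log every inner exception from the faulted produce
            foreach (var e in task.Exception.Flatten().InnerExceptions)
                _errLog.Error("Kafka produce failed", e);
        }
    });
```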
We have since implemented our own timeout logic: we hold on to the task, wait for a period of time, and if the task has not completed within that window we assume the Kafka brokers are offline.
It looks similar to this:
```csharp
public Task<Message<Null, string>> SendOne(string data)
{
    // send data if we are online or should reconnect
    _mainLog.Info($"IsOnline: {IsOnline} ShouldRetryConnection: {ShouldRetryConnection}");
    if (IsOnline || ShouldRetryConnection)
    {
        // start async task
        var task = _producer.ProduceAsync(_topicName, null, data, -1);
        try
        {
            // wait for it to complete or time out - this doesn't stop the task
            IsOnline = task.Wait(SendTimeout);
        }
        catch (AggregateException)
        {
            // Task.Wait rethrows if the task faults within the timeout;
            // the exception details are logged from task.Exception below
            IsOnline = false;
        }
        // were any exceptions thrown?
        if (task.Exception != null)
        {
            // there were, so switch offline
            IsOnline = false;
            foreach (Exception e in task.Exception.Flatten().InnerExceptions)
                _errLog.Error($"[{_name}] failed to execute kafka producer ProduceAsync", e);
        }
        // we exceeded our timeout waiting for the task, so we are offline from now on
        if (!IsOnline)
        {
            // mark when we went offline
            OfflineFrom = _clock.UtcNow;
            // log the data that we failed to deliver, in order to retry later
            LogMessage(data);
        }
        // still return the task as there might be a continuation further
        // up the chain that logs an error if needed
        return task;
    }
    // log the data that we failed to deliver, in order to retry later
    LogMessage(data);
    // we are offline and should not retry the connection yet
    throw new InvalidOperationException($"[{_name}] failed to send messages to kafka. Online: {IsOnline}. Retrying: {ShouldRetryConnection}");
}
```
The above is effectively a backoff mechanism for when Kafka cannot handle our spikes in message production.
So I am really looking for guidance on the following questions, or anything you can do to shed some light and help me better understand the issues that are happening.
- Is this the correct way to handle producing, especially with regard to loss of messages?
- Is there a better way, or should the producer actually handle these errors for us gracefully?
- Do you have a recommended way of using the client, and how a producer should be set up? Obviously all applications are different, but is there a standard set of rules that should be applied when using the client?
- Do you have a recommended way to produce messages efficiently?
- Should we be handling errors/exceptions in a different way? Is there a recommended way? Note: we are producing at a high rate; I just think we aren't producing in the correct way with regard to this client.
Checklist
Please provide the following information:
- Confluent.Kafka nuget version: 0.11.3
- Apache Kafka version: 0.10.2.1
- Client configuration:
```csharp
var config = new Dictionary<string, object>()
{
    { "bootstrap.servers", _endpoints },
    { "compression.codec", "lz4" },
    { "default.topic.config", new Dictionary<string, object>()
        {
            { "request.required.acks", "1" },
            { "message.timeout.ms", "10000" }
        }
    }
};
```
- Operating system: Windows (Producer)/Linux (Consumer)
About this issue
- Original URL
- State: closed
- Created 6 years ago
- Comments: 23 (9 by maintainers)
I implemented it using ContinueWith. I need guaranteed message delivery, so I process the delivery report and either mark messages acked or resend them. It is pretty simple. I don't know if this is the best implementation (I'm considering switching to the DeliveryHandler instead), but it seems to work. Also, now that I know about the random partitioning option, I should just use the Key instead of passing the field to my method.
```csharp
var produceTask = producer.ProduceAsync(message.KafkaTopicName, message.TransmissionQueueId, message.Message);
SetMessageSent(message.TransmissionQueueId);
produceTask.ContinueWith(task => ProcessKafkaMessageResult(task, message.TransmissionQueueId));
```
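The delivery-report handler in this approach might be shaped roughly like the sketch below. This is a hypothetical illustration, not the poster's actual code: `RequeueMessage` and `SetMessageAcked` are assumed helper names, and the error check assumes the Confluent.Kafka 0.11.x API, where the returned `Message` carries an `Error` field rather than the task always faulting on delivery failure:

```csharp
// Hypothetical sketch of the continuation's delivery-report handler.
// RequeueMessage and SetMessageAcked are assumed helpers, not library calls.
private void ProcessKafkaMessageResult(Task<Message<string, string>> task, long transmissionQueueId)
{
    // Check IsFaulted first: accessing task.Result on a faulted task throws.
    // In 0.11.x, broker-side delivery failures are usually reported via
    // Message.Error rather than by faulting the task, so check both.
    if (task.IsFaulted || task.Result.Error.HasError)
    {
        // delivery failed: put the message back so it will be resent
        RequeueMessage(transmissionQueueId);
    }
    else
    {
        // the broker acked the message; the offset is in task.Result.TopicPartitionOffset
        SetMessageAcked(transmissionQueueId);
    }
}
```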