parallel-consumer: NPE on instantiation when GroupID missing
The exception is thrown in io.confluent.parallelconsumer.ParallelEoSStreamProcessor.checkAutoCommitIsDisabled.
It looks like (ConsumerCoordinator) coordinatorField.get(consumer) returns null.
The code that creates the ParallelStreamProcessor:
@Slf4j
@Component
public class ParallelConsumer {

    String inputTopic = "test-topic";

    public ParallelConsumer() {
        consume();
    }

    public ParallelStreamProcessor getParallelConsumer() {
        org.apache.kafka.clients.consumer.Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // (1)
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(KEY) // (2)
                .maxConcurrency(1000) // (3)
                .consumer(kafkaConsumer)
                .build();
        ParallelStreamProcessor<String, String> eosStreamProcessor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        eosStreamProcessor.subscribe(of(inputTopic)); // (4)
        return eosStreamProcessor;
    }

    org.apache.kafka.clients.consumer.Consumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new KafkaConsumer<>(properties);
    }

    public void consume() {
        getParallelConsumer().poll(record ->
                log.info("Concurrently processing a record: {}", record));
    }
}
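The consumer properties in the reproduction never set a group ID, which matches the issue title. Until a release with the validation fix is available, a fail-fast check on the properties might surface the problem with a clearer message than the NPE. The following is a minimal sketch using only the JDK, not the library's actual validation: the class name GroupIdCheck, the method requireGroupId and the group name "example-group" are hypothetical, and "group.id" is the key behind ConsumerConfig.GROUP_ID_CONFIG.

```java
import java.util.Properties;

public class GroupIdCheck {

    // "group.id" is the string value of ConsumerConfig.GROUP_ID_CONFIG.
    static final String GROUP_ID_KEY = "group.id";

    /**
     * Hypothetical helper: throws if group.id is missing or blank,
     * otherwise returns the same Properties instance unchanged.
     */
    static Properties requireGroupId(Properties props) {
        String groupId = props.getProperty(GROUP_ID_KEY);
        if (groupId == null || groupId.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Consumer property '" + GROUP_ID_KEY + "' must be set");
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("enable.auto.commit", "false");

        try {
            // No group.id yet, as in the reproduction above.
            requireGroupId(props);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // Assumed group name, chosen only for illustration.
        props.setProperty(GROUP_ID_KEY, "example-group");
        requireGroupId(props);
        System.out.println("Accepted group.id=" + props.getProperty(GROUP_ID_KEY));
    }
}
```

In the reproduction, the equivalent change would be to set ConsumerConfig.GROUP_ID_CONFIG in getKafkaConsumer() before constructing the KafkaConsumer, and optionally to run a check like this on the properties first.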
About this issue
- State: closed
- Created 3 years ago
- Comments: 15 (10 by maintainers)
Commits related to this issue
- fix: #101 Validate GroupId is configured on managed consumer https://github.com/confluentinc/parallel-consumer/issues/101 NPE on instantiation when GroupID missing #101 — committed to astubbs/parallel-consumer by astubbs 3 years ago
- fix: #101 Validate GroupId is configured on managed consumer https://github.com/confluentinc/parallel-consumer/issues/101 NPE on instantiation when GroupID missing #101 Tests for auto commit and gro... — committed to confluentinc/parallel-consumer by astubbs 3 years ago
- fix: #101 Validate GroupId is configured on managed consumer https://github.com/confluentinc/parallel-consumer/issues/101 NPE on instantiation when GroupID missing #101 Tests for auto commit and gro... — committed to astubbs/parallel-consumer by astubbs 3 years ago
Thanks for your reply! @astubbs, could you please also provide a proper release version?