parallel-consumer: NPE on instantiation when GroupID missing

Exception appeared in io.confluent.parallelconsumer.ParallelEoSStreamProcessor.checkAutoCommitIsDisabled. Looks like (ConsumerCoordinator)coordinatorField.get(consumer) returns a null;

The code of a creation of ParallelStreamProcessor:

@Slf4j
@Component
public class ParallelConsumer {
    String inputTopic = "test-topic";

    public ParallelConsumer() {
        consume();
    }

    public ParallelStreamProcessor getParallelConsumer(){
        org.apache.kafka.clients.consumer.Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // (1)

        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(KEY) // (2)
                .maxConcurrency(1000) // (3)
                .consumer(kafkaConsumer)
                .build();

        ParallelStreamProcessor<String, String> eosStreamProcessor =
                ParallelStreamProcessor.createEosStreamProcessor(options);

        eosStreamProcessor.subscribe(of(inputTopic)); // (4)

        return eosStreamProcessor;
    }

    org.apache.kafka.clients.consumer.Consumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new KafkaConsumer<>(properties);
    }

    public void consume () {
        getParallelConsumer().poll(record ->
                log.info("Concurrently processing a record: {}", record));
    }

}

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 15 (10 by maintainers)

Commits related to this issue

Most upvoted comments

Thanks for your reply! @astubbs Could you also provide please proper release version?