apicurio-registry: Apicurio schema registry with JsonSchemaKafkaSerializer and JsonSchemaKafkaDeserializer not working

I am trying to integrate a schema registry with Kafka Streams using a JSON serde (the input data is in JSON format), but I cannot get it to work. No error is thrown when I publish data to the input Kafka topic. After some more debugging, I found that only the serializer/deserializer (serde) part causes the issue: the same code works with primitive serdes such as String and Integer.

Here is the logic.

The properties set are:

properties.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, schemaRegUrl);
properties.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName());
properties.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, GetOrCreateIdStrategy.class.getName());
properties.put(JsonSchemaSerDeConstants.REGISTRY_JSON_SCHEMA_VALIDATION_ENABLED, Boolean.TRUE);

RegistryService service = CompatibleClient.createCompatible(schemaRegUrl);
Serializer<POJO> serializer = new JsonSchemaKafkaSerializer<>(service, true);
Deserializer<POJO> deserializer = new JsonSchemaKafkaDeserializer<>(service, true);
Serde<POJO> logSerde = Serdes.serdeFrom(serializer, deserializer);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, POJO> input = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, logSerde));

I am not sure what is missing in the code above, since I do not receive any error at runtime. Kindly help.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 19 (9 by maintainers)

Most upvoted comments

@EricWittmann

Thank you so very much for the help. It’s working fine now.

@EricWittmann

Can we please reopen the ticket?