apicurio-registry: Apicurio schema registry with JsonSchemaKafkaSerializer and JsonSchemaKafkaDeserializer not working
I am trying to use the schema registry from Kafka Streams with a JSON serde (the input data is in JSON format), but it does not work, and no error is thrown when I publish data to the input Kafka topic. After debugging a little more, I found that only the serializer and deserializer (serde) part causes the issue: the same code works with primitive serdes such as String and Integer.
Here is the logic. The properties set are:
properties.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM,schemaRegUrl);
properties.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName());
properties.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, GetOrCreateIdStrategy.class.getName());
properties.put(JsonSchemaSerDeConstants.REGISTRY_JSON_SCHEMA_VALIDATION_ENABLED, Boolean.TRUE);
RegistryService service = CompatibleClient.createCompatible(schemaRegUrl);
Serializer<POJO> serializer = new JsonSchemaKafkaSerializer<>(service, true);
Deserializer<POJO> deserializer = new JsonSchemaKafkaDeserializer<>(service, true);
Serde<POJO> logSerde = Serdes.serdeFrom(serializer, deserializer);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, POJO> input = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, logSerde));
I am not sure what is missing in the code above, as I do not receive any error while running it. Kindly help.
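One thing that may be worth checking, although nothing in this thread confirms it as the actual fix: the serializer and deserializer above are constructed by hand and passed in through Serdes.serdeFrom, and Kafka Streams does not call configure() on serde instances it did not create itself. If the properties are only handed to the Streams configuration, the strategy and validation settings may never reach the serde. The JDK-only sketch below illustrates that contract with a stand-in class. RegistryAwareSerializer and the "registry.url" key are hypothetical and are not part of Apicurio or Kafka; only the shape of configure(Map, boolean) mirrors Kafka's Serializer interface.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigureSketch {

    // Hypothetical stand-in for a Kafka-style serializer: it picks up its
    // settings only when configure(...) is called, never on its own.
    static class RegistryAwareSerializer {
        private String registryUrl; // stays null until configure(...) runs

        // Same shape as Kafka's Serializer.configure(Map<String, ?>, boolean).
        void configure(Map<String, ?> configs, boolean isKey) {
            Object url = configs.get("registry.url"); // hypothetical key name
            this.registryUrl = (url == null) ? null : url.toString();
        }

        String registryUrl() {
            return registryUrl;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("registry.url", "http://localhost:8080/api"); // assumed URL

        // Built by hand and never configured: the settings are not applied.
        RegistryAwareSerializer unconfigured = new RegistryAwareSerializer();

        // Built by hand and configured explicitly: the settings are applied.
        RegistryAwareSerializer configured = new RegistryAwareSerializer();
        configured.configure(configs, false); // false = value serializer

        System.out.println("unconfigured: " + unconfigured.registryUrl());
        System.out.println("configured:   " + configured.registryUrl());
    }
}
```

In the code from the question, the analogous step would be to call configure(...) on both the serializer and the deserializer, with a map holding the same settings, before wrapping them with Serdes.serdeFrom.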
About this issue
- State: closed
- Created 3 years ago
- Comments: 19 (9 by maintainers)
@EricWittmann
Thank you so very much for the help. It’s working fine now.
@EricWittmann
Can we please reopen the ticket?