neo4j-streams: Unrecognized Kafka Sink settings

Guidelines

I tried to install neo4j-streams 4.1.0 to publish data from a Kafka topic to a Neo4j database. I copied the .jar file to the /plugins directory and modified the neo4j conf file to add the appropriate parameters. When I restart the Neo4j database, I get the following WARN messages:

2021-11-08 16:34:20.967+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.topic.cypher.TEST-TOPIC
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.truststore.location
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.enabled
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: kafka.bootstrap.servers
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: neo4j.database
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.keystore.location
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: kafka.group.id
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: kafka.security.protocol
2021-11-08 16:34:20.968+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.keystore.password

It seems that the parameters related to the Neo4j Streams connector are not recognized by the Neo4j database.

I tried to push the following file to the Neo4j database:

[{
 "id": 42,
 "properties": {
   "title": "Answer to anyting",
   "description": "It depends."
 }
},
{
 "id": 43,
 "properties": {
   "title": "Answer to anyting",
   "description": "It depends."
 }
}]

I used this Cypher template:

streams.sink.topic.cypher.TEST-TOPIC=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties

I use the SSL security protocol to connect to Kafka.

Versions

  • OS: RedHat 8.2 (Ootpa)
  • Neo4j: 4.2.3
  • Neo4j-Streams: 4.1.0

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 32 (16 by maintainers)

Most upvoted comments

@mroiter-larus

It’s working fine on my laptop, thanks a lot for your help. I still have an SSL exception on the other server, but I’ll look into it.

2021-11-19 13:26:56.437+0000 WARN  [s.StreamsRouterConfigurationListener] [neo4j/2d8c2716] Cannot retrieve valid topics because the following exception,
next attempt is in 300000 ms:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) ~[neo4j-streams-4.1.0.jar:?]
        at streams.kafka.KafkaAdminService$start$1.invokeSuspend(KafkaAdminService.kt:29) [neo4j-streams-4.1.0.jar:?]
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665) [neo4j-streams-4.1.0.jar:?]
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at sun.security.ssl.Alert.createSSLException(Alert.java:131) ~[?:?]
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:352) ~[?:?]
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:295) ~[?:?]
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:290) ~[?:?]
        at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:654) ~[?:?]
        at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473) ~[?:?]
        at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369) ~[?:?]
        at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392) ~[?:?]
        at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443) ~[?:?]
        at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074) ~[?:?]
        at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061) ~[?:?]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
        at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008) ~[?:?]
        at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1196) ~[neo4j-streams-4.1.0.jar:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]

@aissaelouafi

I’m sorry, I just assumed that you already had your configuration in the streams.conf file. From Neo4j Streams 4.0.7 onward, all the Neo4j Streams and Kafka settings have to be moved from neo4j.conf to streams.conf (you have to create this file manually). This is described in the official Neo4j Streams documentation.
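As a rough illustration of that move, the following Python sketch separates the plugin settings from the rest of a neo4j.conf file. It assumes the settings to relocate are exactly the lines starting with `streams.` or `kafka.`; the helper name `split_settings` and the sample values are made up for the example, so check the result against the documentation before using it.

```python
# Assumption: the settings that belong in streams.conf are the ones whose
# keys start with these prefixes. Adjust if your setup uses other keys.
PLUGIN_PREFIXES = ("streams.", "kafka.")


def split_settings(conf_text):
    """Return (keep, move): lines that stay in neo4j.conf and lines for streams.conf."""
    keep, move = [], []
    for line in conf_text.splitlines():
        if line.strip().startswith(PLUGIN_PREFIXES):
            move.append(line)
        else:
            keep.append(line)
    return keep, move


# Hypothetical neo4j.conf content, only for illustration.
sample = """\
dbms.default_database=neo4j
kafka.bootstrap.servers=localhost:9092
streams.sink.enabled=true
# a comment
"""

keep, move = split_settings(sample)
print("streams.conf:")
print("\n".join(move))
print("neo4j.conf:")
print("\n".join(keep))
```

The `move` lines would go into the manually created streams.conf, and the `keep` lines remain in neo4j.conf.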

About your questions:

  1. Neo4j Streams supports two types of deserializers: ByteArrayDeserializer and KafkaAvroDeserializer.
  2. I send events via kafka-console-producer with the following:

kafka-console-producer --bootstrap-server localhost:9092 --topic testtopic

Then I send events one by one in JSON format, as plain strings, for example:

{ "id": 42, "properties": { "title": "Answer to anyting", "description": "It depends." }}

If you want to send events from a file, you can collect JSON string events such as the one above and place them one per line. I would suggest not using --broker-list because it is deprecated (use the --bootstrap-server option instead).

  3. If you want to write/read data into/from a non-default Neo4j database, you have to manually create it first.
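For the file-based case, a file that holds a JSON array has to be turned into one event per line first. The Python sketch below shows one way to do that; the function name `to_json_lines` and the shortened sample events are made up for illustration, and it assumes the whole array fits in memory.

```python
import json


def to_json_lines(array_text):
    """Convert a JSON array of events into one compact JSON object per line."""
    events = json.loads(array_text)
    return "\n".join(json.dumps(event) for event in events)


# Hypothetical input: a JSON array with two events, as it might sit in a file.
sample = '[{"id": 42, "properties": {"title": "a"}}, {"id": 43, "properties": {"title": "b"}}]'

lines = to_json_lines(sample)
print(lines)
```

Each printed line is a standalone JSON object, so the output can be saved to a file and fed to kafka-console-producer line by line.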

Hope this helps.

Cheers,

Mauro