quarkus: @ConsumeEvent does not work in native when returning a Uni>

Describe the bug

Quarkus 3.4.1 Redis 7.2.1

I use event bus consumer to run the redis client logic. In JVM mode, the following piece of code works fine but with native executable, it fails with a timeout (30s default value I guess) at event bus sender side. Thanks to the redis MONITOR command, I can see the command processed by redis.

I did some tests with Quarkus 3.2.6.Final. I have the same behavior.

@Inject
private ReactiveRedisDataSource rds;

  return Optional.ofNullable(nfType)
                .map(t -> {
                    final ReactiveSetCommands<String,String> setCommands = rds.set(String.class);
                    return setCommands.smembers(toNfTypesKey(t));
                }).orElseGet(() -> {
                    final ReactiveKeyCommands<String> keyCommands = rds.key(String.class);
                    final ReactiveKeyScanCursor<String> scanCursor = keyCommands.scan(new KeyScanArgs().match(NF_PROFILES_NAMESPACE + '*').count(Long.MAX_VALUE));
                    final boolean hasNext = scanCursor.hasNext();
                    return hasNext ? scanCursor.next() : Uni.createFrom().item(() -> Collections.emptySet());
                }).onItem().transformToMulti(s -> Multi.createFrom().items(s::stream))
                .select().first(limit)
                .onItem().transform(s -> nfType != null ? s : s.substring(NF_PROFILES_NAMESPACE.length()))
                .collect().asSet()
                ;

Expected behavior

The code should work in native executable as for JVM mode.

Actual behavior

The code works in JVM mode but fails with native executable.

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

java -version
openjdk version "17.0.5" 2022-10-18
OpenJDK Runtime Environment GraalVM CE 22.3.0 (build 17.0.5+8-jvmci-22.3-b08)
OpenJDK 64-Bit Server VM GraalVM CE 22.3.0 (build 17.0.5+8-jvmci-22.3-b08, mixed mode, sharing)

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.4.1

Build tool (ie. output of mvnw --version or gradlew --version)

./mvnw --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: C:\Users\mvsz7559\.m2\wrapper\dists\apache-maven-3.9.1-bin\320285b4\apache-maven-3.9.1
Java version: 17.0.5, vendor: GraalVM Community, runtime: D:\Liberkey\MyApps\graalvm\graalvm-ce-java17-22.3.0
Default locale: en_GB, platform encoding: Cp1252
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"

Additional information

No response

About this issue

  • Original URL
  • State: closed
  • Created 9 months ago
  • Comments: 21 (20 by maintainers)

Commits related to this issue

Most upvoted comments

@mkouba Thank you, It works.

It seems that we only register the LocalEventBusCodec for the return type of a consumer method, i.e. for java.util.Set in this particular case. However, when Vert.x attempts to lookup the codec for the message it’s using the actual type of the message body; i.e. java.util.HashSet in this reproducer. As a result, in the JVM mode the SerializableCodec is used as the fallback (because HashSet implements java.io.Serializable) but this obviously does not work in native mode.

@tnmtechnologies In theory, you don’t need to create the MySet class but merely change the return type of the consumer method to Uni<HashSet<String>>.

@cescoffier I’m not quite sure if we could do something to improve the UX in similar cases…

The issue is the usage of Set<String> as reply. The local codec does not seem to work in this case in native. We need to have a look (probably with the help of @mkouba ).

If you use something like:

public static class MySet extends HashSet<String> {
        public MySet() {
            super();
        }

        public MySet(Set<String> s) {
            super(s);
        }

    }

    /**
     *
     */
    @ConsumeEvent("getIds")
    public Uni<MySet> getIds(final JsonObject message) {
        Log.infov("getIds(message={0})", message);

        final Long limit = Optional.ofNullable(message.getLong("limit")).orElse(Long.MAX_VALUE);
        final String type = message.getString("type");

        return Optional.ofNullable(type)
                    .map(t -> {
                        final ReactiveSetCommands<String,String> setCommands = rds.set(String.class);
                        return setCommands.smembers(TYPE_NAMESPACE + t);
                    }).orElseGet(() -> {
                        final ReactiveKeyCommands<String> keyCommands = rds.key(String.class);                        
                        final ReactiveKeyScanCursor<String> scanCursor = keyCommands.scan(new KeyScanArgs().match(ID_NAMESPACE + '*').count(Long.MAX_VALUE));
                        final boolean hasNext = scanCursor.hasNext();
                        System.out.println("Has next...");
                        return hasNext ? scanCursor.next() : Uni.createFrom().item(() -> Collections.emptySet());
                    }).onItem().transformToMulti(s -> Multi.createFrom().items(s::stream))
                    .select().first(limit)
                    .onItem().transform(s -> type != null ? s : s.substring(ID_NAMESPACE.length()))
                    .collect().asSet()
                    .map(s -> new MySet(s))
                    .log("user code")
                    ;
    }

It works

Yes everything works properly in JVM mode.