lettuce-core: Responses getting out of sync with requests

Current Behavior

We’ve recently updated lettuce to 6.1.1.RELEASE and after running it in our production environment for a week we ran into a troubling issue. At some point it looks like lettuce worked itself into a state where commands were receiving responses to different requests. This manifested as lots of deserialization errors (e.g. java.lang.UnsupportedOperationException: io.lettuce.core.output.ValueOutput does not support set(long)), but also more seriously as incorrect data being returned to the caller.

Restarting the service brought things back to a working state. Prior to this happening the only other thing we observed were some RedisCommandTimeoutException being thrown, but not at an anomalous level.

For the meantime we’ve added some more aggressive checking for this happening, so if it does reoccur I’d be happy to try and gather more information, but not sure what would be helpful?

Environment

  • Lettuce version(s): 6.1.1.RELEASE
  • Redis version: 3.2.10

About this issue

  • Original URL
  • State: open
  • Created 2 years ago
  • Comments: 16 (4 by maintainers)

Most upvoted comments

We have got the same problem. And I reproduced the problem by using below codes:

@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class RedisTest {

    @Autowired
    private ReactiveRedisTemplate redisTemplate;

    private ExecutorService executorService = Executors.newFixedThreadPool(20);

    @Test
    public void testRedis() throws Exception{
        Random random = new Random();
        AtomicInteger counter = new AtomicInteger(0);
        for (int i = 0; i < 100000; i++) {
            int finalI = i;
            executorService.submit(() -> {
                redisTemplate.opsForValue().set("testkey", "testvalue" + finalI)
                        //.publishOn(Schedulers.boundedElastic())
                        .flatMap(r1 -> {
                            System.out.println(Thread.currentThread().getName() + " : result1 = " + r1);
                            return redisTemplate.opsForHash().get("test_hash", "hash_field") //, "test_value" + finalI)
                                   // .publishOn(Schedulers.boundedElastic())
                                    .defaultIfEmpty("")
                                    .flatMap(r2 -> {
                                        if(random.nextInt() % 2 == 0 && counter.get() < 100){
                                            counter.incrementAndGet();
                                            throw new OutOfMemoryError("test out of memory error");
                                        }
                                        System.out.println(Thread.currentThread().getName() + " : result2 = " + r2);
                                        return redisTemplate.opsForHash().put("test_hash", "hash_field", "hash_value" + finalI)
                                                //.publishOn(Schedulers.boundedElastic())
                                                .flatMap(r3 -> {
                                                            System.out.println(Thread.currentThread().getName() + " : result3 = " + r3);
                                                            return redisTemplate.opsForValue()
                                                                    .get("testkey")
                                                                    //.publishOn(Schedulers.boundedElastic())
                                                                    .map(r4 -> {
                                                                        System.out.println(Thread.currentThread().getName() + " : result4 = " + r4);
                                                                        return r4;
                                                                    });
                                                        });

                                    });
                        })
//                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe();
            });
            TimeUnit.MILLISECONDS.sleep(10);

        }
        System.in.read();
    }

}

My environment is:

  • Spring Boot 2.6.8
  • ReactiveRedisTemplate
  • lettuce 6.1.8.RELEASE

I found if my business codes throw errors, it will be catched by the lettuce pool thread.While this will finally lead to java.lang.UnsupportedOperationException: io.lettuce.core.output.ValueOutput does not support set(long) exception. And If I indicate a thread pool to my business codes by publishOn, the error won’t be caught by lettuce thread.This can void the exception happening.

@qinchunabng Thanks for sharing. I have one question. You example code has two comments. one is ‘publishOn’ and other is ‘subscribeOn’ To avoid exception happening, needs uncomment both?

Just uncomment ‘publishOn’. ‘subscribeOn’ is testing code.