spring-kafka: replication-offset-checkpoint.tmp (No such file or directory)

We are using Embedded Kafka for integration testing. What we’re getting is a FileNotFoundException (sometimes) on Jenkins. Question: is there a way to turn off the ReplicaManager or something similar? Maybe there’s another solution.

2016-10-13 07:53:35.161 ERROR 9469 --- [fka-scheduler-1] kafka.server.ReplicaManager              : [Replica Manager on Broker 0]: Error writing to highwatermark file: 

java.io.FileNotFoundException: /tmp/kafka-3266968904825284552/replication-offset-checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
    at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:874)
    at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:871)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
    at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:871)
    at kafka.server.ReplicaManager$$anonfun$1.apply$mcV$sp(ReplicaManager.scala:153)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

About this issue

  • State: closed
  • Created 8 years ago
  • Comments: 68 (30 by maintainers)

Most upvoted comments

This issue is still happening as of spring-kafka 2.2.5.RELEASE

We did some debugging on it, and it seems to come from a conflicting set of JVM shutdown hooks.

spring-kafka-test uses TestUtils to set up the Kafka properties. Under the hood, this goes through org.apache.kafka.test.TestUtils.tempDirectory, which registers a shutdown hook meant to clean up the temp directory upon JVM shutdown. If this hook runs while the broker is still up, the LogManager does a System.exit as described.

spring-kafka-test uses the context lifecycle to shut down the broker: when @DirtiesContext is set, the context is destroyed before the JVM starts shutting down. That way, once the TestUtils shutdown hook runs, the broker is already down and everything works. This is also why any other solution that uses the Spring/JUnit lifecycle to shut down the Kafka broker while the JVM is up (e.g. a TestExecutionListener) also works around this.

Unfortunately, when DirtiesContext is not set, Spring’s context teardown also relies on a JVM shutdown hook. As the shutdown hook execution order is not guaranteed, this means we have a race condition between Spring shutting down the broker and TestUtils destroying the tempdir.
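The race described above is inherent to plain JDK shutdown hooks, which all start concurrently in unspecified order. A minimal stdlib-only sketch (the hook names and delays are illustrative, not real spring-kafka or Kafka code):

```java
import java.util.concurrent.TimeUnit;

public class ShutdownHookRace {
    public static void main(String[] args) {
        // Hook A stands in for TestUtils' temp-directory cleanup
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.out.println("hook A: deleting temp dir")));
        // Hook B stands in for Spring's comparatively slow context-close hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // context teardown takes time
            } catch (InterruptedException ignored) {
            }
            System.out.println("hook B: closing context / stopping broker");
        }));
        System.out.println("main exiting; both hooks start concurrently, order unspecified");
    }
}
```

If hook A finishes while hook B is still stopping the broker, the broker is left working against a deleted directory, which is exactly the FileNotFoundException scenario above.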

If the temp directory is cleaned up before Spring’s context teardown, the Kafka broker is left running and the LogManager eventually encounters an empty directory. Hence the System.exit.

We worked around this by using a SecurityManager to forbid System.exit(1) in our tests. This ensures that Spring has a chance to shut down the broker, even if the broker wants to die prematurely. A longer-term fix would likely be to have the EmbeddedKafkaBroker manage the lifecycle of the temp directory itself, instead of delegating its creation to TestUtils.

We use JUnit 5 with Karate on our projects and we’re using the @EmbeddedKafka annotation and ran into the same problem as the issue starter. Adding a @DirtiesContext on a test method eventually did the trick and the error disappeared.

Adding an update here in case anyone else stumbles upon this and needs some help. I am using the 1.2.3.RELEASE (due to compatibility issues with the version of Kafka I am running) and ran into this issue. After digging into it, it appears that the issue is the fact that the KafkaEmbedded class calls the shutdown() method on the underlying Apache KafkaServer class when you call after() to shut down the embedded server. Calling shutdown() itself is not the issue – the issue is that the shutdown() method calls the default implementation of shutdown() on the ReplicaManager, which in turn defaults the flag that controls writing the high water mark file to true. You can call shutdown() on the ReplicaManager with an optional false parameter to skip writing the high water mark file (which is really not needed for unit/integration tests). To get around this issue, I used a little reflection:

// Runs in a context where the checked reflection exceptions may propagate
// (e.g. a test method declared to throw Exception)
final KafkaServer server = embeddedKafka.getKafkaServers().stream().findFirst().orElse(null);
if (server != null) {
    // Shut down the ReplicaManager without writing the high-watermark file
    server.replicaManager().shutdown(false);
    // Null out the field so after() cannot call shutdown() on it again with true;
    // getDeclaredField never returns null, it throws NoSuchFieldException instead
    final Field replicaManagerField = server.getClass().getDeclaredField("replicaManager");
    replicaManagerField.setAccessible(true);
    replicaManagerField.set(server, null);
}
embeddedKafka.after();

The code above explicitly calls shutdown() on the ReplicaManager used by the KafkaServer passing false as the parameter and then replaces the ReplicaManager instance in the KafkaServer with null so it does not get called again with the true parameter value when after() is called on the embeddedKafka instance. This is totally a hack and definitely has more to do with the API exposed by Apache Kafka than the Spring library. Hopefully, this is easier to fix in more recent versions of both this library and Apache Kafka. However, if you are stuck on this version (or a similar one) like me, the solution above should solve the error outlined in this GitHub issue.

Yes, the Spring context is closed in parallel with other shutdown hooks; they are executed in separate threads (see java.lang.ApplicationShutdownHooks#runHooks). The Spring shutdown hook is rather slow, so in most cases destroyMethod = "after" will be called after the Kafka log directories are already gone.

And yes again, you are right: the ERRORs in the logs do not fail the build, they just sit there making me nervous each time I analyze the logs.

I found this thread via Google while looking for a solution. I did not find one here, but this is the only place that refers to the problem I encountered. I did some analysis and want to share it with others. Here is what happens:

  1. Kafka’s TestUtils.tempDirectory method is used to create the temporary directory for the embedded Kafka broker. It also registers a shutdown hook which deletes this directory when the JVM exits.
  2. When the unit test finishes execution, it calls System.exit, which in turn executes all registered shutdown hooks.

If the Kafka broker is still running at the end of the unit test, it will attempt to read/write data in a directory which has been deleted, producing various FileNotFound exceptions.

Solution: shut down the embedded Kafka broker at the end of the test, before System.exit is called.

If the KafkaEmbedded rule is used properly, it will call the KafkaEmbedded#after method, which destroys the broker before System#exit is called.
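For reference, proper rule usage looks roughly like this. This is a sketch against the spring-kafka-test 1.x API; the constructor arguments (broker count, controlledShutdown, topic names) and the class/topic names here are illustrative:

```java
public class MyKafkaTest {

    // JUnit invokes the rule's after() when the class finishes,
    // i.e. while the JVM is still up and before System.exit runs the hooks
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "someTopic");

    @Test
    public void sendsAndReceives() {
        // configure producers/consumers against embeddedKafka.getBrokersAsString()
    }
}
```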

I use the KafkaEmbedded class in a Spring integration test and create it as a bean. Unfortunately, the Spring context is destroyed in a shutdown hook as well, and this happens concurrently with the other shutdown hooks, so the Kafka log directory is destroyed before the embedded Kafka broker is down. I have not yet found a proper solution for this usage scenario.

I found KafkaScheduler.class in the stack traces, so I adjusted the Kafka server property replica.high.watermark.checkpoint.interval.ms to a very large number (e.g. 100000000).

This works for me.
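If you use the @EmbeddedKafka annotation, a broker property like this can be set via its brokerProperties attribute. A sketch, with the value suggested above (the test class name is illustrative):

```java
@EmbeddedKafka(brokerProperties = {
        "replica.high.watermark.checkpoint.interval.ms=100000000"
})
class MyIntegrationTest {
    // tests...
}
```

This does not fix the underlying shutdown-hook race; it just makes the high-watermark checkpoint so infrequent that it is unlikely to fire after the temp directory is gone.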

I definitely think this warrants reopening the issue though, as a few others are also reporting this problem.

@gustavomaia Since it is a static method call, you can put it anywhere you want. However, it must be executed before the test run completes.

class SomeTest {
    static {
        Exit.setHaltProcedure((statusCode, message) -> {
            if (statusCode != 1) {
                Runtime.getRuntime().halt(statusCode);
            }
        });
    }

    @Test
    void test1() {
    }

    @Test
    void test2() {
    }
}

Even using the SecurityManager, the test can fail. Exit/halt can be prevented by throwing an exception in checkExit of the SecurityManager; however, an exception thrown in a JVM shutdown hook cannot be caught, and the test fails.
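That limitation is easy to demonstrate with the plain JDK: an exception thrown inside a shutdown hook only reaches that hook thread's uncaught-exception handler, never the test code. A minimal sketch:

```java
public class HookExceptionDemo {
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("hook running");
            // Propagates only to the hook thread's uncaught-exception handler
            // (printed to stderr); no try/catch in main or in a test can see it
            throw new IllegalStateException("thrown inside shutdown hook");
        }));
        try {
            System.out.println("main done");
        } catch (RuntimeException e) {
            // Never reached: the hook has not even started when main runs
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

So a SecurityManager that throws from checkExit can stop the halt, but if the throw happens inside a shutdown hook, the resulting exception still surfaces as an uncaught error rather than something the test can assert on.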

@Rouche
org.apache.kafka.common.utils.Exit

When the JVM shutdown hooks are running, the Kafka log file is deleted, and Exit.halt(1) is called when another shutdown hook accesses the Kafka log file at the same time.

Since halt is called here with status 1, I only defend against 1. https://github.com/a0x8o/kafka/blob/master/core/src/main/scala/kafka/log/LogManager.scala#L193

If you encounter a situation where the test fails with a different status value, you can add defense code.

An error log may still appear, but the test will not fail because the call is not propagated to Runtime.halt.

I solved this problem.

As in https://github.com/spring-projects/spring-kafka/issues/194#issuecomment-501003852, the tmp folder is removed during JVM shutdown, and when another hook uses Kafka at the same time, Embedded Kafka calls Runtime.getRuntime().halt(1).

The following code can prevent halt(1) from ending the test.

Exit.setHaltProcedure((statusCode, message) -> {
    if (statusCode != 1) {
        Runtime.getRuntime().halt(statusCode);
    }
});

This creates another error:

KafkaException: Failed to acquire lock on file .lock in C:\tmp\kafka-data\MyService. A Kafka instance in another process or thread is using this directory.

Anyway, you don’t need to specify a log dir; Kafka already uses a random one with a seed in its name.

I have been looking for the ticket for this error in the Kafka JIRAs, but I cannot find it. Does anybody know where to locate it, in order to track progress on the resolution of this issue?

I had a similar issue to this, but was using Cucumber, so the failure was actually causing a build failure (despite all the tests passing).

The solution I had was to add a @PreDestroy callback that shuts the broker down before Spring destroys the context, as below:

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.stereotype.Component;

@Component
public class EmbeddedKafkaLifecycle {

  @Autowired
  KafkaEmbedded kafkaEmbedded;

  @PreDestroy
  public void onDestroy() {
    kafkaEmbedded.after();
  }

}

@jdpgrailsdev and @hstaos

Guys, you saved my day.

I still don’t understand why you ask that question of us, the Spring community, when you can see the problem is in Apache Kafka directly. Let’s move this discussion to StackOverflow: there is nothing to fix or improve from the Spring perspective.

If you insist it is the ReplicaManager, then please go to the Apache Kafka community and ask there. That is a level of the integration so low that we are simply not aware of it.

Sorry, but it looks like we (at least I) are useless to you on this topic, and I don’t understand why you spend time with us and not the Apache Kafka community.

I would be glad to see some cross-link from there to widen knowledge on this Kafka topic.

Thanks