esjc: CatchUpSubscription reconnect failure

Consider the following test program:

import java.util.concurrent.TimeoutException;

import com.github.msemys.esjc.CatchUpSubscription;
import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.EventStoreBuilder;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.SubscriptionDropReason;

public class EsTest {
	static public void main(String... strings) throws InterruptedException, TimeoutException {
		final EventStore eventstore = EventStoreBuilder.newBuilder()
			    .singleNodeAddress("127.0.0.1", 1113)
			    .userCredentials("admin", "changeit")
			    .build();
		
		final CatchUpSubscriptionListener catchUpSubscriptionListener = new CatchUpSubscriptionListener() {
		    @Override
		    public void onLiveProcessingStarted(CatchUpSubscription subscription) {
		        System.out.println("Live processing started!");
		    }

		    @Override
		    public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
		        System.out.println(event.originalEvent().eventType);
		    }

		    @Override
		    public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
		        System.out.println("Subscription closed: " + reason);
		        eventstore.subscribeToStreamFrom("foo", 0L, this);
		        
		    }
		};
		CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 0L, catchUpSubscriptionListener);
		Thread.sleep(100000);
		catchupSubscription.close();
	}
}

If the EventStore server (v4.0.2) is stopped and restarted three times while the program is running, the program prints the following output:

Live processing started!
Subscription closed: ConnectionClosed
Live processing started!
Live processing started!
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed
Live processing started!
Live processing started!
Live processing started!
Live processing started!
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed
Subscription closed: ConnectionClosed

This happens because the subscription listener is resubscribed automatically through CatchUpSubscription.reconnectionHook once the connection is re-established, in addition to the explicit resubscribe in onClose. The output shows that the number of subscriptions doubles with each restart.
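The doubling can be reproduced with a small model that does not use esjc at all. This is only a sketch under the assumption described above: every dropped subscription is restored once by the client and once more by the resubscribe in onClose. The class and method names are made up for illustration.

```java
public class SubscriptionDoublingSketch {

    // Number of live subscriptions after the given number of server restarts,
    // assuming each dropped subscription comes back twice: once through the
    // client's automatic resubscribe and once through the call in onClose.
    public static int liveSubscriptionsAfter(int restarts) {
        int live = 1; // the single subscription created in main
        for (int i = 0; i < restarts; i++) {
            live *= 2;
        }
        return live;
    }

    public static void main(String[] args) {
        // Mirrors the shape of the console output above: each cycle reports
        // the live subscriptions, then the same number of close notifications.
        for (int restarts = 0; restarts < 3; restarts++) {
            int live = liveSubscriptionsAfter(restarts);
            System.out.println(live + " x Live processing started!");
            System.out.println(live + " x Subscription closed: ConnectionClosed");
        }
    }
}
```

The counts 1, 2 and 4 match the groups of identical lines in the reported output.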

Unfortunately, if the line eventstore.subscribeToStreamFrom("foo", 0L, this); is removed from onClose, the client does not reconnect at all. After the EventStore server is stopped for the first time and the client prints Subscription closed: ConnectionClosed, nothing further is printed to the console when the server is restarted.

This behaviour does not change even if I add

			    .maxOperationRetries(-1)
			    .maxReconnections(-1)

to the client configuration.

I have not found any way to make the client restore the connection correctly (that is, exactly once) after the EventStore server is restarted. Could you please help me fix it?

EventStore client version used: 2.0.0. EventStore server version used: 4.0.2. The problem is always reproducible on both Linux and Windows.

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 15 (5 by maintainers)

Most upvoted comments

Thought I should share our final solution in case somebody stumbles upon this issue. Basically, we went with an EventStoreListener implementation very similar to what @dpolivaev suggested, which takes care of re-establishing the connection. In addition, we have a CatchUpSubscriptionListener for each of our CatchUpSubscriptions, with an onClose similar to this:

@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
	if (reason == SubscriptionDropReason.UserInitiated &&
            !(exception instanceof SubscriptionBufferOverflowException)) {
		logger.trace("Subscription to '{}' dropped (user initiated).", subscription.streamId, exception);
	}
	else if (reason == SubscriptionDropReason.ConnectionClosed) {
		logger.info("Subscription to '{}' dropped (connection closed). Automatic resubscribe upon reconnect is expected.",
					subscription.streamId,
					exception);
	}
	else {
		logger.error("Subscription to '{}' dropped unexpectedly! Reason: {}. Manual recovery required.",
					 subscription.streamId,
					 reason,
					 exception);
		// ... shut down the related part of the system, clean up, etc. ...
	}
}

So, basically, we reconnect upon each client disconnect (except on application shut-down). We then rely on that reconnect (and built-in resubscribe in CatchUpSubscription) for recovering subscriptions in case of closed connection. We do not attempt to recover subscriptions from various overflows etc. (never happened thus far, and probably useless without dynamic reconfiguration).
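One way to keep this policy easy to unit-test is to separate the decision from the logging and shutdown side effects. The following is a minimal sketch of that idea, not the code we run: DropReason and Action are hypothetical local stand-ins (DropReason models only the reasons named in this thread, it is not esjc's SubscriptionDropReason), and the bufferOverflow flag stands in for the instanceof SubscriptionBufferOverflowException check.

```java
public class DropPolicySketch {

    // Hypothetical stand-in for the drop reasons discussed in this thread.
    public enum DropReason { USER_INITIATED, CONNECTION_CLOSED, CATCH_UP_ERROR, OTHER }

    // Hypothetical outcomes matching the three branches of onClose above.
    public enum Action { IGNORE, AWAIT_AUTO_RESUBSCRIBE, MANUAL_RECOVERY }

    // Pure decision function: same branch order as the onClose shown above.
    public static Action decide(DropReason reason, boolean bufferOverflow) {
        if (reason == DropReason.USER_INITIATED && !bufferOverflow) {
            return Action.IGNORE; // regular close requested by the application
        }
        if (reason == DropReason.CONNECTION_CLOSED) {
            return Action.AWAIT_AUTO_RESUBSCRIBE; // rely on reconnect + built-in resubscribe
        }
        return Action.MANUAL_RECOVERY; // overflows, catch-up errors, anything else
    }

    public static void main(String[] args) {
        for (DropReason reason : DropReason.values()) {
            System.out.println(reason + " -> " + decide(reason, false));
        }
    }
}
```

A real onClose would call such a function and then log or shut down according to the returned action.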

The only downside that I’m aware of is that all errors during the catch-up phase (until live processing starts) cause the subscription to close with reason SubscriptionDropReason.CatchUpError, thus masking the real drop reason. In those cases, we abandon the resubscribe attempt and turn off the related part of the system (requiring manual intervention). Large catch-up processing batches are super-rare and always supervised / controlled, so this works for us.

This approach solves #28 as well.

HTH

I see… something is wrong with either the client or server v4. I’ve just tested (ES single node):

  • esjc 1.8.1 + es v3 => OK (works as expected)
  • esjc 1.8.1 + es v4 => something goes wrong with connection close and reconnect
  • esjc 2.0.0 + es v4 => something goes wrong with connection close and reconnect

I will look into this issue further.