There is a race window when we are establishing connection to the
backend:
When we received the pointer to shard leader, we send a connect
request, but during that time window the leader may move, resulting
in a NotLeaderException response to ConnectClientRequest. Since
we are in reconnection mode, this will result in hard abort of
connection.
Fix this by wrapping NotLeaderException and akka failures in a
TimeoutException -- hence we will retry connecting.
Change-Id: Ia5d1915d59e80a70c54302c1790121d0767ff08a
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
.whenComplete((response, failure) -> {
if (failure != null) {
FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
.whenComplete((response, failure) -> {
if (failure != null) {
- LOG.debug("Connect attempt to {} failed", shardName, failure);
- future.completeExceptionally(failure);
+ LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+ future.completeExceptionally(wrap("Connection attempt failed", failure));
return;
}
if (response instanceof RequestFailure) {
final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
LOG.debug("Connect attempt to {} failed to process", shardName, cause);
return;
}
if (response instanceof RequestFailure) {
final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
LOG.debug("Connect attempt to {} failed to process", shardName, cause);
- future.completeExceptionally(cause);
+ final Throwable result = cause instanceof NotLeaderException
+ ? wrap("Leader moved during establishment", cause) : cause;
+ future.completeExceptionally(result);