X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=e70fdc9662c6232dcc81bda50d95795cca02dbde;hp=e4b73b14f0c71f85bde3279522887a5b466700f8;hb=3488960e844918404db473ee684661f98dab3563;hpb=320a4e5cd2d9d80468a3f82798744f2035488218 diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index e4b73b14f0..e70fdc9662 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -9,23 +9,30 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.WritableIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * A behavior, which handles messages sent to a {@link AbstractClientActor}. @@ -35,7 +42,22 @@ import org.slf4j.LoggerFactory; @Beta public abstract class ClientActorBehavior extends RecoveredClientActorBehavior implements Identifiable { + /** + * Connection reconnect cohort, driven by this class. + */ + @FunctionalInterface + protected interface ConnectionConnectCohort { + /** + * Finish the connection by replaying previous messages onto the new connection. + * + * @param enqueuedEntries Previously-enqueued entries + * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. + */ + @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable enqueuedEntries); + } + private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); + private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS); /** * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations @@ -84,6 +106,11 @@ public abstract class ClientActorBehavior extends } } + private AbstractClientConnection getConnection(final ResponseEnvelope response) { + // Always called from actor context: no locking required + return connections.get(extractCookie(response.getMessage().getTarget())); + } + @SuppressWarnings("unchecked") @Override final ClientActorBehavior onReceiveCommand(final Object command) { @@ -100,14 +127,18 @@ public abstract class ClientActorBehavior extends return onCommand(command); } - private void onResponse(final ResponseEnvelope response) { - final WritableIdentifier id = response.getMessage().getTarget(); - - // FIXME: this will need to be updated for other Request/Response types to extract cookie - Preconditions.checkArgument(id instanceof TransactionIdentifier); - final TransactionIdentifier txId = (TransactionIdentifier) id; + private static long extractCookie(final WritableIdentifier id) { + if (id instanceof TransactionIdentifier) { + return ((TransactionIdentifier) id).getHistoryId().getCookie(); + } else if (id instanceof LocalHistoryIdentifier) { + return ((LocalHistoryIdentifier) id).getCookie(); + } else { + throw new IllegalArgumentException("Unhandled identifier " + id); + } + } - final AbstractClientConnection connection = connections.get(txId.getHistoryId().getCookie()); + private void onResponse(final ResponseEnvelope response) { + final AbstractClientConnection connection = getConnection(response); if (connection != null) { connection.receiveResponse(response); } else { @@ -134,6 +165,16 @@ public abstract class ClientActorBehavior extends poison(cause); return null; } + if (cause instanceof NotLeaderException) { + final AbstractClientConnection conn = getConnection(command); + if (conn instanceof ReconnectingClientConnection) { + // Already reconnecting, do not churn the logs + return this; + } else if (conn != null) { + LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause); + return conn.reconnect(this); + } + } return onRequestFailure(command); } @@ -179,31 +220,71 @@ public abstract class ClientActorBehavior extends } /** - * Callback invoked when a new connection has been established. + * Callback invoked when a new connection has been established. Implementations are expected perform preparatory + * tasks before the previous connection is frozen. * - * @param conn Old connection - * @param backend New backend - * @return Newly-connected connection. + * @param newConn New connection + * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up. */ @GuardedBy("connectionsLock") - protected abstract @Nonnull ConnectedClientConnection connectionUp( - final @Nonnull AbstractClientConnection conn, final @Nonnull T backend); + @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection newConn); private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, final T backend, final Throwable failure) { if (failure != null) { + if (failure instanceof TimeoutException) { + if (!conn.equals(connections.get(shard))) { + // AbstractClientConnection will remove itself when it decides there is no point in continuing, + // at which point we want to stop retrying + LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn, + failure); + return; + } + + LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard, + RESOLVE_RETRY_DURATION, failure); + context().executeInActor(b -> { + resolveConnection(shard, conn); + return b; + }, RESOLVE_RETRY_DURATION); + return; + } + LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); + final RequestException cause; + if (failure instanceof RequestException) { + cause = (RequestException) failure; + } else { + cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure); + } + + conn.poison(cause); return; } + LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend); final long stamp = connectionsLock.writeLock(); try { - // Bring the connection up - final ConnectedClientConnection newConn = connectionUp(conn, backend); + // Create a new connected connection + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), + conn.cookie(), backend); + LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + + // Start reconnecting without the old connection lock held + final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); + + // Lock the old connection and get a reference to its entries + final Iterable replayIterable = conn.startReplay(); + + // Finish the connection attempt + final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); + + // Install the forwarder, unlocking the old connection + conn.finishReplay(forwarder); // Make sure new lookups pick up the new connection connections.replace(shard, conn, newConn); - LOG.debug("{}: replaced connection {} with {}", persistenceId(), conn, newConn); + LOG.info("{}: replaced connection {} with {}", persistenceId(), conn, newConn); } finally { connectionsLock.unlockWrite(stamp); } @@ -218,10 +299,17 @@ public abstract class ClientActorBehavior extends void reconnectConnection(final ConnectedClientConnection oldConn, final ReconnectingClientConnection newConn) { final ReconnectingClientConnection conn = (ReconnectingClientConnection)newConn; - connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); - LOG.debug("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + LOG.info("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + + final boolean replaced = connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); + if (!replaced) { + final AbstractClientConnection existing = connections.get(oldConn.cookie()); + LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", persistenceId(), + oldConn, existing, newConn); + } final Long shard = oldConn.cookie(); + LOG.info("{}: refreshing backend for shard {}", persistenceId(), shard); resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete( (backend, failure) -> context().executeInActor(behavior -> { backendConnectFinished(shard, conn, backend, failure); @@ -231,12 +319,15 @@ public abstract class ClientActorBehavior extends private ConnectingClientConnection createConnection(final Long shard) { final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); + resolveConnection(shard, conn); + return conn; + } + private void resolveConnection(final Long shard, final AbstractClientConnection conn) { + LOG.debug("{}: resolving shard {} connection {}", persistenceId(), shard, conn); resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> { backendConnectFinished(shard, conn, backend, failure); return behavior; })); - - return conn; } }