X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;fp=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=45580e92fd734489616c4840a5feac2f529aafd8;hp=ddb7bcdad112e84623c797f8790a968eac07791a;hb=b4d95acff78952020e9fbde4372d13b461fd7469;hpb=61d4d322740f116d7d8ec91b8ba2e4eed409d7d7 diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index ddb7bcdad1..45580e92fd 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; @@ -21,6 +22,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; @@ -36,6 +38,20 @@ import org.slf4j.LoggerFactory; @Beta public abstract class ClientActorBehavior extends RecoveredClientActorBehavior implements Identifiable { + /** + * Connection reconnect cohort, driven by this class. + */ + @FunctionalInterface + protected interface ConnectionConnectCohort { + /** + * Finish the connection by replaying previous messages onto the new connection. + * + * @param enqueuedEntries Previously-enqueued entries + * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. + */ + @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable enqueuedEntries); + } + private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); /** @@ -185,28 +201,42 @@ public abstract class ClientActorBehavior extends } /** - * Callback invoked when a new connection has been established. + * Callback invoked when a new connection has been established. Implementations are expected perform preparatory + * tasks before the previous connection is frozen. * - * @param conn Old connection - * @param backend New backend - * @return Newly-connected connection. + * @param newConn New connection + * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up. */ @GuardedBy("connectionsLock") - protected abstract @Nonnull ConnectedClientConnection connectionUp( - final @Nonnull AbstractClientConnection conn, final @Nonnull T backend); + @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection newConn); private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, final T backend, final Throwable failure) { if (failure != null) { LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); + conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure)); return; } LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend); final long stamp = connectionsLock.writeLock(); try { - // Bring the connection up - final ConnectedClientConnection newConn = connectionUp(conn, backend); + // Create a new connected connection + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), + conn.cookie(), backend); + LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + + // Start reconnecting without the old connection lock held + final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); + + // Lock the old connection and get a reference to its entries + final Iterable replayIterable = conn.startReplay(); + + // Finish the connection attempt + final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); + + // Install the forwarder, unlocking the old connection + conn.finishReplay(forwarder); // Make sure new lookups pick up the new connection connections.replace(shard, conn, newConn);