X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=9ba118ed6b9c95a1d5c914a4b681413af4cce95d;hp=3cdda596ade0739a679add4b55c1ade989d3aec3;hb=bc2b83e97bc73930badd4a3063c65b849f82c664;hpb=b0067e0a4bfa955f15c6259e019f954687264eff diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 3cdda596ad..9ba118ed6b 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -8,18 +8,30 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; -import java.util.Optional; -import java.util.concurrent.CompletionStage; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; +import 
org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.concepts.WritableIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -30,138 +42,173 @@ import scala.concurrent.duration.FiniteDuration; * @author Robert Varga */ @Beta -public abstract class ClientActorBehavior extends RecoveredClientActorBehavior - implements Identifiable { +public abstract class ClientActorBehavior extends + RecoveredClientActorBehavior implements Identifiable { + /** + * Connection reconnect cohort, driven by this class. + */ + @FunctionalInterface + protected interface ConnectionConnectCohort { + /** + * Finish the connection by replaying previous messages onto the new connection. + * + * @param enqueuedEntries Previously-enqueued entries + * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. + */ + @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection enqueuedEntries); + } + private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); + private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS); + + /** + * Map of connections to the backend. 
This map is concurrent to allow lookups, but given complex operations + * involved in connection transitions it is protected by an {@link InversibleLock}. Write-side of the lock is taken + * during connection transitions. Optimistic read-side of the lock is taken when new connections are introduced + * into the map. + * + * <p>

+ * The lock detects potential AB/BA deadlock scenarios and will force the reader side out by throwing + * an {@link InversibleLockException} -- which must be propagated up, releasing locks as it propagates. The initial + * entry point causing the conflicting lookup must then call {@link InversibleLockException#awaitResolution()} + * before retrying the operation. + */ + // TODO: it should be possible to move these two into ClientActorContext + private final Map> connections = new ConcurrentHashMap<>(); + private final InversibleLock connectionsLock = new InversibleLock(); + private final BackendInfoResolver resolver; - protected ClientActorBehavior(final @Nonnull ClientActorContext context) { + protected ClientActorBehavior(@Nonnull final ClientActorContext context, + @Nonnull final BackendInfoResolver resolver) { super(context); + this.resolver = Preconditions.checkNotNull(resolver); } @Override - public final @Nonnull ClientIdentifier getIdentifier() { + @Nonnull + public final ClientIdentifier getIdentifier() { return context().getIdentifier(); } + /** + * Get a connection to a shard. 
+ * + * @param shard Shard cookie + * @return Connection to a shard + * @throws InversibleLockException if the shard is being reconnected + */ + public final AbstractClientConnection getConnection(final Long shard) { + while (true) { + final long stamp = connectionsLock.optimisticRead(); + final AbstractClientConnection conn = connections.computeIfAbsent(shard, this::createConnection); + if (connectionsLock.validate(stamp)) { + // No write-lock in-between, return success + return conn; + } + } + } + + private AbstractClientConnection getConnection(final ResponseEnvelope response) { + // Always called from actor context: no locking required + return connections.get(extractCookie(response.getMessage().getTarget())); + } + + @SuppressWarnings("unchecked") @Override - final ClientActorBehavior onReceiveCommand(final Object command) { + final ClientActorBehavior onReceiveCommand(final Object command) { if (command instanceof InternalCommand) { - return ((InternalCommand) command).execute(this); + return ((InternalCommand) command).execute(this); } if (command instanceof SuccessEnvelope) { return onRequestSuccess((SuccessEnvelope) command); } if (command instanceof FailureEnvelope) { - return onRequestFailure((FailureEnvelope) command); + return internalOnRequestFailure((FailureEnvelope) command); } return onCommand(command); } - private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) { - return context().completeRequest(this, command); - } - - private ClientActorBehavior onRequestFailure(final FailureEnvelope command) { - final RequestFailure failure = command.getMessage(); - final RequestException cause = failure.getCause(); - if (cause instanceof RetiredGenerationException) { - LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); - haltClient(cause); - context().poison(cause); - return null; - } - - if (failure.isHardFailure()) { - return context().completeRequest(this, command); + private static long 
extractCookie(final WritableIdentifier id) { + if (id instanceof TransactionIdentifier) { + return ((TransactionIdentifier) id).getHistoryId().getCookie(); + } else if (id instanceof LocalHistoryIdentifier) { + return ((LocalHistoryIdentifier) id).getCookie(); + } else { + throw new IllegalArgumentException("Unhandled identifier " + id); } - - // TODO: add instanceof checks on cause to detect more problems - - LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), command); - return context().completeRequest(this, command); } - // This method is executing in the actor context, hence we can safely interact with the queue - private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest request, - final RequestCallback callback) { - // Get or allocate queue for the request - final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie()); - - // Note this is a tri-state return and can be null - final Optional result = queue.enqueueRequest(sequence, request, callback); - if (result == null) { - // Happy path: we are done here - return this; - } - - if (result.isPresent()) { - // Less happy path: we need to schedule a timer - scheduleQueueTimeout(queue, result.get()); - return this; + private void onResponse(final ResponseEnvelope response) { + final AbstractClientConnection connection = getConnection(response); + if (connection != null) { + connection.receiveResponse(response); + } else { + LOG.info("{}: Ignoring unknown response {}", persistenceId(), response); } + } - startResolve(queue, request.getTarget().getHistoryId().getCookie()); + private ClientActorBehavior onRequestSuccess(final SuccessEnvelope success) { + onResponse(success); return this; } - // This method is executing in the actor context, hence we can safely interact with the queue - private void startResolve(final SequencedQueue queue, final long cookie) { - // Queue does not have backend information. 
Initiate resolution, which may actually be piggy-backing on to a - // previous request to resolve. - final CompletionStage f = resolver().getBackendInfo(cookie); - - // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has - // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to - // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and - // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will - // bulk-process all requests. - if (queue.expectProof(f)) { - f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend))); - } + private ClientActorBehavior onRequestFailure(final FailureEnvelope failure) { + onResponse(failure); + return this; } - // This method is executing in the actor context, hence we can safely interact with the queue - private ClientActorBehavior finishResolve(final SequencedQueue queue, - final CompletionStage futureBackend, final BackendInfo backend) { - - final Optional maybeTimeout = queue.setBackendInfo(futureBackend, backend); - if (maybeTimeout.isPresent()) { - scheduleQueueTimeout(queue, maybeTimeout.get()); + private ClientActorBehavior internalOnRequestFailure(final FailureEnvelope command) { + final RequestFailure failure = command.getMessage(); + final RequestException cause = failure.getCause(); + if (cause instanceof RetiredGenerationException) { + LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); + haltClient(cause); + poison(cause); + return null; + } + if (cause instanceof NotLeaderException) { + final AbstractClientConnection conn = getConnection(command); + if (conn instanceof ReconnectingClientConnection) { + // Already reconnecting, do not churn the logs + return this; + } else if (conn != null) { + LOG.info("{}: connection {} indicated 
no leadership, reconnecting it", persistenceId(), conn, cause); + return conn.reconnect(this, cause); + } + } + if (cause instanceof OutOfSequenceEnvelopeException) { + final AbstractClientConnection conn = getConnection(command); + if (conn instanceof ReconnectingClientConnection) { + // Already reconnecting, do not churn the logs + return this; + } else if (conn != null) { + LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it", + persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause); + return conn.reconnect(this, cause); + } } - return this; - } - // This method is executing in the actor context, hence we can safely interact with the queue - private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) { - LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout); - context().executeInActor(cb -> cb.queueTimeout(queue), timeout); + return onRequestFailure(command); } - // This method is executing in the actor context, hence we can safely interact with the queue - private ClientActorBehavior queueTimeout(final SequencedQueue queue) { - final boolean needBackend; - + private void poison(final RequestException cause) { + final long stamp = connectionsLock.writeLock(); try { - needBackend = queue.runTimeout(); - } catch (NoProgressException e) { - // Uh-oh, no progress. The queue has already killed itself, now we need to remove it - context().removeQueue(queue); - return this; - } + for (AbstractClientConnection q : connections.values()) { + q.poison(cause); + } - if (needBackend) { - startResolve(queue, queue.getCookie()); + connections.clear(); + } finally { + connectionsLock.unlockWrite(stamp); } - - return this; } /** - * Halt And Catch Fire. - * - * Halt processing on this client. Implementations need to ensure they initiate state flush procedures. No attempt - * to use this instance should be made after this method returns. 
Any such use may result in undefined behavior. + * Halt And Catch Fire. Halt processing on this client. Implementations need to ensure they initiate state flush + * procedures. No attempt to use this instance should be made after this method returns. Any such use may result + * in undefined behavior. * * @param cause Failure cause */ @@ -170,26 +217,158 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior onCommand(@Nonnull Object command); /** * Override this method to provide a backend resolver instance. * - * @return + * @return a backend resolver instance */ - protected abstract @Nonnull BackendInfoResolver resolver(); + protected final @Nonnull BackendInfoResolver resolver() { + return resolver; + } /** - * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke - * from any thread. + * Callback invoked when a new connection has been established. Implementations are expected to perform preparatory + * tasks before the previous connection is frozen. * - * @param request Request to send - * @param callback Callback to invoke + * @param newConn New connection + * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up. 
*/ - public final void sendRequest(final long sequence, final TransactionRequest request, final RequestCallback callback) { - context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback)); + @GuardedBy("connectionsLock") + @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection newConn); + + private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, + final T backend, final Throwable failure) { + if (failure != null) { + if (failure instanceof TimeoutException) { + if (!conn.equals(connections.get(shard))) { + // AbstractClientConnection will remove itself when it decides there is no point in continuing, + // at which point we want to stop retrying + LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn, + failure); + return; + } + + LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard, + RESOLVE_RETRY_DURATION, failure); + context().executeInActor(b -> { + resolveConnection(shard, conn); + return b; + }, RESOLVE_RETRY_DURATION); + return; + } + + LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); + final RequestException cause; + if (failure instanceof RequestException) { + cause = (RequestException) failure; + } else { + cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure); + } + + conn.poison(cause); + return; + } + + LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend); + final long stamp = connectionsLock.writeLock(); + try { + // Create a new connected connection + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), + conn.cookie(), backend); + LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + + // Start reconnecting without the old connection lock held + final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); + + 
// Lock the old connection and get a reference to its entries + final Collection replayIterable = conn.startReplay(); + + // Finish the connection attempt + final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); + + // Install the forwarder, unlocking the old connection + conn.finishReplay(forwarder); + + // Make sure new lookups pick up the new connection + if (!connections.replace(shard, conn, newConn)) { + final AbstractClientConnection existing = connections.get(conn.cookie()); + LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", + persistenceId(), conn, existing, newConn); + } else { + LOG.info("{}: replaced connection {} with {}", persistenceId(), conn, newConn); + } + } finally { + connectionsLock.unlockWrite(stamp); + } + } + + void removeConnection(final AbstractClientConnection conn) { + final long stamp = connectionsLock.writeLock(); + try { + if (!connections.remove(conn.cookie(), conn)) { + final AbstractClientConnection existing = connections.get(conn.cookie()); + if (existing != null) { + LOG.warn("{}: failed to remove connection {}, as it was superseded by {}", persistenceId(), conn, + existing); + } else { + LOG.warn("{}: failed to remove connection {}, as it was not tracked", persistenceId(), conn); + } + } else { + LOG.info("{}: removed connection {}", persistenceId(), conn); + } + } finally { + connectionsLock.unlockWrite(stamp); + } + } + + @SuppressWarnings("unchecked") + void reconnectConnection(final ConnectedClientConnection oldConn, + final ReconnectingClientConnection newConn) { + final ReconnectingClientConnection conn = (ReconnectingClientConnection)newConn; + LOG.info("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + + final long stamp = connectionsLock.writeLock(); + try { + final boolean replaced = connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); + if (!replaced) { + final 
AbstractClientConnection existing = connections.get(oldConn.cookie()); + if (existing != null) { + LOG.warn("{}: failed to replace connection {}, as it was superseded by {}", persistenceId(), conn, + existing); + } else { + LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn); + } + } + } finally { + connectionsLock.unlockWrite(stamp); + } + + final Long shard = oldConn.cookie(); + LOG.info("{}: refreshing backend for shard {}", persistenceId(), shard); + resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete( + (backend, failure) -> context().executeInActor(behavior -> { + backendConnectFinished(shard, conn, backend, failure); + return behavior; + })); + } + + private ConnectingClientConnection createConnection(final Long shard) { + final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); + resolveConnection(shard, conn); + return conn; + } + + private void resolveConnection(final Long shard, final AbstractClientConnection conn) { + LOG.debug("{}: resolving shard {} connection {}", persistenceId(), shard, conn); + resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> { + backendConnectFinished(shard, conn, backend, failure); + return behavior; + })); } }