X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=358704524a07451da1e72c26bec9192f2fd71e06;hp=554ffe97c77ae7c3011daca2a28c22f660a20e78;hb=b99dc64f4c2373e28c3c94c11cedad0e5f7abe1d;hpb=4de61ef3bd95898d262564a1bb48ab29080f4211 diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 554ffe97c7..358704524a 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -13,6 +13,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Verify; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -31,6 +32,9 @@ import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationExce import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.WritableIdentifier; import org.slf4j.Logger; @@ -78,11 +82,18 @@ public abstract class ClientActorBehavior extends private final Map> connections = new ConcurrentHashMap<>(); private final InversibleLock connectionsLock = new InversibleLock(); private final BackendInfoResolver resolver; + private final MessageAssembler responseMessageAssembler; protected ClientActorBehavior(@Nonnull final ClientActorContext context, @Nonnull final BackendInfoResolver resolver) { super(context); this.resolver = Preconditions.checkNotNull(resolver); + + final ClientActorConfig config = context.config(); + responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId()) + .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), + config.getTempFileDirectory())) + .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build(); } @Override @@ -91,6 +102,11 @@ public abstract class ClientActorBehavior extends return context().getIdentifier(); } + @Override + public void close() { + responseMessageAssembler.close(); + } + /** * Get a connection to a shard. * @@ -120,13 +136,21 @@ public abstract class ClientActorBehavior extends if (command instanceof InternalCommand) { return ((InternalCommand) command).execute(this); } + if (command instanceof SuccessEnvelope) { return onRequestSuccess((SuccessEnvelope) command); } + if (command instanceof FailureEnvelope) { return internalOnRequestFailure((FailureEnvelope) command); } + if (MessageAssembler.isHandledMessage(command)) { + context().dispatchers().getDispatcher(DispatcherType.Serialization).execute( + () -> responseMessageAssembler.handleMessage(command, context().self())); + return this; + } + return onCommand(command); } @@ -160,6 +184,25 @@ public abstract class ClientActorBehavior extends } private ClientActorBehavior internalOnRequestFailure(final FailureEnvelope command) { + final AbstractClientConnection conn = getConnection(command); + if (conn != null) { + /* + * We are talking to multiple actors, which may be lagging behind our state significantly. This has + * the effect that we may be receiving responses from a previous connection after we have created a new + * one to a different actor. + * + * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old + * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's + * sessionId and if it does not match our current connection just ignore it. + */ + final Optional optBackend = conn.getBackendInfo(); + if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) { + LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(), + conn, command); + return this; + } + } + final RequestFailure failure = command.getMessage(); final RequestException cause = failure.getCause(); if (cause instanceof RetiredGenerationException) { @@ -169,7 +212,6 @@ public abstract class ClientActorBehavior extends return null; } if (cause instanceof NotLeaderException) { - final AbstractClientConnection conn = getConnection(command); if (conn instanceof ReconnectingClientConnection) { // Already reconnecting, do not churn the logs return this; @@ -179,13 +221,12 @@ public abstract class ClientActorBehavior extends } } if (cause instanceof OutOfSequenceEnvelopeException) { - final AbstractClientConnection conn = getConnection(command); if (conn instanceof ReconnectingClientConnection) { // Already reconnecting, do not churn the logs return this; } else if (conn != null) { - LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it", - persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause); + LOG.info("{}: connection {} indicated sequencing mismatch on {} sequence {} ({}), reconnecting it", + persistenceId(), conn, failure.getTarget(), failure.getSequence(), command.getTxSequence(), cause); return conn.reconnect(this, cause); } }