X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=79d7eceb148d8d255ab1c9785e7349ebb5c72522;hb=dafc95d149bc62f101de37e94b9b5e3526d4e87b;hp=ccfbba6cd3d94f92ef9757999352893e1b0f7270;hpb=417f3a8c2401e6c8ce3c91ea969da24929c24c1c;p=controller.git

diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
index ccfbba6cd3..79d7eceb14 100644
--- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
+++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
@@ -32,6 +32,9 @@ import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationExce
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import org.slf4j.Logger;
@@ -61,7 +64,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
-    private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS);
+    private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(1, TimeUnit.SECONDS);
 
     /**
      * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
@@ -79,11 +82,18 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
     private final InversibleLock connectionsLock = new InversibleLock();
     private final BackendInfoResolver<T> resolver;
+    private final MessageAssembler responseMessageAssembler;
 
     protected ClientActorBehavior(@Nonnull final ClientActorContext context,
             @Nonnull final BackendInfoResolver<T> resolver) {
         super(context);
         this.resolver = Preconditions.checkNotNull(resolver);
+
+        final ClientActorConfig config = context.config();
+        responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId())
+                .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+                        config.getTempFileDirectory()))
+                .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
     }
 
     @Override
@@ -92,6 +102,11 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         return context().getIdentifier();
     }
 
+    @Override
+    public void close() {
+        responseMessageAssembler.close();
+    }
+
     /**
      * Get a connection to a shard.
      *
@@ -121,13 +136,21 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         if (command instanceof InternalCommand) {
             return ((InternalCommand<T>) command).execute(this);
         }
+
         if (command instanceof SuccessEnvelope) {
             return onRequestSuccess((SuccessEnvelope) command);
         }
+
         if (command instanceof FailureEnvelope) {
             return internalOnRequestFailure((FailureEnvelope) command);
         }
 
+        if (MessageAssembler.isHandledMessage(command)) {
+            context().dispatchers().getDispatcher(DispatcherType.Serialization).execute(
+                () -> responseMessageAssembler.handleMessage(command, context().self()));
+            return this;
+        }
+
         return onCommand(command);
     }
 
@@ -202,8 +225,8 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 // Already reconnecting, do not churn the logs
                 return this;
             } else if (conn != null) {
-                LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it",
-                    persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause);
+                LOG.info("{}: connection {} indicated sequencing mismatch on {} sequence {} ({}), reconnecting it",
+                    persistenceId(), conn, failure.getTarget(), failure.getSequence(), command.getTxSequence(), cause);
                 return conn.reconnect(this, cause);
             }
         }
@@ -261,22 +284,22 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     @GuardedBy("connectionsLock")
     @Nonnull protected abstract ConnectionConnectCohort<T> connectionUp(@Nonnull ConnectedClientConnection<T> newConn);
 
-    private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
+    private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> oldConn,
             final T backend, final Throwable failure) {
         if (failure != null) {
             if (failure instanceof TimeoutException) {
-                if (!conn.equals(connections.get(shard))) {
+                if (!oldConn.equals(connections.get(shard))) {
                     // AbstractClientConnection will remove itself when it decides there is no point in continuing,
                     // at which point we want to stop retrying
-                    LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn,
-                        failure);
+                    LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard,
+                        oldConn, failure);
                     return;
                 }
 
                 LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard,
                     RESOLVE_RETRY_DURATION, failure);
                 context().executeInActor(b -> {
-                    resolveConnection(shard, conn);
+                    resolveConnection(shard, oldConn);
                     return b;
                 }, RESOLVE_RETRY_DURATION);
                 return;
@@ -290,7 +313,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure);
             }
 
-            conn.poison(cause);
+            oldConn.poison(cause);
             return;
         }
 
@@ -300,29 +323,31 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             final Stopwatch sw = Stopwatch.createStarted();
 
             // Create a new connected connection
-            final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
-                conn.cookie(), backend);
-            LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
+            final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(oldConn, backend);
+            LOG.info("{}: resolving connection {} to {}", persistenceId(), oldConn, newConn);
 
             // Start reconnecting without the old connection lock held
             final ConnectionConnectCohort<T> cohort = Verify.verifyNotNull(connectionUp(newConn));
 
             // Lock the old connection and get a reference to its entries
-            final Collection<ConnectionEntry> replayIterable = conn.startReplay();
+            final Collection<ConnectionEntry> replayIterable = oldConn.startReplay();
 
             // Finish the connection attempt
             final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
 
+            // Cancel sleep debt after entries were replayed, before new connection starts receiving.
+            newConn.cancelDebt();
+
             // Install the forwarder, unlocking the old connection
-            conn.finishReplay(forwarder);
+            oldConn.finishReplay(forwarder);
 
             // Make sure new lookups pick up the new connection
-            if (!connections.replace(shard, conn, newConn)) {
-                final AbstractClientConnection<T> existing = connections.get(conn.cookie());
+            if (!connections.replace(shard, oldConn, newConn)) {
+                final AbstractClientConnection<T> existing = connections.get(oldConn.cookie());
                 LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo",
-                    persistenceId(), conn, existing, newConn);
+                    persistenceId(), oldConn, existing, newConn);
             } else {
-                LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw);
+                LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), oldConn, newConn, sw);
             }
         } finally {
            connectionsLock.unlockWrite(stamp);
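
Editor's note (not part of the commit): the hunks above wire sliced-message reassembly into the client actor in three places: the constructor builds a MessageAssembler, onReceiveCommand() routes slice messages to it on the serialization dispatcher, and close() releases its file-backed buffers. The fragments below are a consolidated, hypothetical sketch of that flow, assembled only from calls already visible in the diff; any other names (such as the local "assembler" variable) are assumptions for illustration.

    // Illustrative sketch only -- consolidated from the diff above, not repository code.
    // Constructor wiring: reassembled messages are re-delivered to this actor as commands.
    final MessageAssembler assembler = MessageAssembler.builder()
            .logContext(persistenceId())
            .fileBackedStreamFactory(new FileBackedOutputStreamFactory(
                    config.getFileBackedStreamingThreshold(), config.getTempFileDirectory()))
            .assembledMessageCallback((message, sender) -> context.self().tell(message, sender))
            .build();

    // Receive-path wiring: slice messages go to the assembler off the actor's main path,
    // everything else falls through to the existing envelope handling.
    if (MessageAssembler.isHandledMessage(command)) {
        context().dispatchers().getDispatcher(DispatcherType.Serialization).execute(
            () -> assembler.handleMessage(command, context().self()));
        return this;
    }

    // Lifecycle wiring: the assembler may hold file-backed streams, so close it with the behavior.
    assembler.close();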