From: Robert Varga Date: Thu, 1 Dec 2016 15:28:53 +0000 (+0100) Subject: BUG-5280: expose queue messages during reconnect X-Git-Tag: release/carbon~364 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b4d95acff78952020e9fbde4372d13b461fd7469 BUG-5280: expose queue messages during reconnect This patch reworks the internals of AbstractClientConnection to isolate the TransmitQueue from the rest of the logic, so we have proper split between implementation and interface exposed to the users. Furthermore the public interface is slightly reworked so the individual Proxies have access to the (locked) queue contents, which is needed to correctly replay transaction state within transaction chains. Change-Id: I1c08fa06eec4dd581e07002059c5142e6b0c1ed4 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index bade34cb2f..0366e7ace2 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -10,20 +10,16 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayDeque; -import java.util.Iterator; -import java.util.Map.Entry; import java.util.Optional; -import java.util.Queue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import 
java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; @@ -48,33 +44,30 @@ public abstract class AbstractClientConnection { @VisibleForTesting static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); - private final Queue inflight; - private final Queue pending; - + private final Lock lock = new ReentrantLock(); private final ClientActorContext context; + @GuardedBy("lock") + private final TransmitQueue queue; private final Long cookie; - private volatile ReconnectForwarder successor; private volatile RequestException poisoned; private long lastProgress; - private AbstractClientConnection(final ClientActorContext context, final Long cookie, - final Queue inflight, final Queue pending) { + // Do not allow subclassing outside of this package + AbstractClientConnection(final ClientActorContext context, final Long cookie, + final TransmitQueue queue) { this.context = Preconditions.checkNotNull(context); this.cookie = Preconditions.checkNotNull(cookie); - this.inflight = Preconditions.checkNotNull(inflight); - this.pending = Preconditions.checkNotNull(pending); + this.queue = Preconditions.checkNotNull(queue); this.lastProgress = readTime(); } - // Do not allow subclassing outside of this package - AbstractClientConnection(final ClientActorContext context, final Long cookie) { - this(context, cookie, new ArrayDeque<>(), new ArrayDeque<>(1)); - } - // Do not allow subclassing outside of this package AbstractClientConnection(final AbstractClientConnection oldConnection) { - 
this(oldConnection.context, oldConnection.cookie, oldConnection.inflight, oldConnection.pending); + this.context = oldConnection.context; + this.cookie = oldConnection.cookie; + this.lastProgress = oldConnection.lastProgress; + this.queue = new TransmitQueue.Halted(); } public final ClientActorContext context() { @@ -97,83 +90,52 @@ public abstract class AbstractClientConnection { * @param callback Callback to invoke */ public final void sendRequest(final Request request, final Consumer> callback) { - Preconditions.checkState(poisoned == null, "Connection %s has been poisoned", this); - - final ReconnectForwarder beforeQueue = successor; - final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime()); - if (beforeQueue != null) { - LOG.trace("Forwarding entry {} from {} to {}", entry, this, beforeQueue); - beforeQueue.forwardEntry(entry); - return; + final RequestException maybePoison = poisoned; + if (maybePoison != null) { + throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); } - enqueueEntry(entry); + final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime()); - final ReconnectForwarder afterQueue = successor; - if (afterQueue != null) { - synchronized (this) { - spliceToSuccessor(afterQueue); - } + lock.lock(); + try { + queue.enqueue(entry, entry.getEnqueuedTicks()); + } finally { + lock.unlock(); } } - public final synchronized void setForwarder(final ReconnectForwarder forwarder) { - Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); - successor = Preconditions.checkNotNull(forwarder); - LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); - spliceToSuccessor(forwarder); - } - public abstract Optional getBackendInfo(); - abstract ClientActorBehavior reconnectConnection(ClientActorBehavior current); - - abstract int remoteMaxMessages(); - - abstract Entry prepareForTransmit(Request req); - - @GuardedBy("this") - 
final void spliceToSuccessor(final ReconnectForwarder successor) { - ConnectionEntry entry = inflight.poll(); - while (entry != null) { - successor.forwardEntry(entry); - entry = inflight.poll(); - } - - entry = pending.poll(); - while (entry != null) { - successor.forwardEntry(entry); - entry = pending.poll(); - } + final Iterable startReplay() { + lock.lock(); + return queue.asIterable(); } - private long readTime() { - return context.ticker().read(); + @GuardedBy("lock") + final void finishReplay(final ReconnectForwarder forwarder) { + queue.setForwarder(forwarder, readTime()); + lock.unlock(); } - private void transmit(final ConnectionEntry entry) { - final Entry tuple = prepareForTransmit(entry.getRequest()); - final RequestEnvelope req = tuple.getValue(); + @GuardedBy("lock") + final void setForwarder(final ReconnectForwarder forwarder) { + queue.setForwarder(forwarder, readTime()); + } - // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread - // than the client actor thread, in which case the round-trip could be made faster than we can enqueue -- - // in which case the receive routine would not find the entry. 
- final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, req.getSessionId(), - req.getTxSequence(), readTime()); - inflight.add(txEntry); + @GuardedBy("lock") + abstract ClientActorBehavior reconnectConnection(ClientActorBehavior current); - final ActorRef actor = tuple.getKey(); - LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), req, actor); - actor.tell(req, ActorRef.noSender()); + private long readTime() { + return context.ticker().read(); } - final void enqueueEntry(final ConnectionEntry entry) { - if (inflight.size() < remoteMaxMessages()) { - transmit(entry); - LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); - } else { - LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - pending.add(entry); + final void enqueueEntry(final ConnectionEntry entry, final long now) { + lock.lock(); + try { + queue.enqueue(entry, now); + } finally { + lock.unlock(); } } @@ -196,27 +158,33 @@ public abstract class AbstractClientConnection { */ @VisibleForTesting final ClientActorBehavior runTimer(final ClientActorBehavior current) { - final long now = readTime(); - - if (!inflight.isEmpty() || !pending.isEmpty()) { - final long ticksSinceProgress = now - lastProgress; - if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { - LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, - TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); - - poison(new NoProgressException(ticksSinceProgress)); - current.removeConnection(this); - return current; + final Optional delay; + + lock.lock(); + try { + final long now = readTime(); + if (!queue.isEmpty()) { + final long ticksSinceProgress = now - lastProgress; + if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { + LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, + TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); + + lockedPoison(new 
NoProgressException(ticksSinceProgress)); + current.removeConnection(this); + return current; + } } - } - // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward. - // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state - // return convention. - final Optional delay = checkTimeout(now); - if (delay == null) { - // We have timed out. There is no point in scheduling a timer - return reconnectConnection(current); + // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward. + // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state + // return convention. + delay = lockedCheckTimeout(now); + if (delay == null) { + // We have timed out. There is no point in scheduling a timer + return reconnectConnection(current); + } + } finally { + lock.unlock(); } if (delay.isPresent()) { @@ -227,6 +195,16 @@ public abstract class AbstractClientConnection { return current; } + @VisibleForTesting + final Optional checkTimeout(final long now) { + lock.lock(); + try { + return lockedCheckTimeout(now); + } finally { + lock.unlock(); + } + } + /* * We are using tri-state return here to indicate one of three conditions: * - if there is no timeout to schedule, return Optional.empty() @@ -235,7 +213,9 @@ public abstract class AbstractClientConnection { */ @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", justification = "Returning null Optional is documented in the API contract.") - private Optional checkTimeout(final ConnectionEntry head, final long now) { + @GuardedBy("lock") + private Optional lockedCheckTimeout(final long now) { + final ConnectionEntry head = queue.peek(); if (head == null) { return Optional.empty(); } @@ -249,39 +229,19 @@ public abstract class AbstractClientConnection { return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS)); } - /* - * We are using tri-state 
return here to indicate one of three conditions: - * - if there is no timeout to schedule, return Optional.empty() - * - if there is a timeout to schedule, return a non-empty optional - * - if this connections has timed out, return null - */ - @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", - justification = "Returning null Optional is documented in the API contract.") - @VisibleForTesting - final Optional checkTimeout(final long now) { - final Optional xmit = checkTimeout(inflight.peek(), now); - if (xmit == null) { - return null; - } - final Optional pend = checkTimeout(pending.peek(), now); - if (pend == null) { - return null; - } - if (!xmit.isPresent()) { - return pend; - } - if (!pend.isPresent()) { - return xmit; + final void poison(final RequestException cause) { + lock.lock(); + try { + lockedPoison(cause); + } finally { + lock.unlock(); } - - return Optional.of(xmit.get().min(pend.get())); } - final void poison(final RequestException cause) { + @GuardedBy("lock") + private void lockedPoison(final RequestException cause) { poisoned = cause; - - poisonQueue(inflight, cause); - poisonQueue(pending, cause); + queue.poison(cause); } @VisibleForTesting @@ -290,97 +250,15 @@ public abstract class AbstractClientConnection { } final void receiveResponse(final ResponseEnvelope envelope) { - Optional maybeEntry = findMatchingEntry(inflight, envelope); - if (maybeEntry == null) { - LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope); - maybeEntry = findMatchingEntry(pending, envelope); - } - - if (maybeEntry == null || !maybeEntry.isPresent()) { - LOG.warn("No request matching {} found, ignoring response", envelope); - return; - } - - final TransmittedConnectionEntry entry = maybeEntry.get(); - LOG.debug("Completing {} with {}", entry, envelope); - entry.complete(envelope.getMessage()); - - // We have freed up a slot, try to transmit something - int toSend = remoteMaxMessages() - inflight.size(); - while (toSend > 0) { - 
final ConnectionEntry e = pending.poll(); - if (e == null) { - break; - } + final long now = readTime(); - LOG.debug("Transmitting entry {}", e); - transmit(e); - toSend--; + lock.lock(); + try { + queue.complete(envelope, now); + } finally { + lock.unlock(); } lastProgress = readTime(); } - - private static void poisonQueue(final Queue queue, final RequestException cause) { - for (ConnectionEntry e : queue) { - final Request request = e.getRequest(); - LOG.trace("Poisoning request {}", request, cause); - e.complete(request.toRequestFailure(cause)); - } - queue.clear(); - } - - /* - * We are using tri-state return here to indicate one of three conditions: - * - if a matching entry is found, return an Optional containing it - * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null - * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional - */ - @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", - justification = "Returning null Optional is documented in the API contract.") - private static Optional findMatchingEntry(final Queue queue, - final ResponseEnvelope envelope) { - // Try to find the request in a queue. 
Responses may legally come back in a different order, hence we need - // to use an iterator - final Iterator it = queue.iterator(); - while (it.hasNext()) { - final ConnectionEntry e = it.next(); - final Request request = e.getRequest(); - final Response response = envelope.getMessage(); - - // First check for matching target, or move to next entry - if (!request.getTarget().equals(response.getTarget())) { - continue; - } - - // Sanity-check logical sequence, ignore any out-of-order messages - if (request.getSequence() != response.getSequence()) { - LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); - return Optional.empty(); - } - - // Check if the entry has (ever) been transmitted - if (!(e instanceof TransmittedConnectionEntry)) { - return Optional.empty(); - } - - final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e; - - // Now check session match - if (envelope.getSessionId() != te.getSessionId()) { - LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope); - return Optional.empty(); - } - if (envelope.getTxSequence() != te.getTxSequence()) { - LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope); - return Optional.empty(); - } - - LOG.debug("Completing request {} with {}", request, envelope); - it.remove(); - return Optional.of(te); - } - - return null; - } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java index 8646bfcba5..15da294ec9 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java +++ 
b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java @@ -20,17 +20,15 @@ import java.util.Optional; */ abstract class AbstractReceivingClientConnection extends AbstractClientConnection { private final T backend; - private long nextTxSequence; AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) { - super(context, cookie); + super(context, cookie, new TransmitQueue.Transmitting(backend)); this.backend = Preconditions.checkNotNull(backend); } AbstractReceivingClientConnection(final AbstractReceivingClientConnection oldConnection) { super(oldConnection); this.backend = oldConnection.backend; - this.nextTxSequence = oldConnection.nextTxSequence; } @Override @@ -41,8 +39,4 @@ abstract class AbstractReceivingClientConnection extends final T backend() { return backend; } - - final long nextTxSequence() { - return nextTxSequence++; - } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index ddb7bcdad1..45580e92fd 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; @@ -21,6 +22,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import 
org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; @@ -36,6 +38,20 @@ import org.slf4j.LoggerFactory; @Beta public abstract class ClientActorBehavior extends RecoveredClientActorBehavior implements Identifiable { + /** + * Connection reconnect cohort, driven by this class. + */ + @FunctionalInterface + protected interface ConnectionConnectCohort { + /** + * Finish the connection by replaying previous messages onto the new connection. + * + * @param enqueuedEntries Previously-enqueued entries + * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. + */ + @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable enqueuedEntries); + } + private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); /** @@ -185,28 +201,42 @@ public abstract class ClientActorBehavior extends } /** - * Callback invoked when a new connection has been established. + * Callback invoked when a new connection has been established. Implementations are expected perform preparatory + * tasks before the previous connection is frozen. * - * @param conn Old connection - * @param backend New backend - * @return Newly-connected connection. + * @param newConn New connection + * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up. 
*/ @GuardedBy("connectionsLock") - protected abstract @Nonnull ConnectedClientConnection connectionUp( - final @Nonnull AbstractClientConnection conn, final @Nonnull T backend); + @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection newConn); private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, final T backend, final Throwable failure) { if (failure != null) { LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); + conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure)); return; } LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend); final long stamp = connectionsLock.writeLock(); try { - // Bring the connection up - final ConnectedClientConnection newConn = connectionUp(conn, backend); + // Create a new connected connection + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), + conn.cookie(), backend); + LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + + // Start reconnecting without the old connection lock held + final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); + + // Lock the old connection and get a reference to its entries + final Iterable replayIterable = conn.startReplay(); + + // Finish the connection attempt + final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); + + // Install the forwarder, unlocking the old connection + conn.finishReplay(forwarder); // Make sure new lookups pick up the new connection connections.replace(shard, conn, newConn); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java index 9dad825e7e..9198d8fe12 
100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -7,18 +7,13 @@ */ package org.opendaylight.controller.cluster.access.client; -import akka.actor.ActorRef; import com.google.common.annotations.Beta; -import java.util.AbstractMap.SimpleImmutableEntry; -import java.util.Map.Entry; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; @Beta @NotThreadSafe public final class ConnectedClientConnection extends AbstractReceivingClientConnection { - public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { + ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { super(context, cookie, backend); } @@ -29,15 +24,4 @@ public final class ConnectedClientConnection extends Abst current.reconnectConnection(this, next); return current; } - - @Override - int remoteMaxMessages() { - return backend().getMaxMessages(); - } - - @Override - Entry prepareForTransmit(final Request req) { - return new SimpleImmutableEntry<>(backend().getActor(), new RequestEnvelope( - req.toVersion(backend().getVersion()), backend().getSessionId(), nextTxSequence())); - } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java index e28f9b35ed..64867e1c0e 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java +++ 
b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java @@ -7,18 +7,14 @@ */ package org.opendaylight.controller.cluster.access.client; -import akka.actor.ActorRef; import com.google.common.annotations.Beta; -import java.util.Map.Entry; import java.util.Optional; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; @Beta public final class ConnectingClientConnection extends AbstractClientConnection { // Initial state, never instantiated externally ConnectingClientConnection(final ClientActorContext context, final Long cookie) { - super(context, cookie); + super(context, cookie, new TransmitQueue.Halted()); } @Override @@ -30,15 +26,4 @@ public final class ConnectingClientConnection extends Abs ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { throw new UnsupportedOperationException("Attempted to reconnect a connecting connection"); } - - @Override - Entry prepareForTransmit(final Request req) { - // This is guarded by remoteMaxMessages() == 0 - throw new UnsupportedOperationException("Attempted to transmit on a connecting connection"); - } - - @Override - int remoteMaxMessages() { - return 0; - } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java index 37dc2f1c4d..25e5d6edfe 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java @@ -33,7 +33,7 @@ public abstract class ReconnectForwarder { successor.sendRequest(request, callback); } - protected 
abstract void forwardEntry(ConnectionEntry entry); + protected abstract void forwardEntry(ConnectionEntry entry, long now); final AbstractReceivingClientConnection successor() { return successor; diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java index e15a949600..a67b7ed3be 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java @@ -7,10 +7,6 @@ */ package org.opendaylight.controller.cluster.access.client; -import akka.actor.ActorRef; -import java.util.Map.Entry; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,15 +30,4 @@ public final class ReconnectingClientConnection extends A LOG.debug("Skipping reconnect of already-reconnecting connection {}", this); return current; } - - @Override - Entry prepareForTransmit(final Request req) { - // This is guarded by remoteMaxMessages() == 0 - throw new UnsupportedOperationException("Attempted to transmit on a reconnecting connection"); - } - - @Override - int remoteMaxMessages() { - return 0; - } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java index a8ab7c4e23..d97ecd93f3 100644 --- 
a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java @@ -14,7 +14,7 @@ final class SimpleReconnectForwarder extends ReconnectForwarder { } @Override - protected void forwardEntry(final ConnectionEntry entry) { - successor().enqueueEntry(entry); + protected void forwardEntry(final ConnectionEntry entry, final long now) { + successor().enqueueEntry(entry, now); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java new file mode 100644 index 0000000000..8690236aa6 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.Iterables; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Optional; +import java.util.Queue; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This queue is internally split into two queues for performance reasons, both memory efficiency and copy + * operations. + * + * <p>

+ * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily + * complete in the order in which they were sent -- hence the head of the queue does not increase linearly, + * but can involve spurious removals of non-head entries. + * + * <p>

+ * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very + * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight + * entries to be an order of magnitude lower than the number of enqueued entries, hence the split. + * + * <p>

+ * Note that in the transient case of a reconnect, when the backend gives us a lower number of maximum in-flight entries + * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional + * scenario, hence we consciously ignore it to keep the design relatively simple. + * + * <p>

+ * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}. + * + * @author Robert Varga + */ +@NotThreadSafe +abstract class TransmitQueue { + static final class Halted extends TransmitQueue { + @Override + int canTransmitCount(final int inflightSize) { + return 0; + } + + @Override + TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) { + throw new UnsupportedOperationException("Attempted to transmit on a halted queue"); + } + } + + static final class Transmitting extends TransmitQueue { + private final BackendInfo backend; + private long nextTxSequence; + + Transmitting(final BackendInfo backend) { + this.backend = Preconditions.checkNotNull(backend); + } + + @Override + int canTransmitCount(final int inflightSize) { + return backend.getMaxMessages() - inflightSize; + } + + @Override + TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) { + final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()), + backend.getSessionId(), nextTxSequence++); + + final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(), + env.getTxSequence(), now); + backend.getActor().tell(env, ActorRef.noSender()); + return ret; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class); + + private final ArrayDeque inflight = new ArrayDeque<>(); + private final ArrayDeque pending = new ArrayDeque<>(); + + private ReconnectForwarder successor; + + final Iterable asIterable() { + return Iterables.concat(inflight, pending); + } + + private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks, + final long execNanos) { + // TODO: record + } + + final void complete(final ResponseEnvelope envelope, final long now) { + Optional maybeEntry = findMatchingEntry(inflight, envelope); + if (maybeEntry == null) { + LOG.debug("Request for {} not found in 
inflight queue, checking pending queue", envelope); + maybeEntry = findMatchingEntry(pending, envelope); + } + + if (maybeEntry == null || !maybeEntry.isPresent()) { + LOG.warn("No request matching {} found, ignoring response", envelope); + return; + } + + final TransmittedConnectionEntry entry = maybeEntry.get(); + LOG.debug("Completing {} with {}", entry, envelope); + entry.complete(envelope.getMessage()); + + recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); + + // We have freed up a slot, try to transmit something + int toSend = canTransmitCount(inflight.size()); + while (toSend > 0) { + final ConnectionEntry e = pending.poll(); + if (e == null) { + break; + } + + LOG.debug("Transmitting entry {}", e); + transmit(e, now); + toSend--; + } + } + + final void enqueue(final ConnectionEntry entry, final long now) { + if (successor != null) { + successor.forwardEntry(entry, now); + return; + } + + if (canTransmitCount(inflight.size()) <= 0) { + LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest()); + pending.add(entry); + return; + } + + // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine. + // This needs to be revisited if the external guards are lowered. 
+ inflight.offer(transmit(entry, now)); + LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); + } + + abstract int canTransmitCount(int inflightSize); + + abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now); + + final boolean isEmpty() { + return inflight.isEmpty() && pending.isEmpty(); + } + + final ConnectionEntry peek() { + final ConnectionEntry ret = inflight.peek(); + if (ret != null) { + return ret; + } + + return pending.peek(); + } + + final void poison(final RequestException cause) { + poisonQueue(inflight, cause); + poisonQueue(pending, cause); + } + + final void setForwarder(final ReconnectForwarder forwarder, final long now) { + Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); + successor = Preconditions.checkNotNull(forwarder); + LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); + + ConnectionEntry entry = inflight.poll(); + while (entry != null) { + successor.forwardEntry(entry, now); + entry = inflight.poll(); + } + + entry = pending.poll(); + while (entry != null) { + successor.forwardEntry(entry, now); + entry = pending.poll(); + } + } + + /* + * We are using tri-state return here to indicate one of three conditions: + * - if a matching entry is found, return an Optional containing it + * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null + * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional + */ + @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", + justification = "Returning null Optional is documented in the API contract.") + private static Optional findMatchingEntry(final Queue queue, + final ResponseEnvelope envelope) { + // Try to find the request in a queue. 
Responses may legally come back in a different order, hence we need + // to use an iterator + final Iterator it = queue.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); + final Request request = e.getRequest(); + final Response response = envelope.getMessage(); + + // First check for matching target, or move to next entry + if (!request.getTarget().equals(response.getTarget())) { + continue; + } + + // Sanity-check logical sequence, ignore any out-of-order messages + if (request.getSequence() != response.getSequence()) { + LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); + return Optional.empty(); + } + + // Check if the entry has (ever) been transmitted + if (!(e instanceof TransmittedConnectionEntry)) { + return Optional.empty(); + } + + final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e; + + // Now check session match + if (envelope.getSessionId() != te.getSessionId()) { + LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope); + return Optional.empty(); + } + if (envelope.getTxSequence() != te.getTxSequence()) { + LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope); + return Optional.empty(); + } + + LOG.debug("Completing request {} with {}", request, envelope); + it.remove(); + return Optional.of(te); + } + + return null; + } + + private static void poisonQueue(final Queue queue, final RequestException cause) { + for (ConnectionEntry e : queue) { + final Request request = e.getRequest(); + LOG.trace("Poisoning request {}", request, cause); + e.complete(request.toRequestFailure(cause)); + } + queue.clear(); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 8ab58e410a..519763ac02 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.client.InversibleLockException; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -229,8 +230,8 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } @Override - void replaySuccessfulRequests() { - proxy.replaySuccessfulRequests(); + void replaySuccessfulRequests(final Iterable previousEntries) { + proxy.replaySuccessfulRequests(previousEntries); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java index 5a34b3b77e..3dc4dbf146 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java @@ -16,8 +16,6 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; import org.opendaylight.controller.cluster.access.client.ClientActorContext; @@ -122,44 +120,35 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior connectionUp( - final AbstractClientConnection conn, final ShardBackendInfo backend) { - - // Step 0: create a new connected connection - final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), - conn.cookie(), backend); - - LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn); - + protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection newConn) { + // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no + // further TransactionProxies can be created and we can safely traverse maps without risking + // missing an entry final Collection cohorts = new ArrayList<>(); - try { - // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no - // further TransactionProxies can be created and we can safely traverse maps without risking - // missing an entry - startReconnect(singleHistory, newConn, cohorts); - for (ClientLocalHistory h : histories.values()) { - startReconnect(h, newConn, cohorts); - } - - // Step 2: Collect previous successful requests from the cohorts. 
We do not want to expose - // the non-throttling interface to the connection, hence we use a wrapper consumer - for (HistoryReconnectCohort c : cohorts) { - c.replaySuccessfulRequests(); - } - - // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding - // requests will be immediately sent to it and requests being sent concurrently will get forwarded - // once they hit the new connection. - conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts)); - } finally { - // Step 4: Complete switchover of the connection. The cohorts can resume normal operations. - for (HistoryReconnectCohort c : cohorts) { - c.close(); - } + startReconnect(singleHistory, newConn, cohorts); + for (ClientLocalHistory h : histories.values()) { + startReconnect(h, newConn, cohorts); } - return newConn; + return previousEntries -> { + try { + // Step 2: Collect previous successful requests from the cohorts. We do not want to expose + // the non-throttling interface to the connection, hence we use a wrapper consumer + for (HistoryReconnectCohort c : cohorts) { + c.replaySuccessfulRequests(previousEntries); + } + + // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding + // requests will be immediately sent to it and requests being sent concurrently will get + // forwarded once they hit the new connection. + return BouncingReconnectForwarder.forCohorts(newConn, cohorts); + } finally { + // Step 4: Complete switchover of the connection. The cohorts can resume normal operations. 
+ for (HistoryReconnectCohort c : cohorts) { + c.close(); + } + } + }; } private static void startReconnect(final AbstractClientHistory history, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 7f5bec1ff6..36f9a4bccb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -17,12 +17,16 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayDeque; import java.util.Deque; +import java.util.Iterator; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -31,7 +35,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; 
import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -73,26 +77,81 @@ abstract class AbstractProxyTransaction implements Identifiable SEALED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(AbstractProxyTransaction.class, "sealed"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(AbstractProxyTransaction.class, State.class, "state"); + private static final State OPEN = new State("open"); + private static final State SEALED = new State("sealed"); + private static final State FLUSHED = new State("flushed"); + + // Touched from client actor thread only private final Deque successfulRequests = new ArrayDeque<>(); private final ProxyHistory parent; + // Accessed from user thread only, which may not access this object concurrently + private long sequence; + /* * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards * the backend -- which may include a successor. 
@@ -105,17 +164,20 @@ abstract class AbstractProxyTransaction implements Identifiable req) { @@ -268,15 +317,11 @@ abstract class AbstractProxyTransaction implements Identifiable directCommit() { - final CountDownLatch localLatch; + checkSealed(); + // Precludes startReconnect() from interfering with the fast path synchronized (this) { - final SealState local = checkSealed(); - - // Fast path: no successor asserted - if (successor == null) { - Verify.verify(local == SealState.SEALED); - + if (STATE_UPDATER.compareAndSet(this, SEALED, FLUSHED)) { final SettableFuture ret = SettableFuture.create(); sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> { if (t instanceof TransactionCommitSuccess) { @@ -292,44 +337,22 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { - final CountDownLatch localLatch; + checkSealed(); + // Precludes startReconnect() from interfering with the fast path synchronized (this) { - final SealState local = checkSealed(); - - // Fast path: no successor asserted - if (successor == null) { - Verify.verify(local == SealState.SEALED); - + if (STATE_UPDATER.compareAndSet(this, SEALED, FLUSHED)) { final TransactionRequest req = Verify.verifyNotNull(commitRequest(true)); + sendRequest(req, t -> { if (t instanceof TransactionCanCommitSuccess) { ret.voteYes(); @@ -343,31 +366,16 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { @@ -389,7 +397,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { + final void doCommit(final VotingFuture ret) { checkSealed(); sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> { @@ -406,13 +414,33 @@ abstract class AbstractProxyTransaction implements Identifiable enqueuedEntries) { + final SuccessorState local = getSuccessorState(); + local.setSuccessor(successor); + // Replay successful requests first for (Object obj : successfulRequests) { if (obj instanceof TransactionRequest) { - 
LOG.debug("Forwarding request {} to successor {}", obj, successor); + LOG.debug("Forwarding successful request {} to successor {}", obj, successor); successor.handleForwardedRemoteRequest((TransactionRequest) obj, null); } else { Verify.verify(obj instanceof IncrementSequence); @@ -422,28 +450,40 @@ abstract class AbstractProxyTransaction implements Identifiable it = enqueuedEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); + final Request req = e.getRequest(); + + if (getIdentifier().equals(req.getTarget())) { + Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req); + LOG.debug("Forwarding queued request{} to successor {}", req, successor); + successor.handleForwardedRemoteRequest((TransactionRequest) req, e.getCallback()); + it.remove(); + } + } + /* - * Before releasing the lock we need to make sure that a call to seal() blocks until we have completed - * finishConnect(). + * Check the state at which we have started the reconnect attempt. State transitions triggered while we were + * reconnecting have been forced to slow paths, which will be unlocked once we unblock the state latch + * at the end of this method. */ - successorLatch = new CountDownLatch(1); - } - - final synchronized void finishReconnect() { - Preconditions.checkState(successorLatch != null); - - if (sealed == SealState.SEALED) { - /* - * If this proxy is in the 'sealed, have not sent canCommit' state. If so, we need to forward current - * leftover state to the successor now. 
- */ + final State prevState = local.getPrevState(); + if (SEALED.equals(prevState)) { + LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor); flushState(successor); successor.seal(); - sealed = SealState.FLUSHED; } + } + + // Called with the connection locked + final void finishReconnect() { + final SuccessorState local = getSuccessorState(); + LOG.debug("Finishing reconnect of proxy {}", this); - // All done, release the latch, unblocking seal() and canCommit() - successorLatch.countDown(); + // All done, release the latch, unblocking seal() and canCommit() slow paths + local.finish(); } /** @@ -452,11 +492,9 @@ abstract class AbstractProxyTransaction implements Identifiable request, - final Consumer> callback) { - Preconditions.checkState(successor != null, "%s does not have a successor set", this); + final void replayRequest(final TransactionRequest request, final Consumer> callback) { + final AbstractProxyTransaction successor = getSuccessorState().getSuccessor(); if (successor instanceof LocalProxyTransaction) { forwardToLocal((LocalProxyTransaction)successor, request, callback); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java index 3136023204..3fe6a09bf6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java @@ -57,7 +57,7 @@ final class BouncingReconnectForwarder extends ReconnectForwarder { @Override - protected void forwardEntry(final ConnectionEntry entry) { + protected void 
forwardEntry(final ConnectionEntry entry, final long now) { final Request request = entry.getRequest(); final LocalHistoryIdentifier historyId; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java index ab961f1045..7e6ff671a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; + /** * Interface exposed by {@link AbstractClientHistory} to {@link DistributedDataStoreClientBehavior} for the sole * purpose of performing a connection switchover. 
@@ -16,7 +18,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; abstract class HistoryReconnectCohort implements AutoCloseable { abstract ProxyReconnectCohort getProxy(); - abstract void replaySuccessfulRequests(); + abstract void replaySuccessfulRequests(Iterable previousEntries); @Override public abstract void close(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 07fcbebad0..b3b604b7f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -22,6 +22,8 @@ import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -195,12 +197,33 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replaySuccessfulRequests() { + void replaySuccessfulRequests(final Iterable previousEntries) { + // First look for our Create message + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + 
Verify.verify(req instanceof LocalHistoryRequest); + if (req instanceof CreateLocalHistoryRequest) { + successor.connection.sendRequest(req, e.getCallback()); + break; + } + } + } + for (AbstractProxyTransaction t : proxies.values()) { LOG.debug("{} creating successor transaction proxy for {}", identifier, t); final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.startReconnect(newProxy); + t.replayMessages(newProxy, previousEntries); + } + + // Now look for any finalizing messages + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + Verify.verify(req instanceof LocalHistoryRequest); + successor.connection.sendRequest(req, e.getCallback()); + } } } @@ -347,6 +370,11 @@ abstract class ProxyHistory implements Identifiable { successor = createSuccessor(newConnection); LOG.debug("History {} instantiated successor {}", this, successor); + + for (AbstractProxyTransaction t : proxies.values()) { + t.startReconnect(); + } + return new ReconnectCohort(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java index 7c37d9d4a0..2f97f901ff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import java.util.function.BiConsumer; import java.util.function.Consumer; +import 
org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestException; @@ -17,7 +18,7 @@ import org.opendaylight.yangtools.concepts.Identifiable; abstract class ProxyReconnectCohort implements Identifiable { - abstract void replaySuccessfulRequests(); + abstract void replaySuccessfulRequests(Iterable previousEntries); abstract ProxyHistory finishReconnect();