BUG-5280: expose queue messages during reconnect 80/48980/11
authorRobert Varga <rovarga@cisco.com>
Thu, 1 Dec 2016 15:28:53 +0000 (16:28 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 12 Dec 2016 16:44:35 +0000 (17:44 +0100)
This patch reworks the internals of AbstractClientConnection
to isolate the TransmitQueue from the rest of the logic,
so we have proper split between implementation and interface
exposed to the users.

Furthermore the public interface is slightly reworked so the
individual Proxies have access to the (locked) queue contents,
which is needed to correctly replay transaction state within
transaction chains.

Change-Id: I1c08fa06eec4dd581e07002059c5142e6b0c1ed4
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java

index bade34c..0366e7a 100644 (file)
@@ -10,20 +10,16 @@ package org.opendaylight.controller.cluster.access.client;
 import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
@@ -48,33 +44,30 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @VisibleForTesting
     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
 
-    private final Queue<TransmittedConnectionEntry> inflight;
-    private final Queue<ConnectionEntry> pending;
-
+    private final Lock lock = new ReentrantLock();
     private final ClientActorContext context;
+    @GuardedBy("lock")
+    private final TransmitQueue queue;
     private final Long cookie;
 
-    private volatile ReconnectForwarder successor;
     private volatile RequestException poisoned;
     private long lastProgress;
 
-    private AbstractClientConnection(final ClientActorContext context, final Long cookie,
-            final Queue<TransmittedConnectionEntry> inflight, final Queue<ConnectionEntry> pending) {
+    // Do not allow subclassing outside of this package
+    AbstractClientConnection(final ClientActorContext context, final Long cookie,
+            final TransmitQueue queue) {
         this.context = Preconditions.checkNotNull(context);
         this.cookie = Preconditions.checkNotNull(cookie);
-        this.inflight = Preconditions.checkNotNull(inflight);
-        this.pending = Preconditions.checkNotNull(pending);
+        this.queue = Preconditions.checkNotNull(queue);
         this.lastProgress = readTime();
     }
 
-    // Do not allow subclassing outside of this package
-    AbstractClientConnection(final ClientActorContext context, final Long cookie) {
-        this(context, cookie, new ArrayDeque<>(), new ArrayDeque<>(1));
-    }
-
     // Do not allow subclassing outside of this package
     AbstractClientConnection(final AbstractClientConnection<T> oldConnection) {
-        this(oldConnection.context, oldConnection.cookie, oldConnection.inflight, oldConnection.pending);
+        this.context = oldConnection.context;
+        this.cookie = oldConnection.cookie;
+        this.lastProgress = oldConnection.lastProgress;
+        this.queue = new TransmitQueue.Halted();
     }
 
     public final ClientActorContext context() {
@@ -97,83 +90,52 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      * @param callback Callback to invoke
      */
     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
-        Preconditions.checkState(poisoned == null, "Connection %s has been poisoned", this);
-
-        final ReconnectForwarder beforeQueue = successor;
-        final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
-        if (beforeQueue != null) {
-            LOG.trace("Forwarding entry {} from {} to {}", entry, this, beforeQueue);
-            beforeQueue.forwardEntry(entry);
-            return;
+        final RequestException maybePoison = poisoned;
+        if (maybePoison != null) {
+            throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
         }
 
-        enqueueEntry(entry);
+        final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
 
-        final ReconnectForwarder afterQueue = successor;
-        if (afterQueue != null) {
-            synchronized (this) {
-                spliceToSuccessor(afterQueue);
-            }
+        lock.lock();
+        try {
+            queue.enqueue(entry, entry.getEnqueuedTicks());
+        } finally {
+            lock.unlock();
         }
     }
 
-    public final synchronized void setForwarder(final ReconnectForwarder forwarder) {
-        Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
-        successor = Preconditions.checkNotNull(forwarder);
-        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
-        spliceToSuccessor(forwarder);
-    }
-
     public abstract Optional<T> getBackendInfo();
 
-    abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
-
-    abstract int remoteMaxMessages();
-
-    abstract Entry<ActorRef, RequestEnvelope> prepareForTransmit(Request<?, ?> req);
-
-    @GuardedBy("this")
-    final void spliceToSuccessor(final ReconnectForwarder successor) {
-        ConnectionEntry entry = inflight.poll();
-        while (entry != null) {
-            successor.forwardEntry(entry);
-            entry = inflight.poll();
-        }
-
-        entry = pending.poll();
-        while (entry != null) {
-            successor.forwardEntry(entry);
-            entry = pending.poll();
-        }
+    final Iterable<ConnectionEntry> startReplay() {
+        lock.lock();
+        return queue.asIterable();
     }
 
-    private long readTime() {
-        return context.ticker().read();
+    @GuardedBy("lock")
+    final void finishReplay(final ReconnectForwarder forwarder) {
+        queue.setForwarder(forwarder, readTime());
+        lock.unlock();
     }
 
-    private void transmit(final ConnectionEntry entry) {
-        final Entry<ActorRef, RequestEnvelope> tuple = prepareForTransmit(entry.getRequest());
-        final RequestEnvelope req = tuple.getValue();
+    @GuardedBy("lock")
+    final void setForwarder(final ReconnectForwarder forwarder) {
+        queue.setForwarder(forwarder, readTime());
+    }
 
-        // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread
-        // than the client actor thread, in which case the round-trip could be made faster than we can enqueue --
-        // in which case the receive routine would not find the entry.
-        final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, req.getSessionId(),
-            req.getTxSequence(), readTime());
-        inflight.add(txEntry);
+    @GuardedBy("lock")
+    abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
 
-        final ActorRef actor = tuple.getKey();
-        LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), req, actor);
-        actor.tell(req, ActorRef.noSender());
+    private long readTime() {
+        return context.ticker().read();
     }
 
-    final void enqueueEntry(final ConnectionEntry entry) {
-        if (inflight.size() < remoteMaxMessages()) {
-            transmit(entry);
-            LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this);
-        } else {
-            LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest());
-            pending.add(entry);
+    final void enqueueEntry(final ConnectionEntry entry, final long now) {
+        lock.lock();
+        try {
+            queue.enqueue(entry, now);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -196,27 +158,33 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     @VisibleForTesting
     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
-        final long now = readTime();
-
-        if (!inflight.isEmpty() || !pending.isEmpty()) {
-            final long ticksSinceProgress = now - lastProgress;
-            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
-                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
-
-                poison(new NoProgressException(ticksSinceProgress));
-                current.removeConnection(this);
-                return current;
+        final Optional<FiniteDuration> delay;
+
+        lock.lock();
+        try {
+            final long now = readTime();
+            if (!queue.isEmpty()) {
+                final long ticksSinceProgress = now - lastProgress;
+                if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+                    LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                        TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+
+                    lockedPoison(new NoProgressException(ticksSinceProgress));
+                    current.removeConnection(this);
+                    return current;
+                }
             }
-        }
 
-        // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
-        // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
-        // return convention.
-        final Optional<FiniteDuration> delay = checkTimeout(now);
-        if (delay == null) {
-            // We have timed out. There is no point in scheduling a timer
-            return reconnectConnection(current);
+            // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
+            // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
+            // return convention.
+            delay = lockedCheckTimeout(now);
+            if (delay == null) {
+                // We have timed out. There is no point in scheduling a timer
+                return reconnectConnection(current);
+            }
+        } finally {
+            lock.unlock();
         }
 
         if (delay.isPresent()) {
@@ -227,6 +195,16 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return current;
     }
 
+    @VisibleForTesting
+    final Optional<FiniteDuration> checkTimeout(final long now) {
+        lock.lock();
+        try {
+            return lockedCheckTimeout(now);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /*
      * We are using tri-state return here to indicate one of three conditions:
      * - if there is no timeout to schedule, return Optional.empty()
@@ -235,7 +213,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
             justification = "Returning null Optional is documented in the API contract.")
-    private Optional<FiniteDuration> checkTimeout(final ConnectionEntry head, final long now) {
+    @GuardedBy("lock")
+    private Optional<FiniteDuration> lockedCheckTimeout(final long now) {
+        final ConnectionEntry head = queue.peek();
         if (head == null) {
             return Optional.empty();
         }
@@ -249,39 +229,19 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS));
     }
 
-    /*
-     * We are using tri-state return here to indicate one of three conditions:
-     * - if there is no timeout to schedule, return Optional.empty()
-     * - if there is a timeout to schedule, return a non-empty optional
-     * - if this connections has timed out, return null
-     */
-    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
-            justification = "Returning null Optional is documented in the API contract.")
-    @VisibleForTesting
-    final Optional<FiniteDuration> checkTimeout(final long now) {
-        final Optional<FiniteDuration> xmit = checkTimeout(inflight.peek(), now);
-        if (xmit == null) {
-            return null;
-        }
-        final Optional<FiniteDuration> pend = checkTimeout(pending.peek(), now);
-        if (pend == null) {
-            return null;
-        }
-        if (!xmit.isPresent()) {
-            return pend;
-        }
-        if (!pend.isPresent()) {
-            return xmit;
+    final void poison(final RequestException cause) {
+        lock.lock();
+        try {
+            lockedPoison(cause);
+        } finally {
+            lock.unlock();
         }
-
-        return Optional.of(xmit.get().min(pend.get()));
     }
 
-    final void poison(final RequestException cause) {
+    @GuardedBy("lock")
+    private void lockedPoison(final RequestException cause) {
         poisoned = cause;
-
-        poisonQueue(inflight, cause);
-        poisonQueue(pending, cause);
+        queue.poison(cause);
     }
 
     @VisibleForTesting
@@ -290,97 +250,15 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void receiveResponse(final ResponseEnvelope<?> envelope) {
-        Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
-        if (maybeEntry == null) {
-            LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
-            maybeEntry = findMatchingEntry(pending, envelope);
-        }
-
-        if (maybeEntry == null || !maybeEntry.isPresent()) {
-            LOG.warn("No request matching {} found, ignoring response", envelope);
-            return;
-        }
-
-        final TransmittedConnectionEntry entry = maybeEntry.get();
-        LOG.debug("Completing {} with {}", entry, envelope);
-        entry.complete(envelope.getMessage());
-
-        // We have freed up a slot, try to transmit something
-        int toSend = remoteMaxMessages() - inflight.size();
-        while (toSend > 0) {
-            final ConnectionEntry e = pending.poll();
-            if (e == null) {
-                break;
-            }
+        final long now = readTime();
 
-            LOG.debug("Transmitting entry {}", e);
-            transmit(e);
-            toSend--;
+        lock.lock();
+        try {
+            queue.complete(envelope, now);
+        } finally {
+            lock.unlock();
         }
 
         lastProgress = readTime();
     }
-
-    private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
-        for (ConnectionEntry e : queue) {
-            final Request<?, ?> request = e.getRequest();
-            LOG.trace("Poisoning request {}", request, cause);
-            e.complete(request.toRequestFailure(cause));
-        }
-        queue.clear();
-    }
-
-    /*
-     * We are using tri-state return here to indicate one of three conditions:
-     * - if a matching entry is found, return an Optional containing it
-     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
-     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
-     */
-    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
-            justification = "Returning null Optional is documented in the API contract.")
-    private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
-            final ResponseEnvelope<?> envelope) {
-        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
-        // to use an iterator
-        final Iterator<? extends ConnectionEntry> it = queue.iterator();
-        while (it.hasNext()) {
-            final ConnectionEntry e = it.next();
-            final Request<?, ?> request = e.getRequest();
-            final Response<?, ?> response = envelope.getMessage();
-
-            // First check for matching target, or move to next entry
-            if (!request.getTarget().equals(response.getTarget())) {
-                continue;
-            }
-
-            // Sanity-check logical sequence, ignore any out-of-order messages
-            if (request.getSequence() != response.getSequence()) {
-                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
-                return Optional.empty();
-            }
-
-            // Check if the entry has (ever) been transmitted
-            if (!(e instanceof TransmittedConnectionEntry)) {
-                return Optional.empty();
-            }
-
-            final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
-
-            // Now check session match
-            if (envelope.getSessionId() != te.getSessionId()) {
-                LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
-                return Optional.empty();
-            }
-            if (envelope.getTxSequence() != te.getTxSequence()) {
-                LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
-                return Optional.empty();
-            }
-
-            LOG.debug("Completing request {} with {}", request, envelope);
-            it.remove();
-            return Optional.of(te);
-        }
-
-        return null;
-    }
 }
index 8646bfc..15da294 100644 (file)
@@ -20,17 +20,15 @@ import java.util.Optional;
  */
 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
     private final T backend;
-    private long nextTxSequence;
 
     AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
-        super(context, cookie);
+        super(context, cookie, new TransmitQueue.Transmitting(backend));
         this.backend = Preconditions.checkNotNull(backend);
     }
 
     AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
         super(oldConnection);
         this.backend = oldConnection.backend;
-        this.nextTxSequence = oldConnection.nextTxSequence;
     }
 
     @Override
@@ -41,8 +39,4 @@ abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends
     final T backend() {
         return backend;
     }
-
-    final long nextTxSequence() {
-        return nextTxSequence++;
-    }
 }
index ddb7bcd..45580e9 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
@@ -21,6 +22,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -36,6 +38,20 @@ import org.slf4j.LoggerFactory;
 @Beta
 public abstract class ClientActorBehavior<T extends BackendInfo> extends
         RecoveredClientActorBehavior<ClientActorContext> implements Identifiable<ClientIdentifier> {
+    /**
+     * Connection reconnect cohort, driven by this class.
+     */
+    @FunctionalInterface
+    protected interface ConnectionConnectCohort {
+        /**
+         * Finish the connection by replaying previous messages onto the new connection.
+         *
+         * @param enqueuedEntries Previously-enqueued entries
+         * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
+         */
+        @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable<ConnectionEntry> enqueuedEntries);
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
 
     /**
@@ -185,28 +201,42 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     }
 
     /**
-     * Callback invoked when a new connection has been established.
+     * Callback invoked when a new connection has been established. Implementations are expected perform preparatory
+     * tasks before the previous connection is frozen.
      *
-     * @param conn Old connection
-     * @param backend New backend
-     * @return Newly-connected connection.
+     * @param newConn New connection
+     * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up.
      */
     @GuardedBy("connectionsLock")
-    protected abstract @Nonnull ConnectedClientConnection<T> connectionUp(
-            final @Nonnull AbstractClientConnection<T> conn, final @Nonnull T backend);
+    @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection<T> newConn);
 
     private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
             final T backend, final Throwable failure) {
         if (failure != null) {
             LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
+            conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure));
             return;
         }
 
         LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend);
         final long stamp = connectionsLock.writeLock();
         try {
-            // Bring the connection up
-            final ConnectedClientConnection<T> newConn = connectionUp(conn, backend);
+            // Create a new connected connection
+            final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
+                    conn.cookie(), backend);
+            LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
+
+            // Start reconnecting without the old connection lock held
+            final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
+
+            // Lock the old connection and get a reference to its entries
+            final Iterable<ConnectionEntry> replayIterable = conn.startReplay();
+
+            // Finish the connection attempt
+            final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
+
+            // Install the forwarder, unlocking the old connection
+            conn.finishReplay(forwarder);
 
             // Make sure new lookups pick up the new connection
             connections.replace(shard, conn, newConn);
index 9dad825..9198d8f 100644 (file)
@@ -7,18 +7,13 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
-import akka.actor.ActorRef;
 import com.google.common.annotations.Beta;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Map.Entry;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 
 @Beta
 @NotThreadSafe
 public final class ConnectedClientConnection<T extends BackendInfo> extends AbstractReceivingClientConnection<T> {
-    public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
+    ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
         super(context, cookie, backend);
     }
 
@@ -29,15 +24,4 @@ public final class ConnectedClientConnection<T extends BackendInfo> extends Abst
         current.reconnectConnection(this, next);
         return current;
     }
-
-    @Override
-    int remoteMaxMessages() {
-        return backend().getMaxMessages();
-    }
-
-    @Override
-    Entry<ActorRef, RequestEnvelope> prepareForTransmit(final Request<?, ?> req) {
-        return new SimpleImmutableEntry<>(backend().getActor(), new RequestEnvelope(
-            req.toVersion(backend().getVersion()), backend().getSessionId(), nextTxSequence()));
-    }
 }
index e28f9b3..64867e1 100644 (file)
@@ -7,18 +7,14 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
-import akka.actor.ActorRef;
 import com.google.common.annotations.Beta;
-import java.util.Map.Entry;
 import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 
 @Beta
 public final class ConnectingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
     // Initial state, never instantiated externally
     ConnectingClientConnection(final ClientActorContext context, final Long cookie) {
-        super(context, cookie);
+        super(context, cookie, new TransmitQueue.Halted());
     }
 
     @Override
@@ -30,15 +26,4 @@ public final class ConnectingClientConnection<T extends BackendInfo> extends Abs
     ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
         throw new UnsupportedOperationException("Attempted to reconnect a connecting connection");
     }
-
-    @Override
-    Entry<ActorRef, RequestEnvelope> prepareForTransmit(final Request<?, ?> req) {
-        // This is guarded by remoteMaxMessages() == 0
-        throw new UnsupportedOperationException("Attempted to transmit on a connecting connection");
-    }
-
-    @Override
-    int remoteMaxMessages() {
-        return 0;
-    }
 }
index 37dc2f1..25e5d6e 100644 (file)
@@ -33,7 +33,7 @@ public abstract class ReconnectForwarder {
         successor.sendRequest(request, callback);
     }
 
-    protected abstract void forwardEntry(ConnectionEntry entry);
+    protected abstract void forwardEntry(ConnectionEntry entry, long now);
 
     final AbstractReceivingClientConnection<?> successor() {
         return successor;
index e15a949..a67b7ed 100644 (file)
@@ -7,10 +7,6 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
-import akka.actor.ActorRef;
-import java.util.Map.Entry;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,15 +30,4 @@ public final class ReconnectingClientConnection<T extends BackendInfo> extends A
         LOG.debug("Skipping reconnect of already-reconnecting connection {}", this);
         return current;
     }
-
-    @Override
-    Entry<ActorRef, RequestEnvelope> prepareForTransmit(final Request<?, ?> req) {
-        // This is guarded by remoteMaxMessages() == 0
-        throw new UnsupportedOperationException("Attempted to transmit on a reconnecting connection");
-    }
-
-    @Override
-    int remoteMaxMessages() {
-        return 0;
-    }
 }
index a8ab7c4..d97ecd9 100644 (file)
@@ -14,7 +14,7 @@ final class SimpleReconnectForwarder extends ReconnectForwarder {
     }
 
     @Override
-    protected void forwardEntry(final ConnectionEntry entry) {
-        successor().enqueueEntry(entry);
+    protected void forwardEntry(final ConnectionEntry entry, final long now) {
+        successor().enqueueEntry(entry, now);
     }
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
new file mode 100644 (file)
index 0000000..8690236
--- /dev/null
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Queue;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
+ * operations.
+ *
+ * <p>
+ * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
+ * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
+ * but can involve spurious removals of non-head entries.
+ *
+ * <p>
+ * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
+ * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
+ * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
+ *
+ * <p>
+ * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
+ * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
+ * scenario, hence we consciously ignore it to keep the design relatively simple.
+ *
+ * <p>
+ * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+abstract class TransmitQueue {
+    static final class Halted extends TransmitQueue {
+        @Override
+        int canTransmitCount(final int inflightSize) {
+            return 0;
+        }
+
+        @Override
+        TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+            throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
+        }
+    }
+
+    static final class Transmitting extends TransmitQueue {
+        private final BackendInfo backend;
+        private long nextTxSequence;
+
+        Transmitting(final BackendInfo backend) {
+            this.backend = Preconditions.checkNotNull(backend);
+        }
+
+        @Override
+        int canTransmitCount(final int inflightSize) {
+            return backend.getMaxMessages() - inflightSize;
+        }
+
+        @Override
+        TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+            final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
+                backend.getSessionId(), nextTxSequence++);
+
+            final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
+                env.getTxSequence(), now);
+            backend.getActor().tell(env, ActorRef.noSender());
+            return ret;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
+
+    private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
+    private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
+
+    private ReconnectForwarder successor;
+
+    final Iterable<ConnectionEntry> asIterable() {
+        return Iterables.concat(inflight, pending);
+    }
+
+    private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks,
+            final long execNanos) {
+        // TODO: record
+    }
+
+    final void complete(final ResponseEnvelope<?> envelope, final long now) {
+        Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
+        if (maybeEntry == null) {
+            LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
+            maybeEntry = findMatchingEntry(pending, envelope);
+        }
+
+        if (maybeEntry == null || !maybeEntry.isPresent()) {
+            LOG.warn("No request matching {} found, ignoring response", envelope);
+            return;
+        }
+
+        final TransmittedConnectionEntry entry = maybeEntry.get();
+        LOG.debug("Completing {} with {}", entry, envelope);
+        entry.complete(envelope.getMessage());
+
+        recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
+
+        // We have freed up a slot, try to transmit something
+        int toSend = canTransmitCount(inflight.size());
+        while (toSend > 0) {
+            final ConnectionEntry e = pending.poll();
+            if (e == null) {
+                break;
+            }
+
+            LOG.debug("Transmitting entry {}", e);
+            transmit(e, now);
+            toSend--;
+        }
+    }
+
+    final void enqueue(final ConnectionEntry entry, final long now) {
+        if (successor != null) {
+            successor.forwardEntry(entry, now);
+            return;
+        }
+
+        if (canTransmitCount(inflight.size()) <= 0) {
+            LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
+            pending.add(entry);
+            return;
+        }
+
+        // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine.
+        // This needs to be revisited if the external guards are lowered.
+        inflight.offer(transmit(entry, now));
+        LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+    }
+
+    abstract int canTransmitCount(int inflightSize);
+
+    abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
+
+    final boolean isEmpty() {
+        return inflight.isEmpty() && pending.isEmpty();
+    }
+
+    final ConnectionEntry peek() {
+        final ConnectionEntry ret = inflight.peek();
+        if (ret != null) {
+            return ret;
+        }
+
+        return pending.peek();
+    }
+
+    final void poison(final RequestException cause) {
+        poisonQueue(inflight, cause);
+        poisonQueue(pending, cause);
+    }
+
+    final void setForwarder(final ReconnectForwarder forwarder, final long now) {
+        Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
+        successor = Preconditions.checkNotNull(forwarder);
+        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+
+        ConnectionEntry entry = inflight.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = inflight.poll();
+        }
+
+        entry = pending.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = pending.poll();
+        }
+    }
+
+    /*
+     * We are using tri-state return here to indicate one of three conditions:
+     * - if a matching entry is found, return an Optional containing it
+     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
+     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
+     */
+    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
+            justification = "Returning null Optional is documented in the API contract.")
+    private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
+            final ResponseEnvelope<?> envelope) {
+        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
+        // to use an iterator
+        final Iterator<? extends ConnectionEntry> it = queue.iterator();
+        while (it.hasNext()) {
+            final ConnectionEntry e = it.next();
+            final Request<?, ?> request = e.getRequest();
+            final Response<?, ?> response = envelope.getMessage();
+
+            // First check for matching target, or move to next entry
+            if (!request.getTarget().equals(response.getTarget())) {
+                continue;
+            }
+
+            // Sanity-check logical sequence, ignore any out-of-order messages
+            if (request.getSequence() != response.getSequence()) {
+                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+                return Optional.empty();
+            }
+
+            // Check if the entry has (ever) been transmitted
+            if (!(e instanceof TransmittedConnectionEntry)) {
+                return Optional.empty();
+            }
+
+            final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
+
+            // Now check session match
+            if (envelope.getSessionId() != te.getSessionId()) {
+                LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
+                return Optional.empty();
+            }
+            if (envelope.getTxSequence() != te.getTxSequence()) {
+                LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
+                return Optional.empty();
+            }
+
+            LOG.debug("Completing request {} with {}", request, envelope);
+            it.remove();
+            return Optional.of(te);
+        }
+
+        return null;
+    }
+
+    private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
+        for (ConnectionEntry e : queue) {
+            final Request<?, ?> request = e.getRequest();
+            LOG.trace("Poisoning request {}", request, cause);
+            e.complete(request.toRequestFailure(cause));
+        }
+        queue.clear();
+    }
+
+}
index 8ab58e4..519763a 100644 (file)
@@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -229,8 +230,8 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
             }
 
             @Override
-            void replaySuccessfulRequests() {
-                proxy.replaySuccessfulRequests();
+            void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+                proxy.replaySuccessfulRequests(previousEntries);
             }
 
             @Override
index 5a34b3b..3dc4dbf 100644 (file)
@@ -16,8 +16,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
@@ -122,44 +120,35 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
      * involved, as the messages need to be replayed to the individual proxies.
      */
     @Override
-    @GuardedBy("connectionsLock")
-    protected final ConnectedClientConnection<ShardBackendInfo> connectionUp(
-            final AbstractClientConnection<ShardBackendInfo> conn, final ShardBackendInfo backend) {
-
-        // Step 0: create a new connected connection
-        final ConnectedClientConnection<ShardBackendInfo> newConn = new ConnectedClientConnection<>(conn.context(),
-                conn.cookie(), backend);
-
-        LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
-
+    protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+        // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
+        //         further TransactionProxies can be created and we can safely traverse maps without risking
+        //         missing an entry
         final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
-        try {
-            // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
-            //         further TransactionProxies can be created and we can safely traverse maps without risking
-            //         missing an entry
-            startReconnect(singleHistory, newConn, cohorts);
-            for (ClientLocalHistory h : histories.values()) {
-                startReconnect(h, newConn, cohorts);
-            }
-
-            // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
-            //         the non-throttling interface to the connection, hence we use a wrapper consumer
-            for (HistoryReconnectCohort c : cohorts) {
-                c.replaySuccessfulRequests();
-            }
-
-            // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
-            //         requests will be immediately sent to it and requests being sent concurrently will get forwarded
-            //         once they hit the new connection.
-            conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts));
-        } finally {
-            // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
-            for (HistoryReconnectCohort c : cohorts) {
-                c.close();
-            }
+        startReconnect(singleHistory, newConn, cohorts);
+        for (ClientLocalHistory h : histories.values()) {
+            startReconnect(h, newConn, cohorts);
         }
 
-        return newConn;
+        return previousEntries -> {
+            try {
+                // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
+                //         the non-throttling interface to the connection, hence we use a wrapper consumer
+                for (HistoryReconnectCohort c : cohorts) {
+                    c.replaySuccessfulRequests(previousEntries);
+                }
+
+                // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+                //         requests will be immediately sent to it and requests being sent concurrently will get
+                //         forwarded once they hit the new connection.
+                return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+            } finally {
+                // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
+                for (HistoryReconnectCohort c : cohorts) {
+                    c.close();
+                }
+            }
+        };
     }
 
     private static void startReconnect(final AbstractClientHistory history,
index 7f5bec1..36f9a4b 100644 (file)
@@ -17,12 +17,16 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -31,7 +35,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -73,26 +77,81 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
-    private enum SealState {
-        /**
-         * The user has not sealed the transaction yet.
-         */
-        OPEN,
-        /**
-         * The user has sealed the transaction, but has not issued a canCommit().
-         */
-        SEALED,
-        /**
-         * The user has sealed the transaction and has issued a canCommit().
-         */
-        FLUSHED,
+    // Generic state base class. Direct instances are used for fast paths, sub-class is used for successor transitions
+    private static class State {
+        private final String string;
+
+        State(final String string) {
+            this.string = Preconditions.checkNotNull(string);
+        }
+
+        @Override
+        public final String toString() {
+            return string;
+        }
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
+    // State class used when a successor has interfered. Contains coordinator latch, the successor and previous state
+    private static final class SuccessorState extends State {
+        private final CountDownLatch latch = new CountDownLatch(1);
+        private AbstractProxyTransaction successor;
+        private State prevState;
+
+        SuccessorState() {
+            super("successor");
+        }
+
+        // Synchronize with succession process and return the successor
+        AbstractProxyTransaction await() {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting for latch of {}", successor);
+                throw Throwables.propagate(e);
+            }
+            return successor;
+        }
+
+        void finish() {
+            latch.countDown();
+        }
+
+        State getPrevState() {
+            return prevState;
+        }
+
+        void setPrevState(final State prevState) {
+            Verify.verify(this.prevState == null);
+            this.prevState = Preconditions.checkNotNull(prevState);
+        }
+
+        // To be called from safe contexts, where successor is known to be completed
+        AbstractProxyTransaction getSuccessor() {
+            return Verify.verifyNotNull(successor);
+        }
 
+        void setSuccessor(final AbstractProxyTransaction successor) {
+            Verify.verify(this.successor == null);
+            this.successor = Preconditions.checkNotNull(successor);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
+    private static final AtomicIntegerFieldUpdater<AbstractProxyTransaction> SEALED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(AbstractProxyTransaction.class, "sealed");
+    private static final AtomicReferenceFieldUpdater<AbstractProxyTransaction, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(AbstractProxyTransaction.class, State.class, "state");
+    private static final State OPEN = new State("open");
+    private static final State SEALED = new State("sealed");
+    private static final State FLUSHED = new State("flushed");
+
+    // Touched from client actor thread only
     private final Deque<Object> successfulRequests = new ArrayDeque<>();
     private final ProxyHistory parent;
 
+    // Accessed from user thread only, which may not access this object concurrently
+    private long sequence;
+
     /*
      * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards
      * the backend -- which may include a successor.
@@ -105,17 +164,20 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * or timeout, when a successor is injected.
      *
      * This leaves the problem of needing to completely transferring state just after all queued messages are replayed
-     * after a successor was injected, so that it can be properly sealed if we are racing.
+     * after a successor was injected, so that it can be properly sealed if we are racing. Further complication comes
+     * from lock ordering, where the successor injection works with a locked queue and locks proxy objects -- leading
+     * to a potential AB-BA deadlock in case of a naive implementation.
+     *
+     * For tracking user-visible state we use a single volatile int, which is flipped atomically from 0 to 1 exactly
+     * once in {@link AbstractProxyTransaction#seal()}. That keeps common operations fast, as they need to perform
+     * only a single volatile read to assert state correctness.
+     *
+     * For synchronizing client actor (successor-injecting) and user (commit-driving) thread, we keep a separate state
+     * variable. It uses pre-allocated objects for fast paths (i.e. no successor present) and a per-transition object
+     * for slow paths (when successor is injected/present).
      */
-    private volatile SealState sealed = SealState.OPEN;
-    @GuardedBy("this")
-    private AbstractProxyTransaction successor;
-    @GuardedBy("this")
-    private CountDownLatch successorLatch;
-
-    // Accessed from user thread only, which may not access this object concurrently
-    private long sequence;
-
+    private volatile int sealed = 0;
+    private volatile State state = OPEN;
 
     AbstractProxyTransaction(final ProxyHistory parent) {
         this.parent = Preconditions.checkNotNull(parent);
@@ -170,50 +232,37 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * Seal this transaction before it is either committed or aborted.
      */
     final void seal() {
-        final CountDownLatch localLatch;
-
-        synchronized (this) {
-            checkNotSealed();
-            doSeal();
-
-            // Fast path: no successor
-            if (successor == null) {
-                sealed = SealState.SEALED;
-                parent.onTransactionSealed(this);
-                return;
-            }
-
-            localLatch = successorLatch;
-        }
-
-        // Slow path: wait for the latch
-        LOG.debug("{} waiting on successor latch", getIdentifier());
-        try {
-            localLatch.await();
-        } catch (InterruptedException e) {
-            LOG.warn("{} interrupted while waiting for latch", getIdentifier());
-            throw Throwables.propagate(e);
-        }
-
-        synchronized (this) {
-            LOG.debug("{} reacquired lock", getIdentifier());
-
+        // Transition user-visible state first
+        final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
+        Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
+        doSeal();
+        parent.onTransactionSealed(this);
+
+        // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit().
+        if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) {
+            // Slow path: wait for the successor to complete
+            final AbstractProxyTransaction successor = awaitSuccessor();
+
+            // At this point the successor has completed transition and is possibly visible by the user thread, which is
+            // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
+            // Propagate state and seal the successor.
             flushState(successor);
             successor.seal();
-
-            sealed = SealState.FLUSHED;
-            parent.onTransactionSealed(this);
         }
     }
 
     private void checkNotSealed() {
-        Preconditions.checkState(sealed == SealState.OPEN, "Transaction %s has already been sealed", getIdentifier());
+        Preconditions.checkState(sealed == 0, "Transaction %s has already been sealed", getIdentifier());
     }
 
-    private SealState checkSealed() {
-        final SealState local = sealed;
-        Preconditions.checkState(local != SealState.OPEN, "Transaction %s has not been sealed yet", getIdentifier());
-        return local;
+    private void checkSealed() {
+        Preconditions.checkState(sealed != 0, "Transaction %s has not been sealed yet", getIdentifier());
+    }
+
+    private SuccessorState getSuccessorState() {
+        final State local = state;
+        Verify.verify(local instanceof SuccessorState, "State %s has unexpected class", local);
+        return (SuccessorState) local;
     }
 
     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
@@ -268,15 +317,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @return Future completion
      */
     final ListenableFuture<Boolean> directCommit() {
-        final CountDownLatch localLatch;
+        checkSealed();
 
+        // Precludes startReconnect() from interfering with the fast path
         synchronized (this) {
-            final SealState local = checkSealed();
-
-            // Fast path: no successor asserted
-            if (successor == null) {
-                Verify.verify(local == SealState.SEALED);
-
+            if (STATE_UPDATER.compareAndSet(this, SEALED, FLUSHED)) {
                 final SettableFuture<Boolean> ret = SettableFuture.create();
                 sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
                     if (t instanceof TransactionCommitSuccess) {
@@ -292,44 +337,22 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     parent.completeTransaction(this);
                 });
 
-                sealed = SealState.FLUSHED;
                 return ret;
             }
-
-            // We have a successor, take its latch
-            localLatch = successorLatch;
-        }
-
-        // Slow path: we need to wait for the successor to completely propagate
-        LOG.debug("{} waiting on successor latch", getIdentifier());
-        try {
-            localLatch.await();
-        } catch (InterruptedException e) {
-            LOG.warn("{} interrupted while waiting for latch", getIdentifier());
-            throw Throwables.propagate(e);
         }
 
-        synchronized (this) {
-            LOG.debug("{} reacquired lock", getIdentifier());
-
-            final SealState local = sealed;
-            Verify.verify(local == SealState.FLUSHED);
-
-            return successor.directCommit();
-        }
+        // We have had some interference with successor injection, wait for it to complete and defer to the successor.
+        return awaitSuccessor().directCommit();
     }
 
     final void canCommit(final VotingFuture<?> ret) {
-        final CountDownLatch localLatch;
+        checkSealed();
 
+        // Precludes startReconnect() from interfering with the fast path
         synchronized (this) {
-            final SealState local = checkSealed();
-
-            // Fast path: no successor asserted
-            if (successor == null) {
-                Verify.verify(local == SealState.SEALED);
-
+            if (STATE_UPDATER.compareAndSet(this, SEALED, FLUSHED)) {
                 final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
+
                 sendRequest(req, t -> {
                     if (t instanceof TransactionCanCommitSuccess) {
                         ret.voteYes();
@@ -343,31 +366,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     LOG.debug("Transaction {} canCommit completed", this);
                 });
 
-                sealed = SealState.FLUSHED;
                 return;
             }
-
-            // We have a successor, take its latch
-            localLatch = successorLatch;
-        }
-
-        // Slow path: we need to wait for the successor to completely propagate
-        LOG.debug("{} waiting on successor latch", getIdentifier());
-        try {
-            localLatch.await();
-        } catch (InterruptedException e) {
-            LOG.warn("{} interrupted while waiting for latch", getIdentifier());
-            throw Throwables.propagate(e);
         }
 
-        synchronized (this) {
-            LOG.debug("{} reacquired lock", getIdentifier());
-
-            final SealState local = sealed;
-            Verify.verify(local == SealState.FLUSHED);
+        // We have had some interference with successor injection, wait for it to complete and defer to the successor.
+        awaitSuccessor().canCommit(ret);
+    }
 
-            successor.canCommit(ret);
-        }
+    private AbstractProxyTransaction awaitSuccessor() {
+        return getSuccessorState().await();
     }
 
     final void preCommit(final VotingFuture<?> ret) {
@@ -389,7 +397,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         });
     }
 
-    void doCommit(final VotingFuture<?> ret) {
+    final void doCommit(final VotingFuture<?> ret) {
         checkSealed();
 
         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
@@ -406,13 +414,33 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         });
     }
 
-    final synchronized void startReconnect(final AbstractProxyTransaction successor) {
-        Preconditions.checkState(this.successor == null);
-        this.successor = Preconditions.checkNotNull(successor);
+    // Called with the connection unlocked
+    final synchronized void startReconnect() {
+        // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
+        // state. This method is called with the queue still unlocked.
+        final SuccessorState nextState = new SuccessorState();
+        final State prevState = STATE_UPDATER.getAndSet(this, nextState);
+
+        LOG.debug("Start reconnect of proxy {} previous state {}", this, prevState);
+        Verify.verify(!(prevState instanceof SuccessorState), "Proxy %s duplicate reconnect attempt after %s", this,
+            prevState);
+
+        // We have asserted a slow-path state, seal(), canCommit(), directCommit() are forced to slow paths, which will
+        // wait until we unblock nextState's latch before accessing state. Now we record prevState for later use and we
+        // are done.
+        nextState.setPrevState(prevState);
+    }
+
+    // Called with the connection locked
+    final void replayMessages(final AbstractProxyTransaction successor,
+            final Iterable<ConnectionEntry> enqueuedEntries) {
+        final SuccessorState local = getSuccessorState();
+        local.setSuccessor(successor);
 
+        // Replay successful requests first
         for (Object obj : successfulRequests) {
             if (obj instanceof TransactionRequest) {
-                LOG.debug("Forwarding request {} to successor {}", obj, successor);
+                LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
                 successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
             } else {
                 Verify.verify(obj instanceof IncrementSequence);
@@ -422,28 +450,40 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
         successfulRequests.clear();
 
+        // Now replay whatever is in the connection
+        final Iterator<ConnectionEntry> it = enqueuedEntries.iterator();
+        while (it.hasNext()) {
+            final ConnectionEntry e = it.next();
+            final Request<?, ?> req = e.getRequest();
+
+            if (getIdentifier().equals(req.getTarget())) {
+                Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
+                LOG.debug("Forwarding queued request{} to successor {}", req, successor);
+                successor.handleForwardedRemoteRequest((TransactionRequest<?>) req, e.getCallback());
+                it.remove();
+            }
+        }
+
         /*
-         * Before releasing the lock we need to make sure that a call to seal() blocks until we have completed
-         * finishConnect().
+         * Check the state at which we have started the reconnect attempt. State transitions triggered while we were
+         * reconnecting have been forced to slow paths, which will be unlocked once we unblock the state latch
+         * at the end of this method.
          */
-        successorLatch = new CountDownLatch(1);
-    }
-
-    final synchronized void finishReconnect() {
-        Preconditions.checkState(successorLatch != null);
-
-        if (sealed == SealState.SEALED) {
-            /*
-             * If this proxy is in the 'sealed, have not sent canCommit' state. If so, we need to forward current
-             * leftover state to the successor now.
-             */
+        final State prevState = local.getPrevState();
+        if (SEALED.equals(prevState)) {
+            LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
             successor.seal();
-            sealed = SealState.FLUSHED;
         }
+    }
+
+    // Called with the connection locked
+    final void finishReconnect() {
+        final SuccessorState local = getSuccessorState();
+        LOG.debug("Finishing reconnect of proxy {}", this);
 
-        // All done, release the latch, unblocking seal() and canCommit()
-        successorLatch.countDown();
+        // All done, release the latch, unblocking seal() and canCommit() slow paths
+        local.finish();
     }
 
     /**
@@ -452,11 +492,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      *
      * @param request Request to be forwarded
      * @param callback Original callback
-     * @throws RequestException when the request is unhandled by the successor
      */
-    final synchronized void replayRequest(final TransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) {
-        Preconditions.checkState(successor != null, "%s does not have a successor set", this);
+    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
 
         if (successor instanceof LocalProxyTransaction) {
             forwardToLocal((LocalProxyTransaction)successor, request, callback);
index 3136023..3fe6a09 100644 (file)
@@ -57,7 +57,7 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
 
 
     @Override
-    protected void forwardEntry(final ConnectionEntry entry) {
+    protected void forwardEntry(final ConnectionEntry entry, final long now) {
         final Request<? , ?> request = entry.getRequest();
 
         final LocalHistoryIdentifier historyId;
index ab961f1..7e6ff67 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+
 /**
  * Interface exposed by {@link AbstractClientHistory} to {@link DistributedDataStoreClientBehavior} for the sole
  * purpose of performing a connection switchover.
@@ -16,7 +18,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 abstract class HistoryReconnectCohort implements AutoCloseable {
     abstract ProxyReconnectCohort getProxy();
 
-    abstract void replaySuccessfulRequests();
+    abstract void replaySuccessfulRequests(Iterable<ConnectionEntry> previousEntries);
 
     @Override
     public abstract void close();
index 07fcbeb..b3b604b 100644 (file)
@@ -22,6 +22,8 @@ import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -195,12 +197,33 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replaySuccessfulRequests() {
+        void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+            // First look for our Create message
+            for (ConnectionEntry e : previousEntries) {
+                final Request<?, ?> req = e.getRequest();
+                if (identifier.equals(req.getTarget())) {
+                    Verify.verify(req instanceof LocalHistoryRequest);
+                    if (req instanceof CreateLocalHistoryRequest) {
+                        successor.connection.sendRequest(req, e.getCallback());
+                        break;
+                    }
+                }
+            }
+
             for (AbstractProxyTransaction t : proxies.values()) {
                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.startReconnect(newProxy);
+                t.replayMessages(newProxy, previousEntries);
+            }
+
+            // Now look for any finalizing messages
+            for (ConnectionEntry e : previousEntries) {
+                final Request<?, ?> req = e.getRequest();
+                if (identifier.equals(req.getTarget())) {
+                    Verify.verify(req instanceof LocalHistoryRequest);
+                    successor.connection.sendRequest(req, e.getCallback());
+                }
             }
         }
 
@@ -347,6 +370,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         successor = createSuccessor(newConnection);
         LOG.debug("History {} instantiated successor {}", this, successor);
+
+        for (AbstractProxyTransaction t : proxies.values()) {
+            t.startReconnect();
+        }
+
         return new ReconnectCohort();
     }
 
index 7c37d9d..2f97f90 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -17,7 +18,7 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 
 abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
 
-    abstract void replaySuccessfulRequests();
+    abstract void replaySuccessfulRequests(Iterable<ConnectionEntry> previousEntries);
 
     abstract ProxyHistory finishReconnect();