BUG-5280: add AbstractClientConnection 10/44910/72
authorRobert Varga <rovarga@cisco.com>
Tue, 30 Aug 2016 15:57:41 +0000 (17:57 +0200)
committerRobert Varga <rovarga@cisco.com>
Mon, 21 Nov 2016 16:38:53 +0000 (17:38 +0100)
Introduce a connection concept. This is a replacement for
the request queue, as it turns out we do need the concept
of a full connection (e.g. generational logic).

This comes from the need to sensibly switch behaviors
as the locality of the backend leader changes.

This patch implements two sets of strategies for dealing
with reconnect:

The first one assumes long-lived state and is used for
proxies dealing with histories. Here we make sure to
reinstantiate and replace them in a map, as we want
new transactions to follow the new semantic and we do not
want to tear histories down or follow inefficient paths.

The second one assumes short-lived state and is used for
proxies dealing with individual transactions. Transactions
are assumed to come and go rapidly and therefore we
do not replace the proxies in maps (as they will be
short-lived), but rather forwards operations to successors.

The first strategy has a higher access cost, but its state
is always fully uptodate when reconnect finishes, while
the second strategy favor access time, but operations end
up "trailing" and will be forwarded (and hence inefficient)
until the transaction completes.

Change-Id: I7fd9e21c749f55b91229bf0b671c8dcf2e4d5982
Signed-off-by: Robert Varga <rovarga@cisco.com>
52 files changed:
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorContext.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InternalCommand.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLock.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLockException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/NoProgressException.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestCallback.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java [deleted file]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java [deleted file]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java [moved from opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java with 57% similarity]
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryTest.java [moved from opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java with 55% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java [deleted file]

index 36f06ef..fdba64f 100644 (file)
@@ -67,5 +67,5 @@ public abstract class AbstractClientActor extends UntypedPersistentActor {
         switchBehavior(currentBehavior.onReceiveRecover(recover));
     }
 
-    protected abstract ClientActorBehavior initialBehavior(ClientActorContext context);
+    protected abstract ClientActorBehavior<?> initialBehavior(ClientActorContext context);
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
new file mode 100644 (file)
index 0000000..170b150
--- /dev/null
@@ -0,0 +1,259 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayDeque;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Base class for a connection to the backend. Responsible to queueing and dispatch of requests toward the backend.
+ * Can be in three conceptual states: Connecting, Connected and Reconnecting, which are represented by public final
+ * classes exposed from this package.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+public abstract class AbstractClientConnection<T extends BackendInfo> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);
+
+    // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path
+    @VisibleForTesting
+    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
+    @VisibleForTesting
+    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+
+    private final Queue<ConnectionEntry> pending;
+    private final ClientActorContext context;
+    private final Long cookie;
+
+    private volatile ReconnectForwarder successor;
+    private volatile RequestException poisoned;
+    private long lastProgress;
+
+    private AbstractClientConnection(final ClientActorContext context, final Long cookie,
+            final Queue<ConnectionEntry> pending) {
+        this.context = Preconditions.checkNotNull(context);
+        this.cookie = Preconditions.checkNotNull(cookie);
+        this.pending = Preconditions.checkNotNull(pending);
+        this.lastProgress = readTime();
+    }
+
+    // Do not allow subclassing outside of this package
+    AbstractClientConnection(final ClientActorContext context, final Long cookie) {
+        this(context, cookie, new ArrayDeque<>(1));
+    }
+
+    // Do not allow subclassing outside of this package
+    AbstractClientConnection(final AbstractClientConnection<T> oldConnection) {
+        this(oldConnection.context, oldConnection.cookie, oldConnection.pending);
+    }
+
+    public final ClientActorContext context() {
+        return context;
+    }
+
+    public final @Nonnull Long cookie() {
+        return cookie;
+    }
+
+    public final ActorRef localActor() {
+        return context.self();
+    }
+
+    final long readTime() {
+        return context.ticker().read();
+    }
+
+    final Queue<ConnectionEntry> pending() {
+        return pending;
+    }
+
+    /**
+     * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+     * from any thread.
+     *
+     * @param request Request to send
+     * @param callback Callback to invoke
+     */
+    public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
+        Preconditions.checkState(poisoned == null, "Connection %s has been poisoned", this);
+
+        final ReconnectForwarder beforeQueue = successor;
+        final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
+        if (beforeQueue != null) {
+            LOG.trace("Forwarding entry {} from {} to {}", entry, this, beforeQueue);
+            beforeQueue.forwardEntry(entry);
+            return;
+        }
+
+        enqueueEntry(entry);
+
+        final ReconnectForwarder afterQueue = successor;
+        if (afterQueue != null) {
+            synchronized (this) {
+                spliceToSuccessor(afterQueue);
+            }
+        }
+    }
+
+    public final synchronized void setForwarder(final ReconnectForwarder forwarder) {
+        Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
+        successor = Preconditions.checkNotNull(forwarder);
+        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+        spliceToSuccessor(forwarder);
+    }
+
+    public abstract Optional<T> getBackendInfo();
+
+    @GuardedBy("this")
+    void spliceToSuccessor(final ReconnectForwarder successor) {
+        ConnectionEntry entry = pending.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry);
+            entry = pending.poll();
+        }
+    }
+
+    final ConnectionEntry dequeEntry() {
+        lastProgress = readTime();
+        return pending.poll();
+    }
+
+    void enqueueEntry(final ConnectionEntry entry) {
+        pending.add(entry);
+    }
+
+    /**
+     * Schedule a timer to fire on the actor thread after a delay.
+     *
+     * @param delay Delay, in nanoseconds
+     */
+    private void scheduleTimer(final FiniteDuration delay) {
+        LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
+        context.executeInActor(this::runTimer, delay);
+    }
+
+    /**
+     * Check queue timeouts and return true if a timeout has occurred.
+     *
+     * @return True if a timeout occurred
+     * @throws NoProgressException if the queue failed to make progress for an extended
+     *                             time.
+     */
+    @VisibleForTesting
+    final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
+        final long now = readTime();
+
+        if (!isEmpty()) {
+            final long ticksSinceProgress = now - lastProgress;
+            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+
+                poison(new NoProgressException(ticksSinceProgress));
+                current.removeConnection(this);
+                return current;
+            }
+        }
+
+        // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
+        // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
+        // return convention.
+        final Optional<FiniteDuration> delay = checkTimeout(now);
+        if (delay == null) {
+            // We have timed out. There is no point in scheduling a timer
+            return reconnectConnection(current);
+        }
+
+        if (delay.isPresent()) {
+            // If there is new delay, schedule a timer
+            scheduleTimer(delay.get());
+        }
+
+        return current;
+    }
+
+    boolean isEmpty() {
+        return pending.isEmpty();
+    }
+
+    /*
+     * We are using tri-state return here to indicate one of three conditions:
+     * - if there is no timeout to schedule, return Optional.empty()
+     * - if there is a timeout to schedule, return a non-empty optional
+     * - if this connections has timed out, return null
+     */
+    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
+            justification = "Returning null Optional is documented in the API contract.")
+    final Optional<FiniteDuration> checkTimeout(final ConnectionEntry head, final long now) {
+        if (head == null) {
+            return Optional.empty();
+        }
+
+        final long delay = head.getEnqueuedTicks() - now + REQUEST_TIMEOUT_NANOS;
+        if (delay <= 0) {
+            LOG.debug("Connection {} timed out", this);
+            return null;
+        }
+
+        return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS));
+    }
+
+    /*
+     * We are using tri-state return here to indicate one of three conditions:
+     * - if there is no timeout to schedule, return Optional.empty()
+     * - if there is a timeout to schedule, return a non-empty optional
+     * - if this connections has timed out, return null
+     */
+    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
+            justification = "Returning null Optional is documented in the API contract.")
+    Optional<FiniteDuration> checkTimeout(final long now) {
+        return checkTimeout(pending.peek(), now);
+    }
+
+    static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
+        for (ConnectionEntry e : queue) {
+            final Request<?, ?> request = e.getRequest();
+            LOG.trace("Poisoning request {}", request, cause);
+            e.complete(request.toRequestFailure(cause));
+        }
+        queue.clear();
+    }
+
+    void poison(final RequestException cause) {
+        poisoned = cause;
+        poisonQueue(pending, cause);
+    }
+
+    @VisibleForTesting
+    final RequestException poisoned() {
+        return poisoned;
+    }
+
+    abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
+
+    abstract void receiveResponse(final ResponseEnvelope<?> envelope);
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java
new file mode 100644 (file)
index 0000000..180ac94
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Queue;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
+ * sublcasses. It allows us to share some code.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Concrete {@link BackendInfo} type
+ */
+abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
+
+    private final Queue<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
+    private final T backend;
+
+    private long lastProgress;
+
+    AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
+        super(context, cookie);
+        this.backend = Preconditions.checkNotNull(backend);
+        this.lastProgress = readTime();
+    }
+
+    AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
+        super(oldConnection);
+        this.backend = oldConnection.backend;
+        this.lastProgress = oldConnection.lastProgress;
+    }
+
+    @Override
+    public final Optional<T> getBackendInfo() {
+        return Optional.of(backend);
+    }
+
+    final ActorRef remoteActor() {
+        return backend.getActor();
+    }
+
+    final int remoteMaxMessages() {
+        return backend.getMaxMessages();
+    }
+
+    final ABIVersion remoteVersion() {
+        return backend.getVersion();
+    }
+
+    final long sessionId() {
+        return backend.getSessionId();
+    }
+
+    final int inflightSize() {
+        return inflight.size();
+    }
+
+    final void appendToInflight(final TransmittedConnectionEntry entry) {
+        // This should never fail
+        inflight.add(entry);
+    }
+
+    @GuardedBy("this")
+    @Override
+    void spliceToSuccessor(final ReconnectForwarder successor) {
+        ConnectionEntry entry = inflight.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry);
+            entry = inflight.poll();
+        }
+
+        super.spliceToSuccessor(successor);
+    }
+
+    @Override
+    void receiveResponse(final ResponseEnvelope<?> envelope) {
+        Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
+        if (maybeEntry == null) {
+            LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
+            maybeEntry = findMatchingEntry(pending(), envelope);
+        }
+
+        if (maybeEntry == null || !maybeEntry.isPresent()) {
+            LOG.warn("No request matching {} found, ignoring response", envelope);
+            return;
+        }
+
+        lastProgress = readTime();
+        maybeEntry.get().complete(envelope.getMessage());
+
+        // We have freed up a slot, try to transmit something
+        final int toSend = remoteMaxMessages() - inflight.size();
+        if (toSend > 0) {
+            sendMessages(toSend);
+        }
+    }
+
+    @Override
+    boolean isEmpty() {
+        return inflight.isEmpty() && super.isEmpty();
+    }
+
+    @Override
+    void poison(final RequestException cause) {
+        super.poison(cause);
+        poisonQueue(inflight, cause);
+    }
+
+    /**
+     * Transmit a given number of messages.
+     *
+     * @param count Number of messages to transmit, guaranteed to be positive.
+     */
+    abstract void sendMessages(int count);
+
+    /*
+     * We are using tri-state return here to indicate one of three conditions:
+     * - if a matching entry is found, return an Optional containing it
+     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
+     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
+     */
+    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
+            justification = "Returning null Optional is documented in the API contract.")
+    private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
+            final ResponseEnvelope<?> envelope) {
+        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
+        // to use an iterator
+        final Iterator<? extends ConnectionEntry> it = queue.iterator();
+        while (it.hasNext()) {
+            final ConnectionEntry e = it.next();
+            final Request<?, ?> request = e.getRequest();
+            final Response<?, ?> response = envelope.getMessage();
+
+            // First check for matching target, or move to next entry
+            if (!request.getTarget().equals(response.getTarget())) {
+                continue;
+            }
+
+            // Sanity-check logical sequence, ignore any out-of-order messages
+            if (request.getSequence() != response.getSequence()) {
+                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+                return Optional.empty();
+            }
+
+            // Check if the entry has (ever) been transmitted
+            if (!(e instanceof TransmittedConnectionEntry)) {
+                return Optional.empty();
+            }
+
+            final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
+
+            // Now check session match
+            if (envelope.getSessionId() != te.getSessionId()) {
+                LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
+                return Optional.empty();
+            }
+            if (envelope.getTxSequence() != te.getTxSequence()) {
+                LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
+                return Optional.empty();
+            }
+
+            LOG.debug("Completing request {} with {}", request, envelope);
+            it.remove();
+            return Optional.of(te);
+        }
+
+        return null;
+    }
+
+    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
+            justification = "Returning null Optional is documented in the API contract.")
+    @Override
+    final Optional<FiniteDuration> checkTimeout(final long now) {
+        final Optional<FiniteDuration> xmit = checkTimeout(inflight.peek(), now);
+        if (xmit == null) {
+            return null;
+        }
+        final Optional<FiniteDuration> pend = super.checkTimeout(now);
+        if (pend == null) {
+            return null;
+        }
+        if (!xmit.isPresent()) {
+            return pend;
+        }
+        if (!pend.isPresent()) {
+            return xmit;
+        }
+
+        return Optional.of(xmit.get().min(pend.get()));
+    }
+}
index a5a579d..85d9f14 100644 (file)
@@ -20,7 +20,7 @@ import org.opendaylight.controller.cluster.access.ABIVersion;
  * <p>
  * This class is not final so concrete actor behavior implementations may subclass it and track more information about
  * the backend. The {@link #hashCode()} and {@link #equals(Object)} methods are made final to ensure subclasses compare
- * on identity.
+ * on object identity.
  *
  * @author Robert Varga
  */
index 7180573..e4aa2b1 100644 (file)
@@ -8,18 +8,8 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import javax.annotation.Nonnull;
-import javax.annotation.concurrent.ThreadSafe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Caching resolver which resolves a cookie to a leader {@link ActorRef}. This class needs to be specialized by the
@@ -29,70 +19,24 @@ import org.slf4j.LoggerFactory;
  *
  * @author Robert Varga
  */
-@ThreadSafe
 public abstract class BackendInfoResolver<T extends BackendInfo> {
-    private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class);
-    private final ConcurrentMap<Long, CompletableFuture<T>> backends = new ConcurrentHashMap<>();
-
-    /**
-     * Return the currently-resolved backend information, if available. This method is guaranteed not to block, but will
-     * initiate resolution of the information if there is none.
-     *
-     * @param cookie Backend cookie
-     * @return Backend information, if available
-     */
-    public final Optional<T> getFutureBackendInfo(final Long cookie) {
-        final Future<T> f = lookupBackend(cookie);
-        if (f.isDone()) {
-            try {
-                return Optional.of(f.get());
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.debug("Resolution of {} failed", f, e);
-            }
-        }
-
-        return Optional.empty();
-    }
-
     /**
-     * Invalidate a particular instance of {@link BackendInfo}, typically as a response to a request timing out. If
-     * the provided information is not the one currently cached this method does nothing.
+     * Request resolution of a particular backend identified by a cookie. This request can be satisfied from the cache.
      *
      * @param cookie Backend cookie
-     * @param info Previous information to be invalidated
+     * @return A {@link CompletionStage} resulting in information about the backend
      */
-    public final void invalidateBackend(final long cookie, @Nonnull final CompletionStage<? extends BackendInfo> info) {
-        if (backends.remove(cookie, Preconditions.checkNotNull(info))) {
-            LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), info);
-            invalidateBackendInfo(info);
-        }
-    }
+    @Nonnull
+    public abstract CompletionStage<? extends T> getBackendInfo(@Nonnull Long cookie);
 
     /**
-     * Request new resolution of a particular backend identified by a cookie. This method is invoked when a client
-     * requests information which is not currently cached.
+     * Request re-resolution of a particular backend identified by a cookie, indicating a particular information as
+     * being stale. If the implementation's cache holds the stale information, it should be purged.
      *
      * @param cookie Backend cookie
-     * @return A {@link CompletableFuture} resulting in information about the backend
+     * @param staleInfo Stale backend information
+     * @return A {@link CompletionStage} resulting in information about the backend
      */
     @Nonnull
-    protected abstract CompletableFuture<T> resolveBackendInfo(@Nonnull final Long cookie);
-
-    /**
-     * Invalidate previously-resolved shard information. This method is invoked when a timeout is detected
-     * and the information may need to be refreshed.
-     *
-     * @param info Previous promise of backend information
-     */
-    protected abstract void invalidateBackendInfo(@Nonnull CompletionStage<? extends BackendInfo> info);
-
-    // This is what the client needs to start processing. For as long as we do not have this, we should not complete
-    // this stage until we have this information
-    final CompletionStage<? extends T> getBackendInfo(final Long cookie) {
-        return lookupBackend(cookie);
-    }
-
-    private CompletableFuture<T> lookupBackend(final Long cookie) {
-        return backends.computeIfAbsent(Preconditions.checkNotNull(cookie), this::resolveBackendInfo);
-    }
+    public abstract CompletionStage<? extends T> refreshBackendInfo(@Nonnull Long cookie, @Nonnull T staleInfo);
 }
index 43b621c..e4b73b1 100644 (file)
@@ -8,21 +8,24 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.annotations.Beta;
-import java.util.Optional;
-import java.util.concurrent.CompletionStage;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A behavior, which handles messages sent to a {@link AbstractClientActor}.
@@ -30,12 +33,31 @@ import scala.concurrent.duration.FiniteDuration;
  * @author Robert Varga
  */
 @Beta
-public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<ClientActorContext>
-        implements Identifiable<ClientIdentifier> {
+public abstract class ClientActorBehavior<T extends BackendInfo> extends
+        RecoveredClientActorBehavior<ClientActorContext> implements Identifiable<ClientIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
 
-    protected ClientActorBehavior(@Nonnull final ClientActorContext context) {
+    /**
+     * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
+     * involved in connection transitions it is protected by a {@link InversibleLock}. Write-side of the lock is taken
+     * during connection transitions. Optimistic read-side of the lock is taken when new connections are introduced
+     * into the map.
+     *
+     * <p>
+     * The lock detects potential AB/BA deadlock scenarios and will force the reader side out by throwing
+     * a {@link InversibleLockException} -- which must be propagated up, releasing locks as it propagates. The initial
+     * entry point causing the the conflicting lookup must then call {@link InversibleLockException#awaitResolution()}
+     * before retrying the operation.
+     */
+    // TODO: it should be possible to move these two into ClientActorContext
+    private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
+    private final InversibleLock connectionsLock = new InversibleLock();
+    private final BackendInfoResolver<T> resolver;
+
+    protected ClientActorBehavior(@Nonnull final ClientActorContext context,
+            @Nonnull final BackendInfoResolver<T> resolver) {
         super(context);
+        this.resolver = Preconditions.checkNotNull(resolver);
     }
 
     @Override
@@ -44,118 +66,89 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
         return context().getIdentifier();
     }
 
+    /**
+     * Get a connection to a shard.
+     *
+     * @param shard Shard cookie
+     * @return Connection to a shard
+     * @throws InversibleLockException if the shard is being reconnected
+     */
+    public final AbstractClientConnection<T> getConnection(final Long shard) {
+        while (true) {
+            final long stamp = connectionsLock.optimisticRead();
+            final AbstractClientConnection<T> conn = connections.computeIfAbsent(shard, this::createConnection);
+            if (connectionsLock.validate(stamp)) {
+                // No write-lock in-between, return success
+                return conn;
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
     @Override
-    final ClientActorBehavior onReceiveCommand(final Object command) {
+    final ClientActorBehavior<T> onReceiveCommand(final Object command) {
         if (command instanceof InternalCommand) {
-            return ((InternalCommand) command).execute(this);
+            return ((InternalCommand<T>) command).execute(this);
         }
         if (command instanceof SuccessEnvelope) {
             return onRequestSuccess((SuccessEnvelope) command);
         }
         if (command instanceof FailureEnvelope) {
-            return onRequestFailure((FailureEnvelope) command);
+            return internalOnRequestFailure((FailureEnvelope) command);
         }
 
         return onCommand(command);
     }
 
-    private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) {
-        return context().completeRequest(this, command);
-    }
+    private void onResponse(final ResponseEnvelope<?> response) {
+        final WritableIdentifier id = response.getMessage().getTarget();
 
-    private ClientActorBehavior onRequestFailure(final FailureEnvelope command) {
-        final RequestFailure<?, ?> failure = command.getMessage();
-        final RequestException cause = failure.getCause();
-        if (cause instanceof RetiredGenerationException) {
-            LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
-            haltClient(cause);
-            context().poison(cause);
-            return null;
-        }
+        // FIXME: this will need to be updated for other Request/Response types to extract cookie
+        Preconditions.checkArgument(id instanceof TransactionIdentifier);
+        final TransactionIdentifier txId = (TransactionIdentifier) id;
 
-        if (failure.isHardFailure()) {
-            return context().completeRequest(this, command);
+        final AbstractClientConnection<T> connection = connections.get(txId.getHistoryId().getCookie());
+        if (connection != null) {
+            connection.receiveResponse(response);
+        } else {
+            LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
         }
-
-        // TODO: add instanceof checks on cause to detect more problems
-
-        LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), command);
-        return context().completeRequest(this, command);
     }
 
-    // This method is executing in the actor context, hence we can safely interact with the queue
-    private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
-        // Get or allocate queue for the request
-        final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
-
-        // Note this is a tri-state return and can be null
-        final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
-        if (result == null) {
-            // Happy path: we are done here
-            return this;
-        }
-
-        if (result.isPresent()) {
-            // Less happy path: we need to schedule a timer
-            scheduleQueueTimeout(queue, result.get());
-            return this;
-        }
-
-        startResolve(queue, request.getTarget().getHistoryId().getCookie());
+    private ClientActorBehavior<T> onRequestSuccess(final SuccessEnvelope success) {
+        onResponse(success);
         return this;
     }
 
-    // This method is executing in the actor context, hence we can safely interact with the queue
-    private void startResolve(final SequencedQueue queue, final long cookie) {
-        // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a
-        // previous request to resolve.
-        final CompletionStage<? extends BackendInfo> f = resolver().getBackendInfo(cookie);
-
-        // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has
-        // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to
-        // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and
-        // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will
-        // bulk-process all requests.
-        if (queue.expectProof(f)) {
-            f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend)));
-        }
+    private ClientActorBehavior<T> onRequestFailure(final FailureEnvelope failure) {
+        onResponse(failure);
+        return this;
     }
 
-    // This method is executing in the actor context, hence we can safely interact with the queue
-    private ClientActorBehavior finishResolve(final SequencedQueue queue,
-            final CompletionStage<? extends BackendInfo> futureBackend, final BackendInfo backend) {
-
-        final Optional<FiniteDuration> maybeTimeout = queue.setBackendInfo(futureBackend, backend);
-        if (maybeTimeout.isPresent()) {
-            scheduleQueueTimeout(queue, maybeTimeout.get());
+    private ClientActorBehavior<T> internalOnRequestFailure(final FailureEnvelope command) {
+        final RequestFailure<?, ?> failure = command.getMessage();
+        final RequestException cause = failure.getCause();
+        if (cause instanceof RetiredGenerationException) {
+            LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
+            haltClient(cause);
+            poison(cause);
+            return null;
         }
-        return this;
-    }
 
-    // This method is executing in the actor context, hence we can safely interact with the queue
-    private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) {
-        LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout);
-        context().executeInActor(cb -> cb.queueTimeout(queue), timeout);
+        return onRequestFailure(command);
     }
 
-    // This method is executing in the actor context, hence we can safely interact with the queue
-    private ClientActorBehavior queueTimeout(final SequencedQueue queue) {
-        final boolean needBackend;
-
+    private void poison(final RequestException cause) {
+        final long stamp = connectionsLock.writeLock();
         try {
-            needBackend = queue.runTimeout();
-        } catch (NoProgressException e) {
-            // Uh-oh, no progress. The queue has already killed itself, now we need to remove it
-            LOG.debug("{}: No progress made - removing queue", persistenceId(), e);
-            context().removeQueue(queue);
-            return this;
-        }
+            for (AbstractClientConnection<T> q : connections.values()) {
+                q.poison(cause);
+            }
 
-        if (needBackend) {
-            startResolve(queue, queue.getCookie());
+            connections.clear();
+        } finally {
+            connectionsLock.unlockWrite(stamp);
         }
-
-        return this;
     }
 
     /**
@@ -174,24 +167,76 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
      * @return Next behavior to use, null if this actor should shut down.
      */
     @Nullable
-    protected abstract ClientActorBehavior onCommand(@Nonnull Object command);
+    protected abstract ClientActorBehavior<T> onCommand(@Nonnull Object command);
 
     /**
      * Override this method to provide a backend resolver instance.
      *
      * @return a backend resolver instance
      */
-    @Nonnull
-    protected abstract BackendInfoResolver<?> resolver();
+    protected final @Nonnull BackendInfoResolver<T> resolver() {
+        return resolver;
+    }
 
     /**
-     * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
-     * from any thread.
+     * Callback invoked when a new connection has been established.
      *
-     * @param request Request to send
-     * @param callback Callback to invoke
+     * @param conn Old connection
+     * @param backend New backend
+     * @return Newly-connected connection.
      */
-    public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
-        context().executeInActor(cb -> cb.doSendRequest(request, callback));
+    @GuardedBy("connectionsLock")
+    protected abstract @Nonnull ConnectedClientConnection<T> connectionUp(
+            final @Nonnull AbstractClientConnection<T> conn, final @Nonnull T backend);
+
+    private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
+            final T backend, final Throwable failure) {
+        if (failure != null) {
+            LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
+            return;
+        }
+
+        final long stamp = connectionsLock.writeLock();
+        try {
+            // Bring the connection up
+            final ConnectedClientConnection<T> newConn = connectionUp(conn, backend);
+
+            // Make sure new lookups pick up the new connection
+            connections.replace(shard, conn, newConn);
+            LOG.debug("{}: replaced connection {} with {}", persistenceId(), conn, newConn);
+        } finally {
+            connectionsLock.unlockWrite(stamp);
+        }
+    }
+
+    void removeConnection(final AbstractClientConnection<?> conn) {
+        connections.remove(conn.cookie(), conn);
+        LOG.debug("{}: removed connection {}", persistenceId(), conn);
+    }
+
+    @SuppressWarnings("unchecked")
+    void reconnectConnection(final ConnectedClientConnection<?> oldConn,
+            final ReconnectingClientConnection<?> newConn) {
+        final ReconnectingClientConnection<T> conn = (ReconnectingClientConnection<T>)newConn;
+        connections.replace(oldConn.cookie(), (AbstractClientConnection<T>)oldConn, conn);
+        LOG.debug("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn);
+
+        final Long shard = oldConn.cookie();
+        resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete(
+            (backend, failure) -> context().executeInActor(behavior -> {
+                backendConnectFinished(shard, conn, backend, failure);
+                return behavior;
+            }));
+    }
+
+    private ConnectingClientConnection<T> createConnection(final Long shard) {
+        final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
+
+        resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> {
+            backendConnectFinished(shard, conn, backend, failure);
+            return behavior;
+        }));
+
+        return conn;
     }
 }
index 26e6835..cb36223 100644 (file)
@@ -13,18 +13,10 @@ import akka.actor.Scheduler;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.WritableIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -41,11 +33,8 @@ import scala.concurrent.duration.FiniteDuration;
 @Beta
 @ThreadSafe
 public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
-    private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
-
-    private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
-    private final ClientIdentifier identifier;
     private final ExecutionContext executionContext;
+    private final ClientIdentifier identifier;
     private final Scheduler scheduler;
 
     // Hidden to avoid subclassing
@@ -79,45 +68,15 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
      * Execute a command in the context of the client actor.
      *
      * @param command Block of code which needs to be execute
+     * @param <T> BackendInfo type
      */
-    public void executeInActor(@Nonnull final InternalCommand command) {
+    public <T extends BackendInfo> void executeInActor(@Nonnull final InternalCommand<T> command) {
         self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
     }
 
-    public Cancellable executeInActor(@Nonnull final InternalCommand command, final FiniteDuration delay) {
+    public <T extends BackendInfo> Cancellable executeInActor(@Nonnull final InternalCommand<T> command,
+            final FiniteDuration delay) {
         return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
             executionContext, ActorRef.noSender());
     }
-
-    SequencedQueue queueFor(final Long cookie) {
-        return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
-    }
-
-    void removeQueue(final SequencedQueue queue) {
-        queues.remove(queue.getCookie(), queue);
-    }
-
-    ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
-        final WritableIdentifier id = response.getMessage().getTarget();
-
-        // FIXME: this will need to be updated for other Request/Response types to extract cookie
-        Preconditions.checkArgument(id instanceof TransactionIdentifier);
-        final TransactionIdentifier txId = (TransactionIdentifier) id;
-
-        final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
-        if (queue == null) {
-            LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
-            return current;
-        } else {
-            return queue.complete(current, response);
-        }
-    }
-
-    void poison(final RequestException cause) {
-        for (SequencedQueue q : queues.values()) {
-            q.poison(cause);
-        }
-
-        queues.clear();
-    }
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
new file mode 100644 (file)
index 0000000..6c1507c
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Beta
+@NotThreadSafe
+public final class ConnectedClientConnection<T extends BackendInfo> extends AbstractReceivingClientConnection<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectedClientConnection.class);
+
+    private long nextTxSequence;
+
+    public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
+        super(context, cookie, backend);
+    }
+
+    private TransmittedConnectionEntry transmit(final ConnectionEntry entry) {
+        final long txSequence = nextTxSequence++;
+
+        final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(),
+            txSequence);
+
+        final ActorRef actor = remoteActor();
+        LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor);
+        actor.tell(toSend, ActorRef.noSender());
+
+        return new TransmittedConnectionEntry(entry, sessionId(), txSequence, readTime());
+    }
+
+    @Override
+    void enqueueEntry(final ConnectionEntry entry) {
+        if (inflightSize() < remoteMaxMessages()) {
+            appendToInflight(transmit(entry));
+            LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this);
+        } else {
+            LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest());
+            super.enqueueEntry(entry);
+        }
+    }
+
+    @Override
+    void sendMessages(final int count) {
+        int toSend = count;
+
+        while (toSend > 0) {
+            final ConnectionEntry e = dequeEntry();
+            if (e == null) {
+                break;
+            }
+
+            LOG.debug("Transmitting entry {}", e);
+            appendToInflight(transmit(e));
+            toSend--;
+        }
+    }
+
+    @Override
+    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+        final ReconnectingClientConnection<T> next = new ReconnectingClientConnection<>(this);
+        setForwarder(new SimpleReconnectForwarder(next));
+        current.reconnectConnection(this, next);
+        return current;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
new file mode 100644 (file)
index 0000000..cdadf1d
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.annotations.Beta;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Beta
+public final class ConnectingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectingClientConnection.class);
+
+    // Initial state, never instantiated externally
+    ConnectingClientConnection(final ClientActorContext context, final Long cookie) {
+        super(context, cookie);
+    }
+
+    @Override
+    public Optional<T> getBackendInfo() {
+        return Optional.empty();
+    }
+
+    @Override
+    void receiveResponse(final ResponseEnvelope<?> envelope) {
+        LOG.warn("Initial connection {} ignoring response {}", this, envelope);
+    }
+
+    @Override
+    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+        throw new UnsupportedOperationException("Attempted to reconnect a connecting connection");
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java
new file mode 100644 (file)
index 0000000..64586f0
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.yangtools.concepts.Immutable;
+
+/**
+ * Single entry in a {@link AbstractClientConnection}. Tracks the request, the associated callback and time when
+ * the request was first enqueued.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public class ConnectionEntry implements Immutable {
+    private final Consumer<Response<?, ?>> callback;
+    private final Request<?, ?> request;
+    private final long enqueuedTicks;
+
+    ConnectionEntry(final Request<?, ?> request, final Consumer<Response<?, ?>> callback, final long now) {
+        this.request = Preconditions.checkNotNull(request);
+        this.callback = Preconditions.checkNotNull(callback);
+        this.enqueuedTicks = now;
+    }
+
+    ConnectionEntry(final ConnectionEntry entry) {
+        this(entry.request, entry.callback, entry.enqueuedTicks);
+    }
+
+    public final Consumer<Response<?, ?>> getCallback() {
+        return callback;
+    }
+
+    public final Request<?, ?> getRequest() {
+        return request;
+    }
+
+    public void complete(final Response<?, ?> response) {
+        callback.accept(response);
+    }
+
+    final long getEnqueuedTicks() {
+        return enqueuedTicks;
+    }
+}
index c4a09e7..40a296b 100644 (file)
@@ -12,6 +12,7 @@ import java.util.AbstractQueue;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Queue;
+import org.opendaylight.yangtools.concepts.Immutable;
 
 /**
  * A specialized always-empty implementation of {@link java.util.Queue}. This implementation will always refuse new
@@ -23,7 +24,7 @@ import java.util.Queue;
  */
 // TODO: move this class into yangtools.util
 @Beta
-public final class EmptyQueue<E> extends AbstractQueue<E> {
+public final class EmptyQueue<E> extends AbstractQueue<E> implements Immutable {
     private static final EmptyQueue<?> INSTANCE = new EmptyQueue<>();
 
     private EmptyQueue() {
index 50183c1..c0790e7 100644 (file)
@@ -29,11 +29,11 @@ final class InitialClientActorContext extends AbstractClientActorContext {
         actor.saveSnapshot(snapshot);
     }
 
-    void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         actor.deleteSnapshots(criteria);
     }
 
-    ClientActorBehavior createBehavior(final ClientIdentifier clientId) {
+    ClientActorBehavior<?> createBehavior(final ClientIdentifier clientId) {
         final ActorSystem system = actor.getContext().system();
         final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(),
             persistenceId(), clientId);
index 2b773ca..f34a475 100644 (file)
@@ -17,12 +17,12 @@ import javax.annotation.Nullable;
  * @author Robert Varga
  */
 @FunctionalInterface
-public interface InternalCommand {
+public interface InternalCommand<T extends BackendInfo> {
     /**
      * Run command actions.
      *
      * @param currentBehavior Current Behavior
      * @return Next behavior to use in the client actor
      */
-    @Nullable ClientActorBehavior execute(@Nonnull ClientActorBehavior currentBehavior);
+    @Nullable ClientActorBehavior<T> execute(@Nonnull ClientActorBehavior<T> currentBehavior);
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLock.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLock.java
new file mode 100644 (file)
index 0000000..ea4e581
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Verify;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * A lock implementation which allows users to perform optimistic reads and validate them in a fashion similar
+ * to {@link StampedLock}. In case a read is contented with a write, the read side will throw
+ * an {@link InversibleLockException}, which the caller can catch and use to wait for the write to resolve.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class InversibleLock {
+    private static final AtomicReferenceFieldUpdater<InversibleLock, CountDownLatch> LATCH_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(InversibleLock.class, CountDownLatch.class, "latch");
+
+    private final StampedLock lock = new StampedLock();
+    private volatile CountDownLatch latch;
+
+    /**
+     * Return a stamp for read validation.
+     *
+     * @return A stamp, which can be used with {@link #validate(long)}.
+     * @throws InversibleLockException if this lock is currently write-locked
+     */
+    public long optimisticRead() {
+        while (true) {
+            final long stamp = lock.tryOptimisticRead();
+            if (stamp != 0) {
+                return stamp;
+            }
+
+            // Write-locked. Read the corresponding latch and if present report an exception, which will propagate
+            // and force release of locks.
+            final CountDownLatch local = latch;
+            if (local != null) {
+                throw new InversibleLockException(latch);
+            }
+
+            // No latch present: retry optimistic lock
+        }
+    }
+
+    public boolean validate(final long stamp) {
+        return lock.validate(stamp);
+    }
+
+    public long writeLock() {
+        final CountDownLatch local = new CountDownLatch(1);
+        final boolean taken = LATCH_UPDATER.compareAndSet(this, null, local);
+        Verify.verify(taken);
+
+        return lock.writeLock();
+    }
+
+    public void unlockWrite(final long stamp) {
+        final CountDownLatch local = LATCH_UPDATER.getAndSet(this, null);
+        Verify.verifyNotNull(local);
+        lock.unlockWrite(stamp);
+        local.countDown();
+    }
+
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLockException.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLockException.java
new file mode 100644 (file)
index 0000000..7ec18a2
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Exception thrown from {@link InversibleLock#optimisticRead()} and can be used to wait for the racing write
+ * to complete using {@link #awaitResolution()}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class InversibleLockException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    private final transient CountDownLatch latch;
+
+    InversibleLockException(final CountDownLatch latch) {
+        this.latch = Preconditions.checkNotNull(latch);
+    }
+
+    public void awaitResolution() {
+        // latch can be null after deserialization
+        if (latch != null) {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("Interrupted while waiting for latch " + latch, e);
+            }
+        }
+    }
+}
index f046be4..cc34b30 100644 (file)
@@ -12,7 +12,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 
 /**
  * Internal {@link RequestException} used as poison cause when the client fails to make progress for a long time.
- * See {@link SequencedQueue} for details.
+ * See {@link AbstractClientConnection} for details.
  *
  * @author Robert Varga
  */
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java
new file mode 100644 (file)
index 0000000..37dc2f1
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwarder class responsible for routing requests from the previous connection incarnation back to the originator,
+ * which can then convert them as appropriate.
+ *
+ * @author Robert Varga
+ */
+public abstract class ReconnectForwarder {
+    static final Logger LOG = LoggerFactory.getLogger(ReconnectForwarder.class);
+    // Visible for subclass method handle
+    private final AbstractReceivingClientConnection<?> successor;
+
+    protected ReconnectForwarder(final AbstractReceivingClientConnection<?> successor) {
+        this.successor = Preconditions.checkNotNull(successor);
+    }
+
+    protected final void sendToSuccessor(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
+        successor.sendRequest(request, callback);
+    }
+
+    protected abstract void forwardEntry(ConnectionEntry entry);
+
+    final AbstractReceivingClientConnection<?> successor() {
+        return successor;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java
new file mode 100644 (file)
index 0000000..0209f95
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AbstractClientConnection} which is being reconnected after having timed out.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> {@link BackendInfo} type
+ */
+public final class ReconnectingClientConnection<T extends BackendInfo> extends AbstractReceivingClientConnection<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconnectingClientConnection.class);
+
+    ReconnectingClientConnection(final ConnectedClientConnection<T> oldConnection) {
+        super(oldConnection);
+    }
+
+    @Override
+    void sendMessages(final int count) {
+        LOG.debug("Connection {} is reconnecting, not transmitting anything", this);
+    }
+
+    @Override
+    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+        // Intentional no-op
+        LOG.debug("Skipping reconnect of already-reconnecting connection {}", this);
+        return current;
+    }
+}
index 4d81ce0..90af8cb 100644 (file)
@@ -12,12 +12,12 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 
 @FunctionalInterface
-public interface RequestCallback {
+interface RequestCallback {
     /**
      * Invoked when a particular request completes.
      *
      * @param response Response to the request
      * @return Next client actor behavior
      */
-    @Nullable ClientActorBehavior complete(@Nonnull Response<?, ?> response);
+    @Nullable ClientActorBehavior<?> complete(@Nonnull Response<?, ?> response);
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java
deleted file mode 100644 (file)
index 596e353..0000000
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.access.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
-import com.google.common.base.Verify;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-
-/*
- * A queue that processes entries in sequence.
- *
- * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
- *       retries and enqueues work as expected.
- */
-@NotThreadSafe
-final class SequencedQueue {
-    private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
-
-    // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
-    @VisibleForTesting
-    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
-    @VisibleForTesting
-    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
-    private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
-        TimeUnit.NANOSECONDS);
-
-    /**
-     * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
-     * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
-     * resolution.
-     */
-    private static final int DEFAULT_TX_LIMIT = 1000;
-
-    private final Ticker ticker;
-    private final Long cookie;
-
-    /*
-     * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
-     * with the limit changing between reconnects (which imply retransmission).
-     *
-     * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
-     * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
-     * and one for requests which have not been transmitted at all.
-     *
-     * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
-     * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
-     * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
-     * to retransmit -- hence the second queue.
-     */
-    private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
-    private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
-    private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
-
-    /**
-     * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
-     * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
-     * result.
-     */
-    private CompletionStage<? extends BackendInfo> backendProof;
-    private BackendInfo backend;
-
-    // This is not final because we need to be able to replace it.
-    private long txSequence;
-
-    private int lastTxLimit = DEFAULT_TX_LIMIT;
-
-    /**
-     * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
-     */
-    private Object expectingTimer;
-
-    private long lastProgress;
-
-    // Updated from application thread
-    private volatile boolean notClosed = true;
-
-    SequencedQueue(final Long cookie, final Ticker ticker) {
-        this.cookie = Preconditions.checkNotNull(cookie);
-        this.ticker = Preconditions.checkNotNull(ticker);
-        lastProgress = ticker.read();
-    }
-
-    Long getCookie() {
-        return cookie;
-    }
-
-    private void checkNotClosed() {
-        Preconditions.checkState(notClosed, "Queue %s is closed", this);
-    }
-
-    private long nextTxSequence() {
-        return txSequence++;
-    }
-
-    /**
-     * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
-     * the following scenarios:
-     * 1) The request has been enqueued and transmitted. No further actions are necessary
-     * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
-     * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
-     *    process needs to complete before transmission occurs
-     * <p/>
-     * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
-     * the scenarios above according to the following rules:
-     * - if is null, the first case applies
-     * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
-     *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
-     * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
-     *
-     * @param request Request to be sent
-     * @param callback Callback to be invoked
-     * @return Optional duration with semantics described above.
-     */
-    @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
-        checkNotClosed();
-
-        final long now = ticker.read();
-        final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
-        if (backend == null) {
-            LOG.debug("No backend available, request resolution");
-            pending.add(e);
-            return Optional.empty();
-        }
-        if (!lastInflight.isEmpty()) {
-            LOG.debug("Retransmit not yet complete, delaying request {}", request);
-            pending.add(e);
-            return null;
-        }
-        if (currentInflight.size() >= lastTxLimit) {
-            LOG.debug("Queue is at capacity, delayed sending of request {}", request);
-            pending.add(e);
-            return null;
-        }
-
-        // Ready to transmit
-
-        if (currentInflight.offer(e)) {
-            LOG.debug("Enqueued request {} to queue {}", request, this);
-        } else {
-            // This shouldn't happen since the queue has unlimited capacity but check anyway to avoid FindBugs warning
-            // about checking return value.
-            LOG.warn("Fail to enqueued request {} to queue {}", request, this);
-        }
-
-        e.retransmit(backend, nextTxSequence(), now);
-        if (expectingTimer == null) {
-            expectingTimer = now + REQUEST_TIMEOUT_NANOS;
-            return Optional.of(INITIAL_REQUEST_TIMEOUT);
-        } else {
-            return null;
-        }
-    }
-
-    /*
-     * We are using tri-state return here to indicate one of three conditions:
-     * - if a matching entry is found, return an Optional containing it
-     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
-     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
-     */
-    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
-            justification = "Returning null Optional is documented in the API contract.")
-    private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
-            final ResponseEnvelope<?> envelope) {
-        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
-        // to use an iterator
-        final Iterator<SequencedQueueEntry> it = queue.iterator();
-        while (it.hasNext()) {
-            final SequencedQueueEntry e = it.next();
-            final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
-
-            final Request<?, ?> request = e.getRequest();
-            final Response<?, ?> response = envelope.getMessage();
-
-            // First check for matching target, or move to next entry
-            if (!request.getTarget().equals(response.getTarget())) {
-                continue;
-            }
-
-            // Sanity-check logical sequence, ignore any out-of-order messages
-            if (request.getSequence() != response.getSequence()) {
-                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
-                return Optional.empty();
-            }
-
-            // Now check session match
-            if (envelope.getSessionId() != txDetails.getSessionId()) {
-                LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
-                return Optional.empty();
-            }
-            if (envelope.getTxSequence() != txDetails.getTxSequence()) {
-                LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
-                return Optional.empty();
-            }
-
-            LOG.debug("Completing request {} with {}", request, envelope);
-            it.remove();
-            return Optional.of(e);
-        }
-
-        return null;
-    }
-
-    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
-        Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
-        if (maybeEntry == null) {
-            maybeEntry = findMatchingEntry(lastInflight, envelope);
-        }
-
-        if (maybeEntry == null || !maybeEntry.isPresent()) {
-            LOG.warn("No request matching {} found, ignoring response", envelope);
-            return current;
-        }
-
-        lastProgress = ticker.read();
-        final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
-
-        // We have freed up a slot, try to transmit something
-        if (backend != null) {
-            final int toSend = lastTxLimit - currentInflight.size();
-            if (toSend > 0) {
-                runTransmit(toSend);
-            }
-        }
-
-        return ret;
-    }
-
-    private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
-        int toSend = count;
-
-        while (toSend > 0) {
-            final SequencedQueueEntry e = queue.poll();
-            if (e == null) {
-                break;
-            }
-
-            LOG.debug("Transmitting entry {}", e);
-            e.retransmit(backend, nextTxSequence(), lastProgress);
-            toSend--;
-        }
-
-        return toSend;
-    }
-
-    private void runTransmit(final int count) {
-        final int toSend;
-
-        // Process lastInflight first, possibly clearing it
-        if (!lastInflight.isEmpty()) {
-            toSend = transmitEntries(lastInflight, count);
-            if (lastInflight.isEmpty()) {
-                // We won't be needing the queue anymore, change it to specialized implementation
-                lastInflight = EmptyQueue.getInstance();
-            }
-        } else {
-            toSend = count;
-        }
-
-        // Process pending next.
-        transmitEntries(pending, toSend);
-    }
-
-    Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof,
-            final BackendInfo backend) {
-        Preconditions.checkNotNull(backend);
-        if (!proof.equals(backendProof)) {
-            LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
-            return Optional.empty();
-        }
-
-        LOG.debug("Resolved backend {}",  backend);
-
-        // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
-        // and also not to exceed new limits
-        final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
-        newLast.addAll(currentInflight);
-        newLast.addAll(lastInflight);
-        lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
-
-        // Clear currentInflight, possibly compacting it
-        final int txLimit = backend.getMaxMessages();
-        if (lastTxLimit > txLimit) {
-            currentInflight = new ArrayDeque<>();
-        } else {
-            currentInflight.clear();
-        }
-
-        // We are ready to roll
-        this.backend = backend;
-        backendProof = null;
-        txSequence = 0;
-        lastTxLimit = txLimit;
-        lastProgress = ticker.read();
-
-        // No pending requests, return
-        if (lastInflight.isEmpty() && pending.isEmpty()) {
-            return Optional.empty();
-        }
-
-        LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
-
-        runTransmit(lastTxLimit);
-
-        // Calculate next timer if necessary
-        if (expectingTimer == null) {
-            // Request transmission may have cost us some time. Recalculate timeout.
-            final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
-            expectingTimer = nextTicks;
-            return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
-        } else {
-            return Optional.empty();
-        }
-    }
-
-    boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
-        if (!proof.equals(backendProof)) {
-            LOG.debug("Setting resolution handle to {}", proof);
-            backendProof = proof;
-            return true;
-        } else {
-            LOG.trace("Already resolving handle {}", proof);
-            return false;
-        }
-    }
-
-    boolean hasCompleted() {
-        return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
-    }
-
-    /**
-     * Check queue timeouts and return true if a timeout has occured.
-     *
-     * @return True if a timeout occured
-     * @throws NoProgressException if the queue failed to make progress for an extended
-     *                             time.
-     */
-    boolean runTimeout() throws NoProgressException {
-        expectingTimer = null;
-        final long now = ticker.read();
-
-        if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
-            final long ticksSinceProgress = now - lastProgress;
-            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
-                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
-
-                final NoProgressException ex = new NoProgressException(ticksSinceProgress);
-                poison(ex);
-                throw ex;
-            }
-        }
-
-        // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
-        final SequencedQueueEntry head = currentInflight.peek();
-        if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
-            backend = null;
-            LOG.debug("Queue {} invalidated backend info", this);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
-        queue.forEach(e -> e.poison(cause));
-        queue.clear();
-    }
-
-    void poison(final RequestException cause) {
-        close();
-
-        poisonQueue(currentInflight, cause);
-        poisonQueue(lastInflight, cause);
-        poisonQueue(pending, cause);
-    }
-
-    // FIXME: add a caller from ClientSingleTransaction
-    void close() {
-        notClosed = false;
-    }
-}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java
deleted file mode 100644 (file)
index 8814d50..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.access.client;
-
-import akka.actor.ActorRef;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
- *
- * @author Robert Varga
- */
-final class SequencedQueueEntry {
-    private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
-
-    private final Request<?, ?> request;
-    private final RequestCallback callback;
-    private final long enqueuedTicks;
-
-    private TxDetails txDetails;
-
-    SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback,
-        final long now) {
-        this.request = Preconditions.checkNotNull(request);
-        this.callback = Preconditions.checkNotNull(callback);
-        this.enqueuedTicks = now;
-    }
-
-    Request<?, ?> getRequest() {
-        return request;
-    }
-
-    @Nullable TxDetails getTxDetails() {
-        return txDetails;
-    }
-
-    ClientActorBehavior complete(final Response<?, ?> response) {
-        LOG.debug("Completing request {} with {}", request, response);
-        return callback.complete(response);
-    }
-
-    void poison(final RequestException cause) {
-        LOG.trace("Poisoning request {}", request, cause);
-        callback.complete(request.toRequestFailure(cause));
-    }
-
-    boolean isTimedOut(final long now, final long timeoutNanos) {
-        final long elapsed;
-
-        if (txDetails != null) {
-            elapsed = now - txDetails.getTimeTicks();
-        } else {
-            elapsed = now - enqueuedTicks;
-        }
-
-        if (elapsed >= timeoutNanos) {
-            LOG.debug("Request {} timed out after {}ns", request, elapsed);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    void retransmit(final BackendInfo backend, final long txSequence, final long now) {
-        final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()),
-            backend.getSessionId(), txSequence);
-
-        final ActorRef actor = backend.getActor();
-        LOG.trace("Transmitting request {} as {} to {}", request, toSend, actor);
-        actor.tell(toSend, ActorRef.noSender());
-        txDetails = new TxDetails(backend.getSessionId(), txSequence, now);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
-    }
-
-}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java
new file mode 100644 (file)
index 0000000..a8ab7c4
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+// Simple forwarder which just pushes the entry to the successor
+final class SimpleReconnectForwarder extends ReconnectForwarder {
+    SimpleReconnectForwarder(final AbstractReceivingClientConnection<?> successor) {
+        super(successor);
+    }
+
+    @Override
+    protected void forwardEntry(final ConnectionEntry entry) {
+        successor().enqueueEntry(entry);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java
new file mode 100644 (file)
index 0000000..34cbd49
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+/**
+ * A {@link ConnectionEntry} which has been transmitted. It holds additional information about the last transmission.
+ *
+ * @author Robert Varga
+ */
+final class TransmittedConnectionEntry extends ConnectionEntry {
+    private final long sessionId;
+    private final long txSequence;
+    private final long txTicks;
+
+    TransmittedConnectionEntry(final ConnectionEntry entry, final long sessionId, final long txSequence,
+        final long now) {
+        super(entry);
+        this.sessionId = sessionId;
+        this.txSequence = txSequence;
+        this.txTicks = now;
+    }
+
+    long getSessionId() {
+        return sessionId;
+    }
+
+    long getTxSequence() {
+        return txSequence;
+    }
+
+    long getTxTicks() {
+        return txTicks;
+    }
+
+}
@@ -14,6 +14,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -22,9 +23,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.TestProbe;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -41,16 +42,18 @@ import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.common.actor.TestTicker;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * Test suite covering logic contained in {@link SequencedQueue}. It assumes {@link SequencedQueueEntryTest} passes.
+ * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest}
+ * passes.
  *
  * @author Robert Varga
  */
-public class SequencedQueueTest {
+public class ConnectingClientConnectionTest {
     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
         private static final long serialVersionUID = 1L;
 
@@ -100,9 +103,11 @@ public class SequencedQueueTest {
     @Mock
     private RequestException mockCause;
     @Mock
-    private RequestCallback mockCallback;
+    private Consumer<Response<?, ?>> mockCallback;
     @Mock
-    private ClientActorBehavior mockBehavior;
+    private ClientActorBehavior<?> mockBehavior;
+    @Mock
+    private ClientActorContext mockContext;
 
     private TestTicker ticker;
     private BackendInfo mockBackendInfo;
@@ -115,7 +120,7 @@ public class SequencedQueueTest {
     private static ActorSystem actorSystem;
     private TestProbe mockActor;
 
-    private SequencedQueue queue;
+    private AbstractClientConnection<?> queue;
 
     @BeforeClass
     public static void setupClass() {
@@ -131,10 +136,11 @@ public class SequencedQueueTest {
     public void setup() {
         MockitoAnnotations.initMocks(this);
 
-        doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
+        doNothing().when(mockCallback).accept(any(MockFailure.class));
 
         ticker = new TestTicker();
         ticker.increment(ThreadLocalRandom.current().nextLong());
+        doReturn(ticker).when(mockContext).ticker();
 
         mockActor = TestProbe.apply(actorSystem);
         mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
@@ -144,7 +150,7 @@ public class SequencedQueueTest {
         mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
         mockCookie = ThreadLocalRandom.current().nextLong();
 
-        queue = new SequencedQueue(mockCookie, ticker);
+        queue = new ConnectingClientConnection<>(mockContext, mockCookie);
     }
 
     @After
@@ -153,38 +159,17 @@ public class SequencedQueueTest {
     }
 
     @Test
-    public void testGetCookie() {
-        assertSame(mockCookie, queue.getCookie());
-    }
-
-    @Test
-    public void testEmptyClose() {
-        assertFalse(queue.hasCompleted());
-        queue.close();
-        assertTrue(queue.hasCompleted());
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testClosedEnqueueRequest() {
-        queue.close();
-
-        // Kaboom
-        queue.enqueueRequest(mockRequest, mockCallback);
-    }
-
-    @Test
-    public void testCloseIdempotent() {
-        queue.close();
-        queue.close();
+    public void testCookie() {
+        assertSame(mockCookie, queue.cookie());
     }
 
     @Test
     public void testPoison() {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
         queue.poison(mockCause);
 
         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
-        verify(mockCallback).complete(captor.capture());
+        verify(mockCallback).accept(captor.capture());
         assertSame(mockCause, captor.getValue().getCause());
     }
 
@@ -194,7 +179,7 @@ public class SequencedQueueTest {
         queue.poison(mockCause);
 
         // Kaboom
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
     }
 
     @Test
@@ -204,47 +189,11 @@ public class SequencedQueueTest {
     }
 
     @Test
-    public void testEnqueueRequestNeedsBackend() {
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
-
+    public void testSendRequestNeedsBackend() {
+        queue.sendRequest(mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
-        assertFalse(ret.isPresent());
-    }
-
-    @Test
-    public void testExpectProof() {
-        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
-        assertTrue(queue.expectProof(proof));
-        assertFalse(queue.expectProof(proof));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testSetBackendNull() {
-        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
-        assertTrue(queue.expectProof(proof));
-        queue.setBackendInfo(proof, null);
-    }
-
-    @Test
-    public void testSetBackendWithNoResolution() {
-        queue.enqueueRequest(mockRequest, mockCallback);
-
-        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
-        final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
-        assertNotNull(ret);
-        assertFalse(ret.isPresent());
-    }
-
-    @Test
-    public void testSetBackendWithWrongProof() {
-        queue.enqueueRequest(mockRequest, mockCallback);
-
-        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
-        assertTrue(queue.expectProof(proof));
-
-        final Optional<FiniteDuration> ret = queue.setBackendInfo(new CompletableFuture<>(), mockBackendInfo);
-        assertNotNull(ret);
-        assertFalse(ret.isPresent());
+        assertTrue(ret.isPresent());
     }
 
     @Test
@@ -254,135 +203,111 @@ public class SequencedQueueTest {
     }
 
     @Test
-    public void testSetBackendWithRequestsNoTimer() {
-        queue.enqueueRequest(mockRequest, mockCallback);
-
-        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
-        assertTrue(queue.expectProof(proof));
-        assertFalse(mockActor.msgAvailable());
-
-        final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
-        assertNotNull(ret);
-        assertTrue(ret.isPresent());
-
-        assertTransmit(mockRequest, 0);
-    }
-
-    @Test
-    public void testEnqueueRequestNeedsTimer() {
+    public void testSendRequestNeedsTimer() {
         setupBackend();
 
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
     }
 
-    @Test
-    public void testEnqueueRequestWithoutTimer() {
-        setupBackend();
-
-        // First request
-        Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
-        assertNotNull(ret);
-        assertTrue(ret.isPresent());
-        assertTransmit(mockRequest, 0);
-
-        // Second request, no timer fired
-        ret = queue.enqueueRequest(mockRequest2, mockCallback);
-        assertNull(ret);
-        assertTransmit(mockRequest2, 1);
-    }
-
     @Test
     public void testRunTimeoutEmpty() throws NoProgressException {
-        final boolean ret = queue.runTimeout();
-        assertFalse(ret);
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNotNull(ret);
+        assertFalse(ret.isPresent());
     }
 
     @Test
     public void testRunTimeoutWithoutShift() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
-        final boolean ret = queue.runTimeout();
-        assertFalse(ret);
+        queue.sendRequest(mockRequest, mockCallback);
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNotNull(ret);
+        assertTrue(ret.isPresent());
     }
 
     @Test
     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
+        ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS - 1);
 
-        final boolean ret = queue.runTimeout();
-        assertFalse(ret);
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNotNull(ret);
+        assertTrue(ret.isPresent());
     }
 
     @Test
     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
         setupBackend();
 
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
+        ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS);
 
-        final boolean ret = queue.runTimeout();
-        assertTrue(ret);
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNull(ret);
     }
 
     @Test
     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
         setupBackend();
 
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
+        ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS + 1);
 
-        final boolean ret = queue.runTimeout();
-        assertTrue(ret);
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNull(ret);
     }
 
-    @Test(expected = NoProgressException.class)
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+        ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
 
         // Kaboom
-        queue.runTimeout();
+        queue.runTimer((ClientActorBehavior) mockBehavior);
+        assertNotNull(queue.poisoned());
     }
 
-    @Test(expected = NoProgressException.class)
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+        ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
 
         // Kaboom
-        queue.runTimeout();
+        queue.runTimer((ClientActorBehavior) mockBehavior);
+        assertNotNull(queue.poisoned());
     }
 
     @Test
     public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
-        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+        ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
 
         // No problem
-        final boolean ret = queue.runTimeout();
-        assertFalse(ret);
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNotNull(ret);
+        assertFalse(ret.isPresent());
     }
 
     @Test
     public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
-        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+        ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
 
         // No problem
-        final boolean ret = queue.runTimeout();
-        assertFalse(ret);
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNotNull(ret);
+        assertFalse(ret.isPresent());
     }
 
     @Test
     public void testCompleteEmpty() {
-        final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
-        assertSame(mockBehavior, ret);
+        queue.receiveResponse(mockResponseEnvelope);
         verifyNoMoreInteractions(mockCallback);
     }
 
@@ -390,14 +315,12 @@ public class SequencedQueueTest {
     public void testCompleteSingle() {
         setupBackend();
 
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
-        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
-        verify(mockCallback).complete(mockResponse);
-        assertSame(mockBehavior, ret);
+        queue.receiveResponse(mockResponseEnvelope);
+        verify(mockCallback).accept(mockResponse);
 
-        ret = queue.complete(mockBehavior, mockResponseEnvelope);
-        assertSame(mockBehavior, ret);
+        queue.receiveResponse(mockResponseEnvelope);
         verifyNoMoreInteractions(mockCallback);
     }
 
@@ -405,36 +328,35 @@ public class SequencedQueueTest {
     public void testCompleteNull() {
         setupBackend();
 
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
-        doReturn(null).when(mockCallback).complete(mockResponse);
+        doNothing().when(mockCallback).accept(mockResponse);
 
-        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
-        verify(mockCallback).complete(mockResponse);
-        assertNull(ret);
+        queue.receiveResponse(mockResponseEnvelope);
+        verify(mockCallback).accept(mockResponse);
     }
 
     @Test
     public void testProgressRecord() throws NoProgressException {
         setupBackend();
 
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.sendRequest(mockRequest, mockCallback);
 
         ticker.increment(10);
-        queue.enqueueRequest(mockRequest2, mockCallback);
-        queue.complete(mockBehavior, mockResponseEnvelope);
+        queue.sendRequest(mockRequest2, mockCallback);
+        queue.receiveResponse(mockResponseEnvelope);
 
-        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
-        assertTrue(queue.runTimeout());
+        ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11);
+
+        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        assertNull(ret);
     }
 
     private void setupBackend() {
-        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
-        assertTrue(queue.expectProof(proof));
-        final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
-        assertNotNull(ret);
-        assertFalse(ret.isPresent());
-        assertFalse(mockActor.msgAvailable());
+        final ConnectedClientConnection<?> newConn = new ConnectedClientConnection<>(mockContext, mockCookie,
+                mockBackendInfo);
+        queue.setForwarder(new SimpleReconnectForwarder(newConn));
+        queue = newConn;
     }
 
     private void assertTransmit(final Request<?, ?> expected, final long sequence) {
@@ -7,46 +7,38 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.verify;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.TestProbe;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
 import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.common.actor.TestTicker;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
-import scala.concurrent.duration.Duration;
 
 /**
- * Test suite covering logic contained in {@link SequencedQueueEntry}.
+ * Test suite covering logic contained in {@link ConnectionEntry}.
  *
  * @author Robert Varga
  */
-public class SequencedQueueEntryTest {
+public class ConnectionEntryTest {
     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
         private static final long serialVersionUID = 1L;
 
@@ -96,19 +88,18 @@ public class SequencedQueueEntryTest {
     @Mock
     private RequestException mockCause;
     @Mock
-    private RequestCallback mockCallback;
+    private Consumer<Response<?, ?>> mockCallback;
     @Mock
-    private ClientActorBehavior mockBehavior;
+    private ClientActorBehavior<?> mockBehavior;
 
     private TestTicker ticker;
-    private BackendInfo mockBackendInfo;
     private Request<WritableIdentifier, ?> mockRequest;
     private Response<WritableIdentifier, ?> mockResponse;
 
     private static ActorSystem actorSystem;
     private TestProbe mockActor;
 
-    private SequencedQueueEntry entry;
+    private ConnectionEntry entry;
 
     @BeforeClass
     public static void setupClass() {
@@ -124,17 +115,16 @@ public class SequencedQueueEntryTest {
     public void setup() {
         MockitoAnnotations.initMocks(this);
 
-        doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
+        doNothing().when(mockCallback).accept(any(MockFailure.class));
 
         ticker = new TestTicker();
         ticker.increment(ThreadLocalRandom.current().nextLong());
 
         mockActor = TestProbe.apply(actorSystem);
-        mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
 
-        entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
+        entry = new ConnectionEntry(mockRequest, mockCallback, ticker.read());
     }
 
     @After
@@ -142,65 +132,9 @@ public class SequencedQueueEntryTest {
         actorSystem.stop(mockActor.ref());
     }
 
-    @Test
-    public void testGetTxDetails() {
-        assertNull(entry.getTxDetails());
-        entry.retransmit(mockBackendInfo, 0, ticker.read());
-        assertEquals(0, entry.getTxDetails().getTxSequence());
-        entry.retransmit(mockBackendInfo, 1, ticker.read());
-        assertEquals(1, entry.getTxDetails().getTxSequence());
-        entry.retransmit(mockBackendInfo, 3, ticker.read());
-        assertEquals(3, entry.getTxDetails().getTxSequence());
-    }
-
     @Test
     public void testComplete() {
         entry.complete(mockResponse);
-        verify(mockCallback).complete(mockResponse);
-    }
-
-    @Test
-    public void testPoison() {
-        entry.poison(mockCause);
-
-        final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
-        verify(mockCallback).complete(captor.capture());
-        assertSame(mockCause, captor.getValue().getCause());
-    }
-
-    @Test
-    public void testIsTimedOut() {
-        assertTrue(entry.isTimedOut(ticker.read(), 0));
-        assertFalse(entry.isTimedOut(ticker.read(), 1));
-
-        entry.retransmit(mockBackendInfo, 0, ticker.read());
-        assertTrue(entry.isTimedOut(ticker.read(), 0));
-        ticker.increment(10);
-        assertTrue(entry.isTimedOut(ticker.read(), 10));
-        assertFalse(entry.isTimedOut(ticker.read(), 20));
-
-        entry.retransmit(mockBackendInfo, 1, ticker.read());
-        assertTrue(entry.isTimedOut(ticker.read(), 0));
-        ticker.increment(10);
-        assertTrue(entry.isTimedOut(ticker.read(), 10));
-        assertFalse(entry.isTimedOut(ticker.read(), 11));
-    }
-
-    @Test
-    public void testRetransmit() {
-        assertFalse(mockActor.msgAvailable());
-        entry.retransmit(mockBackendInfo, 0, ticker.read());
-
-        assertTrue(mockActor.msgAvailable());
-        assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS)));
-    }
-
-    private static void assertRequestEquals(final Request<?, ?> expected, final Object obj) {
-        assertTrue(obj instanceof RequestEnvelope);
-
-        final RequestEnvelope actual = (RequestEnvelope) obj;
-        assertEquals(0, actual.getSessionId());
-        assertEquals(0, actual.getTxSequence());
-        assertEquals(expected.getTarget(), actual.getMessage().getTarget());
+        verify(mockCallback).accept(mockResponse);
     }
 }
index ae845a7..ea6948f 100644 (file)
@@ -9,7 +9,7 @@ package org.opendaylight.controller.cluster.databroker;
 
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
 
 /**
  * Abstract base class for concrete {@link DOMStoreTransaction} implementations. It holds a reference to the associated
index 1badccb..a52254e 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.databroker;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
index 55139d3..9144f9b 100644 (file)
@@ -8,8 +8,8 @@
 package org.opendaylight.controller.cluster.databroker;
 
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
index 97ed6ba..6b7237e 100644 (file)
@@ -10,10 +10,10 @@ package org.opendaylight.controller.cluster.databroker;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 
 /**
  * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. It wraps
index 4c804d7..0e13ac4 100644 (file)
@@ -8,8 +8,8 @@
 package org.opendaylight.controller.cluster.databroker;
 
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
index ce2c164..951b540 100644 (file)
@@ -8,16 +8,22 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.InversibleLockException;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +51,8 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     @GuardedBy("this")
     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
 
-    private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
-    private final DistributedDataStoreClientBehavior client;
+    private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
+    private final AbstractDataStoreClientBehavior client;
     private final LocalHistoryIdentifier identifier;
 
     // Used via NEXT_TX_UPDATER
@@ -55,7 +61,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
 
     private volatile State state = State.IDLE;
 
-    AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
+    AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
         this.client = Preconditions.checkNotNull(client);
         this.identifier = Preconditions.checkNotNull(identifier);
         Preconditions.checkArgument(identifier.getCookie() == 0);
@@ -68,6 +74,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     final void updateState(final State expected, final State next) {
         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
+        LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
     }
 
     @Override
@@ -75,14 +82,14 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         return identifier;
     }
 
-    final DistributedDataStoreClientBehavior getClient() {
-        return client;
-    }
-
     final long nextTx() {
         return NEXT_TX_UPDATER.getAndIncrement(this);
     }
 
+    final Long resolveShardForPath(final YangInstanceIdentifier path) {
+        return client.resolveShardForPath(path);
+    }
+
     @Override
     final void localAbort(final Throwable cause) {
         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
@@ -99,17 +106,43 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         }
     }
 
-    private AbstractProxyHistory createHistoryProxy(final Long shard) {
-        return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
-            identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
+    /**
+     * Create a new history proxy for a given shard.
+     *
+     * @throws InversibleLockException if the shard is being reconnected
+     */
+    private ProxyHistory createHistoryProxy(final Long shard) {
+        final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
+        final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
+            identifier.getHistoryId(), shard), connection);
+
+        // Request creation of the history.
+        connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
+            this::createHistoryCallback);
+        return ret;
     }
 
-    abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
-            final Optional<ShardBackendInfo> backendInfo);
+    abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final AbstractClientConnection<ShardBackendInfo> connection);
+
+    private void createHistoryCallback(final Response<?, ?> response) {
+        LOG.debug("Create history response {}", response);
+    }
 
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
-        final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
-        return history.createTransactionProxy(transactionId);
+        while (true) {
+            final ProxyHistory history;
+            try {
+                history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+            } catch (InversibleLockException e) {
+                LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
+                e.awaitResolution();
+                LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
+                continue;
+            }
+
+            return history.createTransactionProxy(transactionId);
+        }
     }
 
     public final ClientTransaction createTransaction() {
@@ -140,6 +173,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
                 cohort, txId, previous);
 
+        LOG.debug("Local history {} readied transaction {}", this, txId);
         return cohort;
     }
 
@@ -166,4 +200,34 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
             LOG.warn("Could not find completed transaction {}", txId);
         }
     }
+
+    HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+        final ProxyHistory oldProxy = histories.get(newConn.cookie());
+        if (oldProxy == null) {
+            return null;
+        }
+
+        final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
+        return new HistoryReconnectCohort() {
+            @Override
+            ProxyReconnectCohort getProxy() {
+                return proxy;
+            }
+
+            @Override
+            void replaySuccessfulRequests() {
+                proxy.replaySuccessfulRequests();
+            }
+
+            @Override
+            public void close() {
+                LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
+                final ProxyHistory newProxy = proxy.finishReconnect();
+                if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
+                    LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
+                        AbstractClientHistory.this);
+                }
+            }
+        };
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
new file mode 100644 (file)
index 0000000..a84715c
--- /dev/null
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import com.google.common.base.Throwables;
+import com.google.common.base.Verify;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
+ * frontend.
+ *
+ * <p>
+ * This class is not visible outside of this package because it breaks the actor containment. Services provided to
+ * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
+ *
+ * <p>
+ * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
+ *            When touching internal state, be mindful of the execution context from which execution context, Actor
+ *            or POJO, is the state being accessed or modified.
+ *
+ * <p>
+ * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
+ *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
+ *                doubt, feel free to synchronize on this object.
+ *
+ * <p>
+ * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
+ *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
+ *              for correctness and performance impact.
+ *
+ * <p>
+ * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
+ *             for performing work and charging applications for it. That has two positive effects:
+ *             - CPU usage is distributed across applications, minimizing work done in the actor thread
+ *             - CPU usage provides back-pressure towards the application.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
+        implements DistributedDataStoreClient {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
+
+    private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
+    private final AtomicLong nextHistoryId = new AtomicLong(1);
+    private final SingleClientHistory singleHistory;
+
+    private volatile Throwable aborted;
+
+    AbstractDataStoreClientBehavior(final ClientActorContext context,
+            final BackendInfoResolver<ShardBackendInfo> resolver) {
+        super(context, resolver);
+        singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
+    }
+
+    //
+    //
+    // Methods below are invoked from the client actor thread
+    //
+    //
+
+    @Override
+    protected final void haltClient(final Throwable cause) {
+        // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up
+        // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
+        // thread.
+        if (aborted != null) {
+            abortOperations(cause);
+        }
+    }
+
+    private void abortOperations(final Throwable cause) {
+        // This acts as a barrier, application threads check this after they have added an entry in the maps,
+        // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
+        aborted = cause;
+
+        for (ClientLocalHistory h : histories.values()) {
+            h.localAbort(cause);
+        }
+        histories.clear();
+    }
+
+    private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
+        abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
+        return null;
+    }
+
+    @Override
+    protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
+        if (command instanceof GetClientRequest) {
+            ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
+        } else {
+            LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
+        }
+
+        return this;
+    }
+
+    /*
+     * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more
+     * involved, as the messages need to be replayed to the individual proxies.
+     */
+    @Override
+    @GuardedBy("connectionsLock")
+    protected final ConnectedClientConnection<ShardBackendInfo> connectionUp(
+            final AbstractClientConnection<ShardBackendInfo> conn, final ShardBackendInfo backend) {
+
+        // Step 0: create a new connected connection
+        final ConnectedClientConnection<ShardBackendInfo> newConn = new ConnectedClientConnection<>(conn.context(),
+                conn.cookie(), backend);
+
+        LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
+
+        final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
+        try {
+            // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
+            //         further TransactionProxies can be created and we can safely traverse maps without risking
+            //         missing an entry
+            startReconnect(singleHistory, newConn, cohorts);
+            for (ClientLocalHistory h : histories.values()) {
+                startReconnect(h, newConn, cohorts);
+            }
+
+            // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
+            //         the non-throttling interface to the connection, hence we use a wrapper consumer
+            for (HistoryReconnectCohort c : cohorts) {
+                c.replaySuccessfulRequests();
+            }
+
+            // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+            //         requests will be immediately sent to it and requests being sent concurrently will get forwarded
+            //         once they hit the new connection.
+            conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts));
+        } finally {
+            // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
+            for (HistoryReconnectCohort c : cohorts) {
+                c.close();
+            }
+        }
+
+        return newConn;
+    }
+
+    private static void startReconnect(final AbstractClientHistory history,
+            final ConnectedClientConnection<ShardBackendInfo> newConn,
+            final Collection<HistoryReconnectCohort> cohorts) {
+        final HistoryReconnectCohort cohort = history.startReconnect(newConn);
+        if (cohort != null) {
+            cohorts.add(cohort);
+        }
+    }
+
+    //
+    //
+    // Methods below are invoked from application threads
+    //
+    //
+
+    @Override
+    public final ClientLocalHistory createLocalHistory() {
+        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
+            nextHistoryId.getAndIncrement());
+        final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+        LOG.debug("{}: creating a new local history {}", persistenceId(), history);
+
+        Verify.verify(histories.put(historyId, history) == null);
+
+        final Throwable a = aborted;
+        if (a != null) {
+            history.localAbort(a);
+            histories.remove(historyId, history);
+            throw Throwables.propagate(a);
+        }
+
+        return history;
+    }
+
+    @Override
+    public final ClientTransaction createTransaction() {
+        return singleHistory.createTransaction();
+    }
+
+    @Override
+    public final void close() {
+        context().executeInActor(this::shutdown);
+    }
+
+    abstract Long resolveShardForPath(final YangInstanceIdentifier path);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java
deleted file mode 100644 (file)
index b8493ea..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker.actors.dds;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-
-abstract class AbstractLocalProxyHistory extends AbstractProxyHistory {
-    private final DataTree dataTree;
-
-    AbstractLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
-        final DataTree dataTree) {
-        super(client, identifier);
-        this.dataTree = Preconditions.checkNotNull(dataTree);
-    }
-
-    final DataTreeSnapshot takeSnapshot() {
-        return dataTree.takeSnapshot();
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java
deleted file mode 100644 (file)
index d9f3b5f..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker.actors.dds;
-
-import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-
-/**
- * Per-connection representation of a local history. This class handles state replication across a single connection.
- *
- * @author Robert Varga
- */
-abstract class AbstractProxyHistory implements Identifiable<LocalHistoryIdentifier> {
-    // FIXME: this should really be ClientConnection
-    private final DistributedDataStoreClientBehavior client;
-    private final LocalHistoryIdentifier identifier;
-
-    AbstractProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
-        this.client = Preconditions.checkNotNull(client);
-        this.identifier = Preconditions.checkNotNull(identifier);
-    }
-
-    static AbstractProxyHistory createClient(final DistributedDataStoreClientBehavior client,
-            final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
-        final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new ClientLocalProxyHistory(client, identifier, dataTree.get())
-             : new RemoteProxyHistory(client, identifier);
-    }
-
-    static AbstractProxyHistory createSingle(final DistributedDataStoreClientBehavior client,
-            final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
-        final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new SingleLocalProxyHistory(client, identifier, dataTree.get())
-             : new RemoteProxyHistory(client, identifier);
-    }
-
-    @Override
-    public LocalHistoryIdentifier getIdentifier() {
-        return identifier;
-    }
-
-    final ActorRef localActor() {
-        return client.self();
-    }
-
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
-        return doCreateTransactionProxy(client, new TransactionIdentifier(identifier, txId.getTransactionId()));
-    }
-
-    abstract AbstractProxyTransaction doCreateTransactionProxy(DistributedDataStoreClientBehavior clientBehavior,
-            TransactionIdentifier txId);
-}
index 9235f25..adfc0df 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -23,13 +24,16 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class translating transaction operations towards a particular backend shard.
@@ -45,17 +49,20 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * @author Robert Varga
  */
 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
-    private final DistributedDataStoreClientBehavior client;
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
 
+    private final ProxyHistory parent;
+
+    private AbstractProxyTransaction successor;
     private long sequence;
     private boolean sealed;
 
-    AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) {
-        this.client = Preconditions.checkNotNull(client);
+    AbstractProxyTransaction(final ProxyHistory parent) {
+        this.parent = Preconditions.checkNotNull(parent);
     }
 
     final ActorRef localActor() {
-        return client.self();
+        return parent.localActor();
     }
 
     final long nextSequence() {
@@ -87,17 +94,19 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return doRead(path);
     }
 
-    final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
-        client.sendRequest(request, completer);
+    final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
+        parent.sendRequest(request, callback);
     }
 
     /**
-     * Seals this transaction when ready.
+     * Seal this transaction before it is either committed or aborted.
      */
     final void seal() {
         checkNotSealed();
         doSeal();
         sealed = true;
+        parent.onTransactionSealed(this);
     }
 
     private void checkNotSealed() {
@@ -115,12 +124,13 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     final void abort() {
         checkNotSealed();
         doAbort();
+        parent.abortTransaction(this);
     }
 
-    void abort(final VotingFuture<Void> ret) {
+    final void abort(final VotingFuture<Void> ret) {
         checkSealed();
 
-        sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> {
+        sendAbort(t -> {
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -128,9 +138,15 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             } else {
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
+
+            parent.completeTransaction(this);
         });
     }
 
+    final void sendAbort(final Consumer<Response<?, ?>> callback) {
+        sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
+    }
+
     /**
      * Commit this transaction, possibly in a coordinated fashion.
      *
@@ -141,7 +157,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         checkSealed();
 
         final SettableFuture<Boolean> ret = SettableFuture.create();
-        sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
+        sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.set(Boolean.TRUE);
             } else if (t instanceof RequestFailure) {
@@ -149,14 +165,17 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             } else {
                 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
             }
+
+            parent.completeTransaction(this);
         });
         return ret;
     }
 
+
     void canCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
+        sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
             if (t instanceof TransactionCanCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -192,10 +211,35 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             } else {
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
+
+            parent.completeTransaction(this);
         });
     }
 
-    abstract TransactionRequest<?> doCommit(boolean coordinated);
+    void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+        this.successor = Preconditions.checkNotNull(successor);
+    }
+
+    /**
+     * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
+     * and forwarded to the successor connection.
+     *
+     * @param request Request to be forwarded
+     * @param callback Original callback
+     * @throws RequestException when the request is unhandled by the successor
+     */
+    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
+            throws RequestException {
+        Preconditions.checkState(successor != null, "%s does not have a successor set", this);
+
+        if (successor instanceof LocalProxyTransaction) {
+            forwardToLocal((LocalProxyTransaction)successor, request, callback);
+        } else if (successor instanceof RemoteProxyTransaction) {
+            forwardToRemote((RemoteProxyTransaction)successor, request, callback);
+        } else {
+            throw new IllegalStateException("Unhandled successor " + successor);
+        }
+    }
 
     abstract void doDelete(final YangInstanceIdentifier path);
 
@@ -211,4 +255,32 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     abstract void doSeal();
 
     abstract void doAbort();
+
+    abstract TransactionRequest<?> commitRequest(boolean coordinated);
+
+    /**
+     * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
+     * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
+     * operations are packaged in the message.
+     *
+     * <p>
+     * Note: this method is invoked by the predecessor on the successor.
+     *
+     * @param request Request which needs to be forwarded
+     * @param callback Callback to be invoked once the request completes
+     */
+    abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
+            @Nullable Consumer<Response<?, ?>> callback);
+
+    /**
+     * Replay a request originating in this proxy to a successor remote proxy.
+     */
+    abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
+            Consumer<Response<?, ?>> callback) throws RequestException;
+
+    /**
+     * Replay a request originating in this proxy to a successor local proxy.
+     */
+    abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
+            Consumer<Response<?, ?>> callback) throws RequestException;
 }
index bd6cb64..789309c 100644 (file)
@@ -12,7 +12,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 
 /**
  * Base class for internal {@link DOMStoreThreePhaseCommitCohort} implementation. It contains utility constants for
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java
new file mode 100644 (file)
index 0000000..c6564b6
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+import java.util.Collection;
+import java.util.Map;
+import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
+import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Cohort aware forwarder, which forwards the request to the cohort, giving it a reference to the successor
+// connection
+final class BouncingReconnectForwarder extends ReconnectForwarder {
+    private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
+
+    private static final RequestException FAILED_TO_REPLAY_EXCEPTION = new RequestException("Cohort not found") {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean isRetriable() {
+            return false;
+        }
+    };
+
+    private final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts;
+
+    private BouncingReconnectForwarder(final ConnectedClientConnection<?> successor,
+            final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts) {
+        super(successor);
+        this.cohorts = Preconditions.checkNotNull(cohorts);
+    }
+
+    static ReconnectForwarder forCohorts(final ConnectedClientConnection<?> successor,
+            final Collection<HistoryReconnectCohort> cohorts) {
+        return new BouncingReconnectForwarder(successor, Maps.uniqueIndex(Collections2.transform(cohorts,
+            HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
+    }
+
+
+    @Override
+    protected void forwardEntry(final ConnectionEntry entry) {
+        final Request<? , ?> request = entry.getRequest();
+
+        final LocalHistoryIdentifier historyId;
+        if (request instanceof TransactionRequest) {
+            historyId = ((TransactionRequest<?>) request).getTarget().getHistoryId();
+        } else if (request instanceof LocalHistoryRequest) {
+            historyId = ((LocalHistoryRequest<?>) request).getTarget();
+        } else {
+            throw new IllegalArgumentException("Unhandled request " + request);
+        }
+
+        try {
+            final ProxyReconnectCohort cohort = cohorts.get(historyId);
+            if (cohort == null) {
+                LOG.warn("Cohort for request {} not found, aborting it", request);
+                throw FAILED_TO_REPLAY_EXCEPTION;
+            }
+
+            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
+            //        period required to get into the queue.
+            cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(request.toRequestFailure(e));
+        }
+    }
+}
\ No newline at end of file
index 102d050..b6c2746 100644 (file)
@@ -9,8 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import java.util.Optional;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
@@ -20,14 +19,14 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
  *
  * <p>
  * This interface is used by the world outside of the actor system and in the actor system it is manifested via
- * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
+ * its client actor. That requires some state transfer with {@link AbstractDataStoreClientBehavior}. In order to
  * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
  *
  * @author Robert Varga
  */
 @Beta
 public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
-    ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
+    ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
         super(client, historyId);
     }
 
@@ -52,17 +51,25 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
     @Override
     AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
             final AbstractTransactionCommitCohort cohort) {
-        // FIXME: deal with CLOSED here
         final State local = state();
-        Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local);
-        updateState(local, State.IDLE);
+        switch (local) {
+            case CLOSED:
+                return super.onTransactionReady(txId, cohort);
+            case IDLE:
+                throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s",
+                    this, txId));
+            case TX_OPEN:
+                updateState(local, State.IDLE);
+                return super.onTransactionReady(txId, cohort);
+            default:
+                throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local));
 
-        return super.onTransactionReady(txId, cohort);
+        }
     }
 
     @Override
-    AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
-            final Optional<ShardBackendInfo> backendInfo) {
-        return AbstractProxyHistory.createClient(getClient(), backendInfo, historyId);
+    ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final AbstractClientConnection<ShardBackendInfo> connection) {
+        return ProxyHistory.createClient(connection, historyId);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java
deleted file mode 100644 (file)
index f06e57c..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker.actors.dds;
-
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-
-final class ClientLocalProxyHistory extends AbstractLocalProxyHistory {
-    ClientLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
-        final DataTree dataTree) {
-        super(client, identifier, dataTree);
-    }
-
-    @Override
-    AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
-            final TransactionIdentifier txId) {
-        // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure
-        //        causality
-        return new LocalProxyTransaction(client, txId, takeSnapshot());
-    }
-}
\ No newline at end of file
index 81d00ee..8450c67 100644 (file)
@@ -12,12 +12,12 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -62,7 +62,7 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
     private static final int OPEN_STATE = 0;
     private static final int CLOSED_STATE = 1;
 
-    private final Map<Long, AbstractProxyTransaction> proxies = new HashMap<>();
+    private final Map<Long, AbstractProxyTransaction> proxies = new ConcurrentHashMap<>();
     private final TransactionIdentifier transactionId;
     private final AbstractClientHistory parent;
 
@@ -84,8 +84,7 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
     private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) {
         checkNotClosed();
 
-        final ModuleShardBackendResolver resolver = parent.getClient().resolver();
-        final Long shard = resolver.resolveShardForPath(path);
+        final Long shard = parent.resolveShardForPath(path);
         return proxies.computeIfAbsent(shard, this::createProxy);
     }
 
@@ -116,13 +115,13 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
 
     private boolean ensureClosed() {
         final int local = state;
-        if (local != CLOSED_STATE) {
-            final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE);
-            Preconditions.checkState(success, "Transaction %s raced during close", this);
-            return true;
-        } else {
+        if (local == CLOSED_STATE) {
             return false;
         }
+
+        final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE);
+        Preconditions.checkState(success, "Transaction %s raced during close", this);
+        return true;
     }
 
     public DOMStoreThreePhaseCommitCohort ready() {
index 9940ae5..bc393a4 100644 (file)
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.base.Throwables;
-import com.google.common.base.Verify;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
+import java.util.function.Function;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
-import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
- * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
- * frontend.
- *
- * <p>
- * This class is not visible outside of this package because it breaks the actor containment. Services provided to
- * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
- *
- * <p>
- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
- *            When touching internal state, be mindful of the execution context from which execution context, Actor
- *            or POJO, is the state being accessed or modified.
- *
- * <p>
- * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
- *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
- *                doubt, feel free to synchronize on this object.
- *
- * <p>
- * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
- *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
- *              for correctness and performance impact.
- *
- * <p>
- * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
- *             for performing work and charging applications for it. That has two positive effects:
- *             - CPU usage is distributed across applications, minimizing work done in the actor thread
- *             - CPU usage provides back-pressure towards the application.
+ * {@link AbstractDataStoreClientBehavior} which performs module-based sharding.
  *
  * @author Robert Varga
  */
-final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
-    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
-
-    private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
-    private final AtomicLong nextHistoryId = new AtomicLong(1);
-    private final ModuleShardBackendResolver resolver;
-    private final SingleClientHistory singleHistory;
-
-    private volatile Throwable aborted;
-
-    DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
-        super(context);
-        resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
-        singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
-    }
-
-    //
-    //
-    // Methods below are invoked from the client actor thread
-    //
-    //
-
-    @Override
-    protected void haltClient(final Throwable cause) {
-        // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
-        // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
-        // thread.
-        if (aborted != null) {
-            abortOperations(cause);
-        }
-    }
-
-    private void abortOperations(final Throwable cause) {
-        // This acts as a barrier, application threads check this after they have added an entry in the maps,
-        // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
-        aborted = cause;
-
-        for (ClientLocalHistory h : histories.values()) {
-            h.localAbort(cause);
-        }
-        histories.clear();
-    }
-
-    private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
-        abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
-        return null;
-    }
+final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
+    private final Function<YangInstanceIdentifier, Long> pathToShard;
 
-    @Override
-    protected DistributedDataStoreClientBehavior onCommand(final Object command) {
-        if (command instanceof GetClientRequest) {
-            ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
-        } else {
-            LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
-        }
-
-        return this;
-    }
-
-    //
-    //
-    // Methods below are invoked from application threads
-    //
-    //
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
-            final Throwable aborted) {
-        Verify.verify(map.put(key, value) == null);
-
-        if (aborted != null) {
-            try {
-                value.localAbort(aborted);
-            } catch (Exception e) {
-                LOG.debug("Close of {} failed", value, e);
-            }
-            map.remove(key, value);
-            throw Throwables.propagate(aborted);
-        }
-
-        return value;
+    private DistributedDataStoreClientBehavior(final ClientActorContext context,
+            final ModuleShardBackendResolver resolver) {
+        super(context, resolver);
+        pathToShard = resolver::resolveShardForPath;
     }
 
-    @Override
-    public ClientLocalHistory createLocalHistory() {
-        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
-            nextHistoryId.getAndIncrement());
-        final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
-        LOG.debug("{}: creating a new local history {}", persistenceId(), history);
-
-        return returnIfOperational(histories, historyId, history, aborted);
-    }
-
-    @Override
-    public ClientTransaction createTransaction() {
-        return singleHistory.createTransaction();
-    }
-
-    @Override
-    public void close() {
-        context().executeInActor(this::shutdown);
+    DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
+        this(context, new ModuleShardBackendResolver(context.getIdentifier(), actorContext));
     }
 
     @Override
-    protected ModuleShardBackendResolver resolver() {
-        return resolver;
-    }
-
-    void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
-        sendRequest(request, response -> {
-            completer.accept(response);
-            return this;
-        });
+    Long resolveShardForPath(final YangInstanceIdentifier path) {
+        return pathToShard.apply(path);
     }
-
 }
index b21b46d..7c8bd92 100644 (file)
@@ -12,7 +12,7 @@ import com.google.common.base.Preconditions;
 import java.util.function.Supplier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 
 /**
@@ -21,7 +21,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
  *
  * @author Robert Varga
  */
-final class FailedDataTreeModification implements DataTreeModification {
+final class FailedDataTreeModification implements CursorAwareDataTreeModification {
     private final Supplier<? extends RuntimeException> supplier;
 
     FailedDataTreeModification(final Supplier<? extends RuntimeException> supplier) {
@@ -34,7 +34,7 @@ final class FailedDataTreeModification implements DataTreeModification {
     }
 
     @Override
-    public DataTreeModification newModification() {
+    public CursorAwareDataTreeModification newModification() {
         throw supplier.get();
     }
 
@@ -62,4 +62,9 @@ final class FailedDataTreeModification implements DataTreeModification {
     public void applyToCursor(final DataTreeModificationCursor cursor) {
         throw supplier.get();
     }
+
+    @Override
+    public DataTreeModificationCursor createCursor(final YangInstanceIdentifier path) {
+        throw supplier.get();
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java
new file mode 100644 (file)
index 0000000..ab961f1
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+/**
+ * Interface exposed by {@link AbstractClientHistory} to {@link DistributedDataStoreClientBehavior} for the sole
+ * purpose of performing a connection switchover.
+ *
+ * @author Robert Varga
+ */
+abstract class HistoryReconnectCohort implements AutoCloseable {
+    abstract ProxyReconnectCohort getProxy();
+
+    abstract void replaySuccessfulRequests();
+
+    @Override
+    public abstract void close();
+}
index 576fa67..7b652f4 100644 (file)
@@ -9,18 +9,32 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,18 +57,16 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 final class LocalProxyTransaction extends AbstractProxyTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
-    private static final Consumer<Response<?, ?>> ABORT_COMPLETER = response -> {
-        LOG.debug("Abort completed with {}", response);
-    };
 
     private final TransactionIdentifier identifier;
-    private DataTreeModification modification;
 
-    LocalProxyTransaction(final DistributedDataStoreClientBehavior client,
-        final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) {
-        super(client);
+    private CursorAwareDataTreeModification modification;
+
+    LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+        final CursorAwareDataTreeModification modification) {
+        super(parent);
         this.identifier = Preconditions.checkNotNull(identifier);
-        this.modification = snapshot.newModification();
+        this.modification = Preconditions.checkNotNull(modification);
     }
 
     @Override
@@ -87,17 +99,26 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
         return Futures.immediateCheckedFuture(modification.readNode(path));
     }
 
+    private RuntimeException abortedException() {
+        return new IllegalStateException("Tracker " + identifier + " has been aborted");
+    }
+
+    private RuntimeException submittedException() {
+        return new IllegalStateException("Tracker " + identifier + " has been submitted");
+    }
+
     @Override
     void doAbort() {
-        sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER);
-        modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
+        sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
+            LOG.debug("Transaction {} abort completed with {}", identifier, response);
+        });
     }
 
     @Override
-    CommitLocalTransactionRequest doCommit(final boolean coordinated) {
+    CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
             modification, coordinated);
-        modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted"));
+        modification = new FailedDataTreeModification(this::submittedException);
         return ret;
     }
 
@@ -105,4 +126,121 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
     void doSeal() {
         modification.ready();
     }
+
+    DataTreeSnapshot getSnapshot() {
+        return modification;
+    }
+
+    private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+            final @Nullable Consumer<Response<?, ?>> callback) {
+        for (TransactionModification mod : request.getModifications()) {
+            if (mod instanceof TransactionWrite) {
+                modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
+            } else if (mod instanceof TransactionMerge) {
+                modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
+            } else if (mod instanceof TransactionDelete) {
+                modification.delete(mod.getPath());
+            } else {
+                throw new IllegalArgumentException("Unsupported modification " + mod);
+            }
+        }
+
+        final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+        if (maybeProtocol.isPresent()) {
+            seal();
+            Verify.verify(callback != null, "Request {} has null callback", request);
+
+            switch (maybeProtocol.get()) {
+                case ABORT:
+                    sendAbort(callback);
+                    break;
+                case SIMPLE:
+                    sendRequest(commitRequest(false), callback);
+                    break;
+                case THREE_PHASE:
+                    sendRequest(commitRequest(true), callback);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
+            }
+        }
+    }
+
+    @Override
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback) {
+        LOG.debug("Applying forwaded request {}", request);
+
+        if (request instanceof ModifyTransactionRequest) {
+            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+        } else {
+            throw new IllegalArgumentException("Unhandled request " + request);
+        }
+    }
+
+    @Override
+    void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) throws RequestException {
+        if (request instanceof CommitLocalTransactionRequest) {
+            final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+            final DataTreeModification mod = req.getModification();
+
+            LOG.debug("Applying modification {} to successor {}", mod, successor);
+            mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+                @Override
+                public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.write(current().node(child), data);
+                }
+
+                @Override
+                public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.merge(current().node(child), data);
+                }
+
+                @Override
+                public void delete(final PathArgument child) {
+                    successor.delete(current().node(child));
+                }
+            });
+
+            successor.seal();
+
+            final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+            successor.sendRequest(successorReq, callback);
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            LOG.debug("Forwarding abort {} to successor {}", request, successor);
+            successor.abort();
+        } else {
+            throw new IllegalArgumentException("Unhandled request" + request);
+        }
+    }
+
+    @Override
+    void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) throws RequestException {
+        if (request instanceof AbortLocalTransactionRequest) {
+            successor.sendAbort(request, callback);
+        } else if (request instanceof CommitLocalTransactionRequest) {
+            successor.sendCommit((CommitLocalTransactionRequest)request, callback);
+        } else {
+            throw new IllegalArgumentException("Unhandled request" + request);
+        }
+
+        LOG.debug("Forwarded request {} to successor {}", request, successor);
+    }
+
+    private void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        sendRequest(request, callback);
+        modification = new FailedDataTreeModification(this::abortedException);
+    }
+
+    private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+        // Rebase old modification on new data tree.
+        try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
+            request.getModification().applyToCursor(cursor);
+        }
+
+        seal();
+        sendRequest(commitRequest(request.isCoordinated()), callback);
+    }
 }
index a101896..9e6485b 100644 (file)
@@ -18,11 +18,15 @@ import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.ABIVersion;
-import org.opendaylight.controller.cluster.access.client.BackendInfo;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
@@ -46,7 +50,35 @@ import scala.compat.java8.FutureConverters;
  */
 @SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
                     justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended")
+@ThreadSafe
 final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+    private static final class Entry {
+        private final CompletionStage<ShardBackendInfo> stage;
+        @GuardedBy("this")
+        private ShardBackendInfo result;
+
+        Entry(final CompletionStage<ShardBackendInfo> stage) {
+            this.stage = Preconditions.checkNotNull(stage);
+            stage.whenComplete(this::onStageResolved);
+        }
+
+        @Nonnull CompletionStage<ShardBackendInfo> getStage() {
+            return stage;
+        }
+
+        synchronized @Nullable ShardBackendInfo getResult() {
+            return result;
+        }
+
+        private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
+            if (failure == null) {
+                this.result = Preconditions.checkNotNull(result);
+            } else {
+                LOG.warn("Failed to resolve shard", failure);
+            }
+        }
+    }
+
     private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
@@ -58,10 +90,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     // TODO: maybe make this configurable somehow?
     private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
 
-    private final ActorContext actorContext;
-    // FIXME: this counter should be in superclass somewhere
+    private final ConcurrentMap<Long, Entry> backends = new ConcurrentHashMap<>();
     private final AtomicLong nextSessionId = new AtomicLong();
     private final Function1<ActorRef, ?> connectFunction;
+    private final ActorContext actorContext;
 
     @GuardedBy("this")
     private long nextShard = 1;
@@ -75,18 +107,6 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
             ABIVersion.current()));
     }
 
-    @Override
-    protected void invalidateBackendInfo(final CompletionStage<? extends BackendInfo> info) {
-        LOG.trace("Initiated invalidation of backend information {}", info);
-        info.thenAccept(this::invalidate);
-    }
-
-    private void invalidate(final BackendInfo result) {
-        Preconditions.checkArgument(result instanceof ShardBackendInfo);
-        LOG.debug("Invalidating backend information {}", result);
-        actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName());
-    }
-
     Long resolveShardForPath(final YangInstanceIdentifier path) {
         final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
         Long cookie = shards.get(shardName);
@@ -107,17 +127,16 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
         return cookie;
     }
 
-    @Override
-    protected CompletableFuture<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+    private CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
         final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
             return NULL_FUTURE;
         }
 
-        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
 
-        FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
+        return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
             LOG.debug("Looking up primary info for {} from {}", shardName, info);
             return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
         }).thenApply(response -> {
@@ -135,15 +154,30 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
             return new ShardBackendInfo(success.getBackend(),
                 nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
                 success.getDataTree(), success.getMaxMessages());
-        }).whenComplete((info, throwablw) -> {
-            if (throwablw != null) {
-                ret.completeExceptionally(throwablw);
-            } else {
-                ret.complete(info);
-            }
         });
+    }
 
-        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
-        return ret;
+    @Override
+    public CompletionStage<? extends ShardBackendInfo> getBackendInfo(final Long cookie) {
+        return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage();
+    }
+
+    @Override
+    public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+            final ShardBackendInfo staleInfo) {
+        final Entry existing = backends.get(cookie);
+        if (existing != null) {
+            if (!staleInfo.equals(existing.getResult())) {
+                return existing.getStage();
+            }
+
+            LOG.debug("Invalidating backend information {}", staleInfo);
+            actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName());
+
+            LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo);
+            backends.remove(cookie, existing);
+        }
+
+        return getBackendInfo(cookie);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
new file mode 100644 (file)
index 0000000..ae55379
--- /dev/null
@@ -0,0 +1,351 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Per-connection representation of a local history. This class handles state replication across a single connection.
+ *
+ * @author Robert Varga
+ */
+abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
+    private abstract static class AbstractLocal extends ProxyHistory {
+        private final DataTree dataTree;
+
+        AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            super(connection, identifier);
+            this.dataTree = Preconditions.checkNotNull(dataTree);
+        }
+
+        final DataTreeSnapshot takeSnapshot() {
+            return dataTree.takeSnapshot();
+        }
+    }
+
+    private abstract static class AbstractRemote extends ProxyHistory {
+        AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+            super(connection, identifier);
+        }
+
+        @Override
+        final AbstractProxyTransaction doCreateTransactionProxy(
+                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
+            return new RemoteProxyTransaction(this, txId);
+        }
+    }
+
+    private static final class Local extends AbstractLocal {
+        private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
+
+        // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
+        // the open one and attempts to create a new transaction again.
+        private LocalProxyTransaction lastOpen;
+
+        private volatile LocalProxyTransaction lastSealed;
+
+        Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
+            final DataTree dataTree) {
+            super(connection, identifier, dataTree);
+        }
+
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId) {
+            Preconditions.checkState(lastOpen == null, "Proxy {} is currently open", lastOpen);
+
+            // onTransactionCompleted() runs concurrently
+            final LocalProxyTransaction localSealed = lastSealed;
+            final DataTreeSnapshot baseSnapshot;
+            if (localSealed != null) {
+                baseSnapshot = localSealed.getSnapshot();
+            } else {
+                baseSnapshot = takeSnapshot();
+            }
+
+            lastOpen = new LocalProxyTransaction(this, txId,
+                (CursorAwareDataTreeModification) baseSnapshot.newModification());
+            return lastOpen;
+        }
+
+        @Override
+        ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
+            return createClient(connection, getIdentifier());
+        }
+
+        @Override
+        void onTransactionAborted(final AbstractProxyTransaction tx) {
+            Preconditions.checkState(tx.equals(lastOpen));
+            lastOpen = null;
+        }
+
+        @Override
+        void onTransactionCompleted(final AbstractProxyTransaction tx) {
+            Verify.verify(tx instanceof LocalProxyTransaction);
+
+            if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
+                LOG.debug("Completed last sealed transaction {}", tx);
+            }
+        }
+
+        @Override
+        void onTransactionSealed(final AbstractProxyTransaction tx) {
+            Preconditions.checkState(tx.equals(lastOpen));
+            lastSealed = lastOpen;
+            lastOpen = null;
+        }
+    }
+
+    private static final class LocalSingle extends AbstractLocal {
+        LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            super(connection, identifier, dataTree);
+        }
+
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId) {
+            return new LocalProxyTransaction(this, txId,
+                (CursorAwareDataTreeModification) takeSnapshot().newModification());
+        }
+
+        @Override
+        ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
+            return createSingle(connection, getIdentifier());
+        }
+    }
+
+    private static final class Remote extends AbstractRemote {
+        Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
+            super(connection, identifier);
+        }
+
+        @Override
+        ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
+            return createClient(connection, getIdentifier());
+        }
+    }
+
+    private static final class RemoteSingle extends AbstractRemote {
+        RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+            super(connection, identifier);
+        }
+
+        @Override
+        ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
+            return createSingle(connection, getIdentifier());
+        }
+    }
+
+    private static final class RequestReplayException extends RequestException {
+        private static final long serialVersionUID = 1L;
+
+        RequestReplayException(final String format, final Object... args) {
+            super(String.format(format, args));
+        }
+
+        @Override
+        public boolean isRetriable() {
+            return false;
+        }
+    }
+
+    private final class ReconnectCohort extends ProxyReconnectCohort {
+        @Override
+        public LocalHistoryIdentifier getIdentifier() {
+            return identifier;
+        }
+
+        @GuardedBy("lock")
+        @Override
+        void replaySuccessfulRequests() {
+            for (AbstractProxyTransaction t : proxies.values()) {
+                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
+                LOG.debug("{} created successor transaction proxy {} for {}", identifier, newProxy, t);
+                t.replaySuccessfulRequests(newProxy);
+            }
+        }
+
+        @GuardedBy("lock")
+        @Override
+        ProxyHistory finishReconnect() {
+            final ProxyHistory ret = Verify.verifyNotNull(successor);
+            LOG.debug("Finished reconnecting proxy history {}", this);
+            lock.unlock();
+            return ret;
+        }
+
+        @Override
+        void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
+            if (request instanceof TransactionRequest) {
+                replayTransactionRequest((TransactionRequest<?>) request, callback);
+            } else if (request instanceof LocalHistoryRequest) {
+                replayTo.accept(request, callback);
+            } else {
+                throw new IllegalArgumentException("Unhandled request " + request);
+            }
+        }
+
+        private void replayTransactionRequest(final TransactionRequest<?> request,
+                final Consumer<Response<?, ?>> callback) throws RequestException {
+
+            final AbstractProxyTransaction proxy;
+            lock.lock();
+            try {
+                proxy = proxies.get(request.getTarget());
+            } finally {
+                lock.unlock();
+            }
+            if (proxy == null) {
+                throw new RequestReplayException("Failed to find proxy for %s", request);
+            }
+
+            proxy.replayRequest(request, callback);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
+
+    private final Lock lock = new ReentrantLock();
+    private final LocalHistoryIdentifier identifier;
+    private final AbstractClientConnection<ShardBackendInfo> connection;
+
+    @GuardedBy("lock")
+    private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
+    @GuardedBy("lock")
+    private ProxyHistory successor;
+
+    private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+        this.connection = Preconditions.checkNotNull(connection);
+        this.identifier = Preconditions.checkNotNull(identifier);
+    }
+
+    static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+        final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+        return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
+             : new Remote(connection, identifier);
+    }
+
+    static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+        final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+        return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
+             : new RemoteSingle(connection, identifier);
+    }
+
+    @Override
+    public LocalHistoryIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    final ActorRef localActor() {
+        return connection.localActor();
+    }
+
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+        final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
+
+        lock.lock();
+        try {
+            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
+            proxies.put(proxyId, ret);
+            LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
+            return ret;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    final void abortTransaction(final AbstractProxyTransaction tx) {
+        lock.lock();
+        try {
+            proxies.remove(tx.getIdentifier());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    final void completeTransaction(final AbstractProxyTransaction tx) {
+        lock.lock();
+        try {
+            proxies.remove(tx.getIdentifier());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        connection.sendRequest(request, callback);
+    }
+
+    @GuardedBy("lock")
+    abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
+            TransactionIdentifier txId);
+
+    abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
+
+    @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
+    ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
+        lock.lock();
+        if (successor != null) {
+            lock.unlock();
+            throw new IllegalStateException("Proxy history " + this + " already has a successor");
+        }
+
+        successor = createSuccessor(newConnection);
+        return new ReconnectCohort();
+    }
+
+    @GuardedBy("lock")
+    void onTransactionAborted(final AbstractProxyTransaction tx) {
+        // No-op for most implementations
+    }
+
+    @GuardedBy("lock")
+    void onTransactionCompleted(final AbstractProxyTransaction tx) {
+        // No-op for most implementations
+    }
+
+    void onTransactionSealed(final AbstractProxyTransaction tx) {
+        // No-op on most implementations
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java
new file mode 100644 (file)
index 0000000..7c37d9d
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.yangtools.concepts.Identifiable;
+
+abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
+
+    abstract void replaySuccessfulRequests();
+
+    abstract ProxyHistory finishReconnect();
+
+    abstract void replayRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
+            BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException;
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java
deleted file mode 100644 (file)
index c596d31..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker.actors.dds;
-
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-
-final class RemoteProxyHistory extends AbstractProxyHistory {
-    RemoteProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier) {
-        super(client, identifier);
-    }
-
-    @Override
-    AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
-            final TransactionIdentifier txId) {
-        return new RemoteProxyTransaction(client, txId);
-    }
-}
\ No newline at end of file
index 26d718d..347c7ea 100644 (file)
@@ -8,27 +8,34 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -55,15 +62,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     // FIXME: make this tuneable
     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
 
+    private final Collection<TransactionRequest<?>> successfulRequests = new ArrayList<>();
     private final ModifyTransactionRequestBuilder builder;
 
     private boolean builderBusy;
 
     private volatile Exception operationFailure;
 
-    RemoteProxyTransaction(final DistributedDataStoreClientBehavior client,
-        final TransactionIdentifier identifier) {
-        super(client);
+    RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+        super(parent);
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
     }
 
@@ -136,10 +143,38 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void flushBuilder() {
-        final ModifyTransactionRequest message = builder.build();
+        final ModifyTransactionRequest request = builder.build();
         builderBusy = false;
 
-        sendRequest(message, this::completeModify);
+        sendModification(request);
+    }
+
+    private void sendModification(final TransactionRequest<?> request) {
+        sendRequest(request, response -> completeModify(request, response));
+    }
+
+    @Override
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback) {
+        nextSequence();
+
+        if (callback == null) {
+            sendModification(request);
+            return;
+        }
+
+        /*
+         * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null
+         * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below.
+         */
+        final Consumer<Response<?, ?>> findBugsIsStupid = callback;
+
+        // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
+        //        period required to get into the queue.
+        sendRequest(request, response -> {
+            findBugsIsStupid.accept(Preconditions.checkNotNull(response));
+            completeModify(request, response);
+        });
     }
 
     private void appendModification(final TransactionModification modification) {
@@ -155,11 +190,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
-    private void completeModify(final Response<?, ?> response) {
-        LOG.debug("Modification request completed with {}", response);
+    private void completeModify(final TransactionRequest<?> request, final Response<?, ?> response) {
+        LOG.debug("Modification request {} completed with {}", request, response);
 
         if (response instanceof TransactionSuccess) {
-            // Happy path no-op
+            // Happy path
+            successfulRequests.add(request);
         } else {
             recordFailedResponse(response);
         }
@@ -207,7 +243,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
-    ModifyTransactionRequest doCommit(final boolean coordinated) {
+    ModifyTransactionRequest commitRequest(final boolean coordinated) {
         ensureInitializedBuider();
         builder.setCommit(coordinated);
 
@@ -220,4 +256,57 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     void doSeal() {
         // No-op
     }
+
+    @Override
+    void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+        super.replaySuccessfulRequests(successor);
+
+        for (TransactionRequest<?> req : successfulRequests) {
+            LOG.debug("Forwarding request {} to successor {}", req, successor);
+            successor.handleForwardedRemoteRequest(req, null);
+        }
+        successfulRequests.clear();
+    }
+
+    @Override
+    void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) throws RequestException {
+        successor.handleForwardedRequest(request, callback);
+    }
+
+    private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
+            throws RequestException {
+        if (request instanceof ModifyTransactionRequest) {
+            final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
+
+            req.getModifications().forEach(this::appendModification);
+
+            final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+            if (maybeProto.isPresent()) {
+                seal();
+
+                switch (maybeProto.get()) {
+                    case ABORT:
+                        sendAbort(callback);
+                        break;
+                    case SIMPLE:
+                        sendRequest(commitRequest(false), callback);
+                        break;
+                    case THREE_PHASE:
+                        sendRequest(commitRequest(true), callback);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+                }
+            }
+        } else {
+            throw new IllegalArgumentException("Unhandled request {}" + request);
+        }
+    }
+
+    @Override
+    void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) throws RequestException {
+        successor.handleForwardedRemoteRequest(request, callback);
+    }
 }
index 6fa3cdf..c04c9c5 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import java.util.Optional;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.slf4j.Logger;
@@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory;
 final class SingleClientHistory extends AbstractClientHistory {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
 
-    SingleClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
+    SingleClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
         super(client, identifier);
     }
 
@@ -34,8 +34,8 @@ final class SingleClientHistory extends AbstractClientHistory {
     }
 
     @Override
-    AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
-            final Optional<ShardBackendInfo> backendInfo) {
-        return AbstractProxyHistory.createSingle(getClient(), backendInfo, historyId);
+    ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final AbstractClientConnection<ShardBackendInfo> connection) {
+        return ProxyHistory.createSingle(connection, historyId);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java
deleted file mode 100644 (file)
index f32fd59..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker.actors.dds;
-
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-
-final class SingleLocalProxyHistory extends AbstractLocalProxyHistory {
-    SingleLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
-        final DataTree dataTree) {
-        super(client, identifier, dataTree);
-    }
-
-    @Override
-    AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
-            final TransactionIdentifier txId) {
-        return new LocalProxyTransaction(client, txId, takeSnapshot());
-    }
-}
\ No newline at end of file