From 320a4e5cd2d9d80468a3f82798744f2035488218 Mon Sep 17 00:00:00 2001
From: Robert Varga
Date: Tue, 30 Aug 2016 17:57:41 +0200
Subject: [PATCH] BUG-5280: add AbstractClientConnection

Introduce a connection concept. This is a replacement for the request
queue, as it turns out we do need the concept of a full connection
(e.g. generational logic). This comes from the need to sensibly switch
behaviors as the locality of the backend leader changes.

This patch implements two sets of strategies for dealing with
reconnect:

The first one assumes long-lived state and is used for proxies dealing
with histories. Here we make sure to reinstantiate and replace them in
a map, as we want new transactions to follow the new semantics and we
do not want to tear histories down or follow inefficient paths.

The second one assumes short-lived state and is used for proxies
dealing with individual transactions. Transactions are assumed to come
and go rapidly, and therefore we do not replace the proxies in maps
(as they will be short-lived), but rather forward operations to
successors.

The first strategy has a higher access cost, but its state is always
fully up to date when reconnect finishes. The second strategy favors
access time, but operations end up "trailing" and will be forwarded
(and hence be inefficient) until the transaction completes.

Change-Id: I7fd9e21c749f55b91229bf0b671c8dcf2e4d5982
Signed-off-by: Robert Varga
---
 .../access/client/AbstractClientActor.java | 2 +-
 .../client/AbstractClientActorContext.java | 1 +
 .../client/AbstractClientConnection.java | 259 +++++
 .../AbstractReceivingClientConnection.java | 213 +++++
 .../cluster/access/client/BackendInfo.java | 2 +-
 .../access/client/BackendInfoResolver.java | 74 +---
 .../access/client/ClientActorBehavior.java | 243 ++++-----
 .../access/client/ClientActorContext.java | 51 +--
 .../client/ConnectedClientConnection.java | 75 ++++
 .../client/ConnectingClientConnection.java | 39 ++
 .../access/client/ConnectionEntry.java | 54 +++
 .../cluster/access/client/EmptyQueue.java | 3 +-
 .../client/InitialClientActorContext.java | 4 +-
 .../access/client/InternalCommand.java | 4 +-
 .../cluster/access/client/InversibleLock.java | 74 ++++
 .../client/InversibleLockException.java | 40 ++
 .../access/client/NoProgressException.java | 2 +-
 .../access/client/ReconnectForwarder.java | 41 ++
 .../client/ReconnectingClientConnection.java | 38 ++
 .../access/client/RequestCallback.java | 4 +-
 .../cluster/access/client/SequencedQueue.java | 403 ------
 .../access/client/SequencedQueueEntry.java | 92 ----
 .../client/SimpleReconnectForwarder.java | 20 +
 .../client/TransmittedConnectionEntry.java | 40 ++
 ...va => ConnectingClientConnectionTest.java} | 246 ++++-----
 ...ntryTest.java => ConnectionEntryTest.java} | 86 +--
 .../AbstractShardedTransaction.java | 2 +-
 .../ShardedDOMStoreReadTransaction.java | 4 +-
 .../ShardedDOMStoreReadWriteTransaction.java | 4 +-
 .../ShardedDOMStoreTransactionChain.java | 8 +-
 .../ShardedDOMStoreWriteTransaction.java | 4 +-
 .../actors/dds/AbstractClientHistory.java | 94 +++-
 .../dds/AbstractDataStoreClientBehavior.java | 210 +++++
 .../actors/dds/AbstractLocalProxyHistory.java | 27 --
 .../actors/dds/AbstractProxyHistory.java | 62 ---
 .../actors/dds/AbstractProxyTransaction.java | 98 ++++-
 .../dds/AbstractTransactionCommitCohort.java | 2 +-
 .../dds/BouncingReconnectForwarder.java | 82 ++++
 .../actors/dds/ClientLocalHistory.java | 29 +-
 .../actors/dds/ClientLocalProxyHistory.java | 27 --
 .../actors/dds/ClientTransaction.java | 21 +-
 .../DistributedDataStoreClientBehavior.java | 162 +------
 .../dds/FailedDataTreeModification.java | 11 +-
 .../actors/dds/HistoryReconnectCohort.java | 23 +
 .../actors/dds/LocalProxyTransaction.java | 164 ++++-
 .../dds/ModuleShardBackendResolver.java | 88 ++--
 .../databroker/actors/dds/ProxyHistory.java | 351 +++++++
 .../actors/dds/ProxyReconnectCohort.java | 26 ++
 .../actors/dds/RemoteProxyHistory.java | 23 -
 .../actors/dds/RemoteProxyTransaction.java | 109 ++++-
 .../actors/dds/SingleClientHistory.java | 10 +-
 .../actors/dds/SingleLocalProxyHistory.java | 25 --
 52 files changed, 2390 insertions(+), 1386 deletions(-)
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLock.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLockException.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java
 delete mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java
 delete mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java
 create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java
 rename opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/{SequencedQueueTest.java => ConnectingClientConnectionTest.java} (57%)
 rename opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/{SequencedQueueEntryTest.java => ConnectionEntryTest.java} (55%)
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
 delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java
 delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java
 create mode 100644
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java index 36f06ef4dc..fdba64f064 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActor.java @@ -67,5 +67,5 @@ public abstract class AbstractClientActor extends UntypedPersistentActor { switchBehavior(currentBehavior.onReceiveRecover(recover)); } - protected abstract ClientActorBehavior initialBehavior(ClientActorContext context); + protected abstract ClientActorBehavior initialBehavior(ClientActorContext context); } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorContext.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorContext.java index 7899358a17..9f0bcbc25f 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorContext.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientActorContext.java @@ -27,6 +27,7 @@ abstract class AbstractClientActorContext implements Mutable { this.self = Preconditions.checkNotNull(self); } + // TODO: rename this to logContext() @Nonnull final String persistenceId() { return persistenceId; diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java new file mode 100644 index 0000000000..170b1507a9 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayDeque; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +/** + * Base class for a connection to the backend. Responsible to queueing and dispatch of requests toward the backend. + * Can be in three conceptual states: Connecting, Connected and Reconnecting, which are represented by public final + * classes exposed from this package. + * + * @author Robert Varga + */ +@NotThreadSafe +public abstract class AbstractClientConnection { + private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class); + + // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path + @VisibleForTesting + static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15); + @VisibleForTesting + static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); + + private final Queue pending; + private final ClientActorContext context; + private final Long cookie; + + private volatile ReconnectForwarder successor; + private volatile RequestException poisoned; + private long lastProgress; + + private AbstractClientConnection(final ClientActorContext context, final Long cookie, + final Queue pending) { + this.context = Preconditions.checkNotNull(context); + this.cookie = Preconditions.checkNotNull(cookie); + this.pending = Preconditions.checkNotNull(pending); + this.lastProgress = readTime(); + } + + // Do not allow subclassing outside of this package + AbstractClientConnection(final ClientActorContext context, final Long cookie) { + this(context, cookie, new ArrayDeque<>(1)); + } + + // Do not allow subclassing outside of this package + AbstractClientConnection(final AbstractClientConnection oldConnection) { + this(oldConnection.context, oldConnection.cookie, oldConnection.pending); + } + + public final ClientActorContext context() { + return context; + } + + public final @Nonnull Long cookie() { + return cookie; + } + + public final ActorRef localActor() { + return context.self(); + } + + final long readTime() { + return context.ticker().read(); + } + + final Queue pending() { + return pending; + } + + /** + * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke + * from any thread. 
+ * + * @param request Request to send + * @param callback Callback to invoke + */ + public final void sendRequest(final Request request, final Consumer> callback) { + Preconditions.checkState(poisoned == null, "Connection %s has been poisoned", this); + + final ReconnectForwarder beforeQueue = successor; + final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime()); + if (beforeQueue != null) { + LOG.trace("Forwarding entry {} from {} to {}", entry, this, beforeQueue); + beforeQueue.forwardEntry(entry); + return; + } + + enqueueEntry(entry); + + final ReconnectForwarder afterQueue = successor; + if (afterQueue != null) { + synchronized (this) { + spliceToSuccessor(afterQueue); + } + } + } + + public final synchronized void setForwarder(final ReconnectForwarder forwarder) { + Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); + successor = Preconditions.checkNotNull(forwarder); + LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); + spliceToSuccessor(forwarder); + } + + public abstract Optional getBackendInfo(); + + @GuardedBy("this") + void spliceToSuccessor(final ReconnectForwarder successor) { + ConnectionEntry entry = pending.poll(); + while (entry != null) { + successor.forwardEntry(entry); + entry = pending.poll(); + } + } + + final ConnectionEntry dequeEntry() { + lastProgress = readTime(); + return pending.poll(); + } + + void enqueueEntry(final ConnectionEntry entry) { + pending.add(entry); + } + + /** + * Schedule a timer to fire on the actor thread after a delay. + * + * @param delay Delay, in nanoseconds + */ + private void scheduleTimer(final FiniteDuration delay) { + LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay); + context.executeInActor(this::runTimer, delay); + } + + /** + * Check queue timeouts and return true if a timeout has occurred. + * + * @return True if a timeout occurred + * @throws NoProgressException if the queue failed to make progress for an extended + * time. + */ + @VisibleForTesting + final ClientActorBehavior runTimer(final ClientActorBehavior current) { + final long now = readTime(); + + if (!isEmpty()) { + final long ticksSinceProgress = now - lastProgress; + if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { + LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, + TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); + + poison(new NoProgressException(ticksSinceProgress)); + current.removeConnection(this); + return current; + } + } + + // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward. + // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state + // return convention. + final Optional delay = checkTimeout(now); + if (delay == null) { + // We have timed out. 
There is no point in scheduling a timer + return reconnectConnection(current); + } + + if (delay.isPresent()) { + // If there is new delay, schedule a timer + scheduleTimer(delay.get()); + } + + return current; + } + + boolean isEmpty() { + return pending.isEmpty(); + } + + /* + * We are using tri-state return here to indicate one of three conditions: + * - if there is no timeout to schedule, return Optional.empty() + * - if there is a timeout to schedule, return a non-empty optional + * - if this connections has timed out, return null + */ + @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", + justification = "Returning null Optional is documented in the API contract.") + final Optional checkTimeout(final ConnectionEntry head, final long now) { + if (head == null) { + return Optional.empty(); + } + + final long delay = head.getEnqueuedTicks() - now + REQUEST_TIMEOUT_NANOS; + if (delay <= 0) { + LOG.debug("Connection {} timed out", this); + return null; + } + + return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS)); + } + + /* + * We are using tri-state return here to indicate one of three conditions: + * - if there is no timeout to schedule, return Optional.empty() + * - if there is a timeout to schedule, return a non-empty optional + * - if this connections has timed out, return null + */ + @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", + justification = "Returning null Optional is documented in the API contract.") + Optional checkTimeout(final long now) { + return checkTimeout(pending.peek(), now); + } + + static void poisonQueue(final Queue queue, final RequestException cause) { + for (ConnectionEntry e : queue) { + final Request request = e.getRequest(); + LOG.trace("Poisoning request {}", request, cause); + e.complete(request.toRequestFailure(cause)); + } + queue.clear(); + } + + void poison(final RequestException cause) { + poisoned = cause; + poisonQueue(pending, cause); + } + + @VisibleForTesting + final RequestException poisoned() { + return poisoned; + } + + abstract ClientActorBehavior reconnectConnection(ClientActorBehavior current); + + abstract void receiveResponse(final ResponseEnvelope envelope); +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java new file mode 100644 index 0000000000..180ac942a3 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Optional; +import java.util.Queue; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +/** + * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its + * sublcasses. It allows us to share some code. + * + * @author Robert Varga + * + * @param Concrete {@link BackendInfo} type + */ +abstract class AbstractReceivingClientConnection extends AbstractClientConnection { + private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class); + + private final Queue inflight = new ArrayDeque<>(); + private final T backend; + + private long lastProgress; + + AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) { + super(context, cookie); + this.backend = Preconditions.checkNotNull(backend); + this.lastProgress = readTime(); + } + + AbstractReceivingClientConnection(final AbstractReceivingClientConnection oldConnection) { + super(oldConnection); + this.backend = oldConnection.backend; + this.lastProgress = oldConnection.lastProgress; + } + + @Override + public final Optional getBackendInfo() { + return Optional.of(backend); + } + + final ActorRef remoteActor() { + return backend.getActor(); + } + + final int remoteMaxMessages() { + return backend.getMaxMessages(); + } + + final ABIVersion remoteVersion() { + return backend.getVersion(); + } + + final long sessionId() { + return backend.getSessionId(); + } + + final int inflightSize() { + return inflight.size(); + } + + final void appendToInflight(final TransmittedConnectionEntry entry) { + // This should never fail + inflight.add(entry); + } + + @GuardedBy("this") + @Override + void spliceToSuccessor(final ReconnectForwarder successor) { + ConnectionEntry entry = inflight.poll(); + while (entry != null) { + successor.forwardEntry(entry); + entry = inflight.poll(); + } + + super.spliceToSuccessor(successor); + } + + @Override + void receiveResponse(final ResponseEnvelope envelope) { + Optional maybeEntry = findMatchingEntry(inflight, envelope); + if (maybeEntry == null) { + LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope); + maybeEntry = findMatchingEntry(pending(), envelope); + } + + if (maybeEntry == null || !maybeEntry.isPresent()) { + LOG.warn("No request matching {} found, ignoring response", envelope); + return; + } + + lastProgress = readTime(); + maybeEntry.get().complete(envelope.getMessage()); + + // We have freed up a slot, try to transmit something + final int toSend = remoteMaxMessages() - inflight.size(); 
+ if (toSend > 0) { + sendMessages(toSend); + } + } + + @Override + boolean isEmpty() { + return inflight.isEmpty() && super.isEmpty(); + } + + @Override + void poison(final RequestException cause) { + super.poison(cause); + poisonQueue(inflight, cause); + } + + /** + * Transmit a given number of messages. + * + * @param count Number of messages to transmit, guaranteed to be positive. + */ + abstract void sendMessages(int count); + + /* + * We are using tri-state return here to indicate one of three conditions: + * - if a matching entry is found, return an Optional containing it + * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null + * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional + */ + @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", + justification = "Returning null Optional is documented in the API contract.") + private static Optional findMatchingEntry(final Queue queue, + final ResponseEnvelope envelope) { + // Try to find the request in a queue. Responses may legally come back in a different order, hence we need + // to use an iterator + final Iterator it = queue.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); + final Request request = e.getRequest(); + final Response response = envelope.getMessage(); + + // First check for matching target, or move to next entry + if (!request.getTarget().equals(response.getTarget())) { + continue; + } + + // Sanity-check logical sequence, ignore any out-of-order messages + if (request.getSequence() != response.getSequence()) { + LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); + return Optional.empty(); + } + + // Check if the entry has (ever) been transmitted + if (!(e instanceof TransmittedConnectionEntry)) { + return Optional.empty(); + } + + final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e; + + // Now check session match + if (envelope.getSessionId() != te.getSessionId()) { + LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope); + return Optional.empty(); + } + if (envelope.getTxSequence() != te.getTxSequence()) { + LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope); + return Optional.empty(); + } + + LOG.debug("Completing request {} with {}", request, envelope); + it.remove(); + return Optional.of(te); + } + + return null; + } + + @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", + justification = "Returning null Optional is documented in the API contract.") + @Override + final Optional checkTimeout(final long now) { + final Optional xmit = checkTimeout(inflight.peek(), now); + if (xmit == null) { + return null; + } + final Optional pend = super.checkTimeout(now); + if (pend == null) { + return null; + } + if (!xmit.isPresent()) { + return pend; + } + if (!pend.isPresent()) { + return xmit; + } + + return Optional.of(xmit.get().min(pend.get())); + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java index a5a579dc03..85d9f14e17 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java +++ 
b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java @@ -20,7 +20,7 @@ import org.opendaylight.controller.cluster.access.ABIVersion; *

* This class is not final so concrete actor behavior implementations may subclass it and track more information about * the backend. The {@link #hashCode()} and {@link #equals(Object)} methods are made final to ensure subclasses compare - * on identity. + * on object identity. * * @author Robert Varga */ diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java index 71805735f0..e4aa2b1e75 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java @@ -8,18 +8,8 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; -import com.google.common.base.Preconditions; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import javax.annotation.Nonnull; -import javax.annotation.concurrent.ThreadSafe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Caching resolver which resolves a cookie to a leader {@link ActorRef}. This class needs to be specialized by the @@ -29,70 +19,24 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -@ThreadSafe public abstract class BackendInfoResolver { - private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class); - private final ConcurrentMap> backends = new ConcurrentHashMap<>(); - - /** - * Return the currently-resolved backend information, if available. This method is guaranteed not to block, but will - * initiate resolution of the information if there is none. - * - * @param cookie Backend cookie - * @return Backend information, if available - */ - public final Optional getFutureBackendInfo(final Long cookie) { - final Future f = lookupBackend(cookie); - if (f.isDone()) { - try { - return Optional.of(f.get()); - } catch (InterruptedException | ExecutionException e) { - LOG.debug("Resolution of {} failed", f, e); - } - } - - return Optional.empty(); - } - /** - * Invalidate a particular instance of {@link BackendInfo}, typically as a response to a request timing out. If - * the provided information is not the one currently cached this method does nothing. + * Request resolution of a particular backend identified by a cookie. This request can be satisfied from the cache. * * @param cookie Backend cookie - * @param info Previous information to be invalidated + * @return A {@link CompletionStage} resulting in information about the backend */ - public final void invalidateBackend(final long cookie, @Nonnull final CompletionStage info) { - if (backends.remove(cookie, Preconditions.checkNotNull(info))) { - LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), info); - invalidateBackendInfo(info); - } - } + @Nonnull + public abstract CompletionStage getBackendInfo(@Nonnull Long cookie); /** - * Request new resolution of a particular backend identified by a cookie. This method is invoked when a client - * requests information which is not currently cached. 
+ * Request re-resolution of a particular backend identified by a cookie, indicating a particular information as + * being stale. If the implementation's cache holds the stale information, it should be purged. * * @param cookie Backend cookie - * @return A {@link CompletableFuture} resulting in information about the backend + * @param staleInfo Stale backend information + * @return A {@link CompletionStage} resulting in information about the backend */ @Nonnull - protected abstract CompletableFuture resolveBackendInfo(@Nonnull final Long cookie); - - /** - * Invalidate previously-resolved shard information. This method is invoked when a timeout is detected - * and the information may need to be refreshed. - * - * @param info Previous promise of backend information - */ - protected abstract void invalidateBackendInfo(@Nonnull CompletionStage info); - - // This is what the client needs to start processing. For as long as we do not have this, we should not complete - // this stage until we have this information - final CompletionStage getBackendInfo(final Long cookie) { - return lookupBackend(cookie); - } - - private CompletableFuture lookupBackend(final Long cookie) { - return backends.computeIfAbsent(Preconditions.checkNotNull(cookie), this::resolveBackendInfo); - } + public abstract CompletionStage refreshBackendInfo(@Nonnull Long cookie, @Nonnull T staleInfo); } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 43b621c05c..e4b73b14f0 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -8,21 +8,24 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; -import java.util.Optional; -import java.util.concurrent.CompletionStage; +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.concepts.WritableIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; /** * A behavior, which handles messages sent to a {@link AbstractClientActor}. 
@@ -30,12 +33,31 @@ import scala.concurrent.duration.FiniteDuration; * @author Robert Varga */ @Beta -public abstract class ClientActorBehavior extends RecoveredClientActorBehavior - implements Identifiable { +public abstract class ClientActorBehavior extends + RecoveredClientActorBehavior implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); - protected ClientActorBehavior(@Nonnull final ClientActorContext context) { + /** + * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations + * involved in connection transitions it is protected by a {@link InversibleLock}. Write-side of the lock is taken + * during connection transitions. Optimistic read-side of the lock is taken when new connections are introduced + * into the map. + * + *

+ * The lock detects potential AB/BA deadlock scenarios and will force the reader side out by throwing + * a {@link InversibleLockException} -- which must be propagated up, releasing locks as it propagates. The initial + * entry point causing the the conflicting lookup must then call {@link InversibleLockException#awaitResolution()} + * before retrying the operation. + */ + // TODO: it should be possible to move these two into ClientActorContext + private final Map> connections = new ConcurrentHashMap<>(); + private final InversibleLock connectionsLock = new InversibleLock(); + private final BackendInfoResolver resolver; + + protected ClientActorBehavior(@Nonnull final ClientActorContext context, + @Nonnull final BackendInfoResolver resolver) { super(context); + this.resolver = Preconditions.checkNotNull(resolver); } @Override @@ -44,118 +66,89 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior getConnection(final Long shard) { + while (true) { + final long stamp = connectionsLock.optimisticRead(); + final AbstractClientConnection conn = connections.computeIfAbsent(shard, this::createConnection); + if (connectionsLock.validate(stamp)) { + // No write-lock in-between, return success + return conn; + } + } + } + + @SuppressWarnings("unchecked") @Override - final ClientActorBehavior onReceiveCommand(final Object command) { + final ClientActorBehavior onReceiveCommand(final Object command) { if (command instanceof InternalCommand) { - return ((InternalCommand) command).execute(this); + return ((InternalCommand) command).execute(this); } if (command instanceof SuccessEnvelope) { return onRequestSuccess((SuccessEnvelope) command); } if (command instanceof FailureEnvelope) { - return onRequestFailure((FailureEnvelope) command); + return internalOnRequestFailure((FailureEnvelope) command); } return onCommand(command); } - private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) { - return context().completeRequest(this, command); - } + private void onResponse(final ResponseEnvelope response) { + final WritableIdentifier id = response.getMessage().getTarget(); - private ClientActorBehavior onRequestFailure(final FailureEnvelope command) { - final RequestFailure failure = command.getMessage(); - final RequestException cause = failure.getCause(); - if (cause instanceof RetiredGenerationException) { - LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); - haltClient(cause); - context().poison(cause); - return null; - } + // FIXME: this will need to be updated for other Request/Response types to extract cookie + Preconditions.checkArgument(id instanceof TransactionIdentifier); + final TransactionIdentifier txId = (TransactionIdentifier) id; - if (failure.isHardFailure()) { - return context().completeRequest(this, command); + final AbstractClientConnection connection = connections.get(txId.getHistoryId().getCookie()); + if (connection != null) { + connection.receiveResponse(response); + } else { + LOG.info("{}: Ignoring unknown response {}", persistenceId(), response); } - - // TODO: add instanceof checks on cause to detect more problems - - LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), command); - return context().completeRequest(this, command); } - // This method is executing in the actor context, hence we can safely interact with the queue - private ClientActorBehavior doSendRequest(final TransactionRequest request, final RequestCallback callback) { - // Get 
or allocate queue for the request - final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie()); - - // Note this is a tri-state return and can be null - final Optional result = queue.enqueueRequest(request, callback); - if (result == null) { - // Happy path: we are done here - return this; - } - - if (result.isPresent()) { - // Less happy path: we need to schedule a timer - scheduleQueueTimeout(queue, result.get()); - return this; - } - - startResolve(queue, request.getTarget().getHistoryId().getCookie()); + private ClientActorBehavior onRequestSuccess(final SuccessEnvelope success) { + onResponse(success); return this; } - // This method is executing in the actor context, hence we can safely interact with the queue - private void startResolve(final SequencedQueue queue, final long cookie) { - // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a - // previous request to resolve. - final CompletionStage f = resolver().getBackendInfo(cookie); - - // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has - // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to - // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and - // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will - // bulk-process all requests. - if (queue.expectProof(f)) { - f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend))); - } + private ClientActorBehavior onRequestFailure(final FailureEnvelope failure) { + onResponse(failure); + return this; } - // This method is executing in the actor context, hence we can safely interact with the queue - private ClientActorBehavior finishResolve(final SequencedQueue queue, - final CompletionStage futureBackend, final BackendInfo backend) { - - final Optional maybeTimeout = queue.setBackendInfo(futureBackend, backend); - if (maybeTimeout.isPresent()) { - scheduleQueueTimeout(queue, maybeTimeout.get()); + private ClientActorBehavior internalOnRequestFailure(final FailureEnvelope command) { + final RequestFailure failure = command.getMessage(); + final RequestException cause = failure.getCause(); + if (cause instanceof RetiredGenerationException) { + LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); + haltClient(cause); + poison(cause); + return null; } - return this; - } - // This method is executing in the actor context, hence we can safely interact with the queue - private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) { - LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout); - context().executeInActor(cb -> cb.queueTimeout(queue), timeout); + return onRequestFailure(command); } - // This method is executing in the actor context, hence we can safely interact with the queue - private ClientActorBehavior queueTimeout(final SequencedQueue queue) { - final boolean needBackend; - + private void poison(final RequestException cause) { + final long stamp = connectionsLock.writeLock(); try { - needBackend = queue.runTimeout(); - } catch (NoProgressException e) { - // Uh-oh, no progress. 
The queue has already killed itself, now we need to remove it - LOG.debug("{}: No progress made - removing queue", persistenceId(), e); - context().removeQueue(queue); - return this; - } + for (AbstractClientConnection q : connections.values()) { + q.poison(cause); + } - if (needBackend) { - startResolve(queue, queue.getCookie()); + connections.clear(); + } finally { + connectionsLock.unlockWrite(stamp); } - - return this; } /** @@ -174,24 +167,76 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior onCommand(@Nonnull Object command); /** * Override this method to provide a backend resolver instance. * * @return a backend resolver instance */ - @Nonnull - protected abstract BackendInfoResolver resolver(); + protected final @Nonnull BackendInfoResolver resolver() { + return resolver; + } /** - * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke - * from any thread. + * Callback invoked when a new connection has been established. * - * @param request Request to send - * @param callback Callback to invoke + * @param conn Old connection + * @param backend New backend + * @return Newly-connected connection. */ - public final void sendRequest(final TransactionRequest request, final RequestCallback callback) { - context().executeInActor(cb -> cb.doSendRequest(request, callback)); + @GuardedBy("connectionsLock") + protected abstract @Nonnull ConnectedClientConnection connectionUp( + final @Nonnull AbstractClientConnection conn, final @Nonnull T backend); + + private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, + final T backend, final Throwable failure) { + if (failure != null) { + LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); + return; + } + + final long stamp = connectionsLock.writeLock(); + try { + // Bring the connection up + final ConnectedClientConnection newConn = connectionUp(conn, backend); + + // Make sure new lookups pick up the new connection + connections.replace(shard, conn, newConn); + LOG.debug("{}: replaced connection {} with {}", persistenceId(), conn, newConn); + } finally { + connectionsLock.unlockWrite(stamp); + } + } + + void removeConnection(final AbstractClientConnection conn) { + connections.remove(conn.cookie(), conn); + LOG.debug("{}: removed connection {}", persistenceId(), conn); + } + + @SuppressWarnings("unchecked") + void reconnectConnection(final ConnectedClientConnection oldConn, + final ReconnectingClientConnection newConn) { + final ReconnectingClientConnection conn = (ReconnectingClientConnection)newConn; + connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); + LOG.debug("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + + final Long shard = oldConn.cookie(); + resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete( + (backend, failure) -> context().executeInActor(behavior -> { + backendConnectFinished(shard, conn, backend, failure); + return behavior; + })); + } + + private ConnectingClientConnection createConnection(final Long shard) { + final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); + + resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> { + backendConnectFinished(shard, conn, backend, failure); + return behavior; + })); + + return conn; } } diff --git 
a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java index 26e68356d3..cb36223c26 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java @@ -13,18 +13,10 @@ import akka.actor.Scheduler; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.concepts.WritableIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; @@ -41,11 +33,8 @@ import scala.concurrent.duration.FiniteDuration; @Beta @ThreadSafe public class ClientActorContext extends AbstractClientActorContext implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class); - - private final Map queues = new ConcurrentHashMap<>(); - private final ClientIdentifier identifier; private final ExecutionContext executionContext; + private final ClientIdentifier identifier; private final Scheduler scheduler; // Hidden to avoid subclassing @@ -79,45 +68,15 @@ public class ClientActorContext extends AbstractClientActorContext implements Id * Execute a command in the context of the client actor. 
* * @param command Block of code which needs to be execute + * @param BackendInfo type */ - public void executeInActor(@Nonnull final InternalCommand command) { + public void executeInActor(@Nonnull final InternalCommand command) { self().tell(Preconditions.checkNotNull(command), ActorRef.noSender()); } - public Cancellable executeInActor(@Nonnull final InternalCommand command, final FiniteDuration delay) { + public Cancellable executeInActor(@Nonnull final InternalCommand command, + final FiniteDuration delay) { return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command), executionContext, ActorRef.noSender()); } - - SequencedQueue queueFor(final Long cookie) { - return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker())); - } - - void removeQueue(final SequencedQueue queue) { - queues.remove(queue.getCookie(), queue); - } - - ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope response) { - final WritableIdentifier id = response.getMessage().getTarget(); - - // FIXME: this will need to be updated for other Request/Response types to extract cookie - Preconditions.checkArgument(id instanceof TransactionIdentifier); - final TransactionIdentifier txId = (TransactionIdentifier) id; - - final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie()); - if (queue == null) { - LOG.info("{}: Ignoring unknown response {}", persistenceId(), response); - return current; - } else { - return queue.complete(current, response); - } - } - - void poison(final RequestException cause) { - for (SequencedQueue q : queues.values()) { - q.poison(cause); - } - - queues.clear(); - } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java new file mode 100644 index 0000000000..6c1507c50d --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Beta +@NotThreadSafe +public final class ConnectedClientConnection extends AbstractReceivingClientConnection { + private static final Logger LOG = LoggerFactory.getLogger(ConnectedClientConnection.class); + + private long nextTxSequence; + + public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { + super(context, cookie, backend); + } + + private TransmittedConnectionEntry transmit(final ConnectionEntry entry) { + final long txSequence = nextTxSequence++; + + final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(), + txSequence); + + final ActorRef actor = remoteActor(); + LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor); + actor.tell(toSend, ActorRef.noSender()); + + return new TransmittedConnectionEntry(entry, sessionId(), txSequence, readTime()); + } + + @Override + void enqueueEntry(final ConnectionEntry entry) { + if (inflightSize() < remoteMaxMessages()) { + appendToInflight(transmit(entry)); + LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); + } else { + LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); + super.enqueueEntry(entry); + } + } + + @Override + void sendMessages(final int count) { + int toSend = count; + + while (toSend > 0) { + final ConnectionEntry e = dequeEntry(); + if (e == null) { + break; + } + + LOG.debug("Transmitting entry {}", e); + appendToInflight(transmit(e)); + toSend--; + } + } + + @Override + ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { + final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this); + setForwarder(new SimpleReconnectForwarder(next)); + current.reconnectConnection(this, next); + return current; + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java new file mode 100644 index 0000000000..cdadf1d601 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import com.google.common.annotations.Beta; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Beta +public final class ConnectingClientConnection extends AbstractClientConnection { + private static final Logger LOG = LoggerFactory.getLogger(ConnectingClientConnection.class); + + // Initial state, never instantiated externally + ConnectingClientConnection(final ClientActorContext context, final Long cookie) { + super(context, cookie); + } + + @Override + public Optional getBackendInfo() { + return Optional.empty(); + } + + @Override + void receiveResponse(final ResponseEnvelope envelope) { + LOG.warn("Initial connection {} ignoring response {}", this, envelope); + } + + @Override + ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { + throw new UnsupportedOperationException("Attempted to reconnect a connecting connection"); + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java new file mode 100644 index 0000000000..64586f05ee --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.yangtools.concepts.Immutable; + +/** + * Single entry in a {@link AbstractClientConnection}. Tracks the request, the associated callback and time when + * the request was first enqueued. 
+ * + * @author Robert Varga + */ +@Beta +public class ConnectionEntry implements Immutable { + private final Consumer> callback; + private final Request request; + private final long enqueuedTicks; + + ConnectionEntry(final Request request, final Consumer> callback, final long now) { + this.request = Preconditions.checkNotNull(request); + this.callback = Preconditions.checkNotNull(callback); + this.enqueuedTicks = now; + } + + ConnectionEntry(final ConnectionEntry entry) { + this(entry.request, entry.callback, entry.enqueuedTicks); + } + + public final Consumer> getCallback() { + return callback; + } + + public final Request getRequest() { + return request; + } + + public void complete(final Response response) { + callback.accept(response); + } + + final long getEnqueuedTicks() { + return enqueuedTicks; + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java index c4a09e736c..40a296bd42 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java @@ -12,6 +12,7 @@ import java.util.AbstractQueue; import java.util.Collections; import java.util.Iterator; import java.util.Queue; +import org.opendaylight.yangtools.concepts.Immutable; /** * A specialized always-empty implementation of {@link java.util.Queue}. This implementation will always refuse new @@ -23,7 +24,7 @@ import java.util.Queue; */ // TODO: move this class into yangtools.util @Beta -public final class EmptyQueue extends AbstractQueue { +public final class EmptyQueue extends AbstractQueue implements Immutable { private static final EmptyQueue INSTANCE = new EmptyQueue<>(); private EmptyQueue() { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java index 50183c18b7..c0790e7b27 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InitialClientActorContext.java @@ -29,11 +29,11 @@ final class InitialClientActorContext extends AbstractClientActorContext { actor.saveSnapshot(snapshot); } - void deleteSnapshots(SnapshotSelectionCriteria criteria) { + void deleteSnapshots(final SnapshotSelectionCriteria criteria) { actor.deleteSnapshots(criteria); } - ClientActorBehavior createBehavior(final ClientIdentifier clientId) { + ClientActorBehavior createBehavior(final ClientIdentifier clientId) { final ActorSystem system = actor.getContext().system(); final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(), persistenceId(), clientId); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InternalCommand.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InternalCommand.java index 2b773ca12b..f34a475168 100644 --- 
a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InternalCommand.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InternalCommand.java @@ -17,12 +17,12 @@ import javax.annotation.Nullable; * @author Robert Varga */ @FunctionalInterface -public interface InternalCommand { +public interface InternalCommand { /** * Run command actions. * * @param currentBehavior Current Behavior * @return Next behavior to use in the client actor */ - @Nullable ClientActorBehavior execute(@Nonnull ClientActorBehavior currentBehavior); + @Nullable ClientActorBehavior execute(@Nonnull ClientActorBehavior currentBehavior); } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLock.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLock.java new file mode 100644 index 0000000000..ea4e5817dd --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLock.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import com.google.common.annotations.Beta; +import com.google.common.base.Verify; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.StampedLock; + +/** + * A lock implementation which allows users to perform optimistic reads and validate them in a fashion similar + * to {@link StampedLock}. In case a read is contented with a write, the read side will throw + * an {@link InversibleLockException}, which the caller can catch and use to wait for the write to resolve. + * + * @author Robert Varga + */ +@Beta +public final class InversibleLock { + private static final AtomicReferenceFieldUpdater LATCH_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(InversibleLock.class, CountDownLatch.class, "latch"); + + private final StampedLock lock = new StampedLock(); + private volatile CountDownLatch latch; + + /** + * Return a stamp for read validation. + * + * @return A stamp, which can be used with {@link #validate(long)}. + * @throws InversibleLockException if this lock is currently write-locked + */ + public long optimisticRead() { + while (true) { + final long stamp = lock.tryOptimisticRead(); + if (stamp != 0) { + return stamp; + } + + // Write-locked. Read the corresponding latch and if present report an exception, which will propagate + // and force release of locks. 
+ final CountDownLatch local = latch; + if (local != null) { + throw new InversibleLockException(latch); + } + + // No latch present: retry optimistic lock + } + } + + public boolean validate(final long stamp) { + return lock.validate(stamp); + } + + public long writeLock() { + final CountDownLatch local = new CountDownLatch(1); + final boolean taken = LATCH_UPDATER.compareAndSet(this, null, local); + Verify.verify(taken); + + return lock.writeLock(); + } + + public void unlockWrite(final long stamp) { + final CountDownLatch local = LATCH_UPDATER.getAndSet(this, null); + Verify.verifyNotNull(local); + lock.unlockWrite(stamp); + local.countDown(); + } + +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLockException.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLockException.java new file mode 100644 index 0000000000..7ec18a265d --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/InversibleLockException.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import java.util.concurrent.CountDownLatch; + +/** + * Exception thrown from {@link InversibleLock#optimisticRead()} and can be used to wait for the racing write + * to complete using {@link #awaitResolution()}. + * + * @author Robert Varga + */ +@Beta +public final class InversibleLockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + private final transient CountDownLatch latch; + + InversibleLockException(final CountDownLatch latch) { + this.latch = Preconditions.checkNotNull(latch); + } + + public void awaitResolution() { + // latch can be null after deserialization + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new IllegalStateException("Interrupted while waiting for latch " + latch, e); + } + } + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/NoProgressException.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/NoProgressException.java index f046be40c7..cc34b30e8b 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/NoProgressException.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/NoProgressException.java @@ -12,7 +12,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; /** * Internal {@link RequestException} used as poison cause when the client fails to make progress for a long time. - * See {@link SequencedQueue} for details. + * See {@link AbstractClientConnection} for details. 
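The InversibleLock and InversibleLockException classes above describe their protocol only in prose, so a minimal usage sketch may help. The example class and its generation field are hypothetical; only the optimisticRead(), validate(), writeLock(), unlockWrite() and awaitResolution() calls shown above are relied upon.

package org.opendaylight.controller.cluster.access.client;

final class InversibleLockExample {
    private final InversibleLock lock = new InversibleLock();
    private volatile long generation;

    long readGeneration() {
        while (true) {
            try {
                final long stamp = lock.optimisticRead();
                final long snapshot = generation;
                if (lock.validate(stamp)) {
                    // No writer ran between optimisticRead() and validate(): snapshot is consistent
                    return snapshot;
                }
                // Stamp was invalidated by a concurrent write: loop and try again
            } catch (InversibleLockException e) {
                // A writer currently holds the lock: block until unlockWrite() releases the latch
                e.awaitResolution();
            }
        }
    }

    void bumpGeneration() {
        final long stamp = lock.writeLock();
        try {
            generation++;
        } finally {
            // Releases the write stamp and wakes up any readers parked in awaitResolution()
            lock.unlockWrite(stamp);
        }
    }
}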
* * @author Robert Varga */ diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java new file mode 100644 index 0000000000..37dc2f1c4d --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import com.google.common.base.Preconditions; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Forwarder class responsible for routing requests from the previous connection incarnation back to the originator, + * which can then convert them as appropriate. + * + * @author Robert Varga + */ +public abstract class ReconnectForwarder { + static final Logger LOG = LoggerFactory.getLogger(ReconnectForwarder.class); + // Visible for subclass method handle + private final AbstractReceivingClientConnection successor; + + protected ReconnectForwarder(final AbstractReceivingClientConnection successor) { + this.successor = Preconditions.checkNotNull(successor); + } + + protected final void sendToSuccessor(final Request request, final Consumer> callback) { + successor.sendRequest(request, callback); + } + + protected abstract void forwardEntry(ConnectionEntry entry); + + final AbstractReceivingClientConnection successor() { + return successor; + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java new file mode 100644 index 0000000000..0209f95bce --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link AbstractClientConnection} which is being reconnected after having timed out. 
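The ReconnectForwarder contract above leaves the actual forwarding strategy to subclasses. The following is a hedged sketch of one possible strategy, which re-dispatches each entry's request through the successor connection via sendToSuccessor() instead of handing over the entry object; the class name and the wildcard generics are assumptions (the patch text has the type parameters stripped), not code from this patch.

package org.opendaylight.controller.cluster.access.client;

final class RedispatchingReconnectForwarder extends ReconnectForwarder {
    RedispatchingReconnectForwarder(final AbstractReceivingClientConnection<?> successor) {
        super(successor);
    }

    @Override
    protected void forwardEntry(final ConnectionEntry entry) {
        // Re-send the original request on the new incarnation, reusing the original callback,
        // so the response still reaches whoever issued the request before the reconnect.
        sendToSuccessor(entry.getRequest(), entry.getCallback());
    }
}

The SimpleReconnectForwarder added later in this patch takes the other route and hands the entry itself to successor().enqueueEntry(), preserving the entry object as-is.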
+ * + * @author Robert Varga + * + * @param {@link BackendInfo} type + */ +public final class ReconnectingClientConnection extends AbstractReceivingClientConnection { + private static final Logger LOG = LoggerFactory.getLogger(ReconnectingClientConnection.class); + + ReconnectingClientConnection(final ConnectedClientConnection oldConnection) { + super(oldConnection); + } + + @Override + void sendMessages(final int count) { + LOG.debug("Connection {} is reconnecting, not transmitting anything", this); + } + + @Override + ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { + // Intentional no-op + LOG.debug("Skipping reconnect of already-reconnecting connection {}", this); + return current; + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestCallback.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestCallback.java index 4d81ce0cbf..90af8cb2e1 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestCallback.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestCallback.java @@ -12,12 +12,12 @@ import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.Response; @FunctionalInterface -public interface RequestCallback { +interface RequestCallback { /** * Invoked when a particular request completes. * * @param response Response to the request * @return Next client actor behavior */ - @Nullable ClientActorBehavior complete(@Nonnull Response response); + @Nullable ClientActorBehavior complete(@Nonnull Response response); } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java deleted file mode 100644 index 596e353c98..0000000000 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
- * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.access.client; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; -import com.google.common.base.Verify; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayDeque; -import java.util.Iterator; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.Response; -import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; - -/* - * A queue that processes entries in sequence. - * - * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts, - * retries and enqueues work as expected. - */ -@NotThreadSafe -final class SequencedQueue { - private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class); - - // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path - @VisibleForTesting - static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15); - @VisibleForTesting - static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); - private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS, - TimeUnit.NANOSECONDS); - - /** - * Default number of permits we start with. This value is used when we start up only, once we resolve a backend - * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful - * resolution. - */ - private static final int DEFAULT_TX_LIMIT = 1000; - - private final Ticker ticker; - private final Long cookie; - - /* - * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing - * with the limit changing between reconnects (which imply retransmission). - * - * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one), - * one for requests that have been sent to the previous backend (and have not been transmitted to the current one), - * and one for requests which have not been transmitted at all. - * - * When transmitting we first try to drain the second queue and service the third one only when that becomes empty. - * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses - * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance - * to retransmit -- hence the second queue. - */ - private Queue currentInflight = new ArrayDeque<>(); - private Queue lastInflight = new ArrayDeque<>(); - private final Queue pending = new ArrayDeque<>(); - - /** - * Last scheduled resolution request. 
We do not use this object aside from requiring it as a proof that when - * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested - * result. - */ - private CompletionStage backendProof; - private BackendInfo backend; - - // This is not final because we need to be able to replace it. - private long txSequence; - - private int lastTxLimit = DEFAULT_TX_LIMIT; - - /** - * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue. - */ - private Object expectingTimer; - - private long lastProgress; - - // Updated from application thread - private volatile boolean notClosed = true; - - SequencedQueue(final Long cookie, final Ticker ticker) { - this.cookie = Preconditions.checkNotNull(cookie); - this.ticker = Preconditions.checkNotNull(ticker); - lastProgress = ticker.read(); - } - - Long getCookie() { - return cookie; - } - - private void checkNotClosed() { - Preconditions.checkState(notClosed, "Queue %s is closed", this); - } - - private long nextTxSequence() { - return txSequence++; - } - - /** - * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller - * the following scenarios: - * 1) The request has been enqueued and transmitted. No further actions are necessary - * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer - * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that - * process needs to complete before transmission occurs - *

- * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode - * the scenarios above according to the following rules: - * - if is null, the first case applies - * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend - * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)} - * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer - * - * @param request Request to be sent - * @param callback Callback to be invoked - * @return Optional duration with semantics described above. - */ - @Nullable Optional enqueueRequest(final Request request, final RequestCallback callback) { - checkNotClosed(); - - final long now = ticker.read(); - final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now); - if (backend == null) { - LOG.debug("No backend available, request resolution"); - pending.add(e); - return Optional.empty(); - } - if (!lastInflight.isEmpty()) { - LOG.debug("Retransmit not yet complete, delaying request {}", request); - pending.add(e); - return null; - } - if (currentInflight.size() >= lastTxLimit) { - LOG.debug("Queue is at capacity, delayed sending of request {}", request); - pending.add(e); - return null; - } - - // Ready to transmit - - if (currentInflight.offer(e)) { - LOG.debug("Enqueued request {} to queue {}", request, this); - } else { - // This shouldn't happen since the queue has unlimited capacity but check anyway to avoid FindBugs warning - // about checking return value. - LOG.warn("Fail to enqueued request {} to queue {}", request, this); - } - - e.retransmit(backend, nextTxSequence(), now); - if (expectingTimer == null) { - expectingTimer = now + REQUEST_TIMEOUT_NANOS; - return Optional.of(INITIAL_REQUEST_TIMEOUT); - } else { - return null; - } - } - - /* - * We are using tri-state return here to indicate one of three conditions: - * - if a matching entry is found, return an Optional containing it - * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null - * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional - */ - @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", - justification = "Returning null Optional is documented in the API contract.") - private static Optional findMatchingEntry(final Queue queue, - final ResponseEnvelope envelope) { - // Try to find the request in a queue. 
Responses may legally come back in a different order, hence we need - // to use an iterator - final Iterator it = queue.iterator(); - while (it.hasNext()) { - final SequencedQueueEntry e = it.next(); - final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails()); - - final Request request = e.getRequest(); - final Response response = envelope.getMessage(); - - // First check for matching target, or move to next entry - if (!request.getTarget().equals(response.getTarget())) { - continue; - } - - // Sanity-check logical sequence, ignore any out-of-order messages - if (request.getSequence() != response.getSequence()) { - LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); - return Optional.empty(); - } - - // Now check session match - if (envelope.getSessionId() != txDetails.getSessionId()) { - LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope); - return Optional.empty(); - } - if (envelope.getTxSequence() != txDetails.getTxSequence()) { - LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope); - return Optional.empty(); - } - - LOG.debug("Completing request {} with {}", request, envelope); - it.remove(); - return Optional.of(e); - } - - return null; - } - - ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope envelope) { - Optional maybeEntry = findMatchingEntry(currentInflight, envelope); - if (maybeEntry == null) { - maybeEntry = findMatchingEntry(lastInflight, envelope); - } - - if (maybeEntry == null || !maybeEntry.isPresent()) { - LOG.warn("No request matching {} found, ignoring response", envelope); - return current; - } - - lastProgress = ticker.read(); - final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage()); - - // We have freed up a slot, try to transmit something - if (backend != null) { - final int toSend = lastTxLimit - currentInflight.size(); - if (toSend > 0) { - runTransmit(toSend); - } - } - - return ret; - } - - private int transmitEntries(final Queue queue, final int count) { - int toSend = count; - - while (toSend > 0) { - final SequencedQueueEntry e = queue.poll(); - if (e == null) { - break; - } - - LOG.debug("Transmitting entry {}", e); - e.retransmit(backend, nextTxSequence(), lastProgress); - toSend--; - } - - return toSend; - } - - private void runTransmit(final int count) { - final int toSend; - - // Process lastInflight first, possibly clearing it - if (!lastInflight.isEmpty()) { - toSend = transmitEntries(lastInflight, count); - if (lastInflight.isEmpty()) { - // We won't be needing the queue anymore, change it to specialized implementation - lastInflight = EmptyQueue.getInstance(); - } - } else { - toSend = count; - } - - // Process pending next. - transmitEntries(pending, toSend); - } - - Optional setBackendInfo(final CompletionStage proof, - final BackendInfo backend) { - Preconditions.checkNotNull(backend); - if (!proof.equals(backendProof)) { - LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof); - return Optional.empty(); - } - - LOG.debug("Resolved backend {}", backend); - - // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right - // and also not to exceed new limits - final Queue newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size()); - newLast.addAll(currentInflight); - newLast.addAll(lastInflight); - lastInflight = newLast.isEmpty() ? 
EmptyQueue.getInstance() : newLast; - - // Clear currentInflight, possibly compacting it - final int txLimit = backend.getMaxMessages(); - if (lastTxLimit > txLimit) { - currentInflight = new ArrayDeque<>(); - } else { - currentInflight.clear(); - } - - // We are ready to roll - this.backend = backend; - backendProof = null; - txSequence = 0; - lastTxLimit = txLimit; - lastProgress = ticker.read(); - - // No pending requests, return - if (lastInflight.isEmpty() && pending.isEmpty()) { - return Optional.empty(); - } - - LOG.debug("Sending up to {} requests to backend {}", txLimit, backend); - - runTransmit(lastTxLimit); - - // Calculate next timer if necessary - if (expectingTimer == null) { - // Request transmission may have cost us some time. Recalculate timeout. - final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS; - expectingTimer = nextTicks; - return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS)); - } else { - return Optional.empty(); - } - } - - boolean expectProof(final CompletionStage proof) { - if (!proof.equals(backendProof)) { - LOG.debug("Setting resolution handle to {}", proof); - backendProof = proof; - return true; - } else { - LOG.trace("Already resolving handle {}", proof); - return false; - } - } - - boolean hasCompleted() { - return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty(); - } - - /** - * Check queue timeouts and return true if a timeout has occured. - * - * @return True if a timeout occured - * @throws NoProgressException if the queue failed to make progress for an extended - * time. - */ - boolean runTimeout() throws NoProgressException { - expectingTimer = null; - final long now = ticker.read(); - - if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) { - final long ticksSinceProgress = now - lastProgress; - if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { - LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, - TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); - - final NoProgressException ex = new NoProgressException(ticksSinceProgress); - poison(ex); - throw ex; - } - } - - // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue - final SequencedQueueEntry head = currentInflight.peek(); - if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) { - backend = null; - LOG.debug("Queue {} invalidated backend info", this); - return true; - } else { - return false; - } - } - - private static void poisonQueue(final Queue queue, final RequestException cause) { - queue.forEach(e -> e.poison(cause)); - queue.clear(); - } - - void poison(final RequestException cause) { - close(); - - poisonQueue(currentInflight, cause); - poisonQueue(lastInflight, cause); - poisonQueue(pending, cause); - } - - // FIXME: add a caller from ClientSingleTransaction - void close() { - notClosed = false; - } -} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java deleted file mode 100644 index 8814d50c54..0000000000 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
- * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.access.client; - -import akka.actor.ActorRef; -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.Response; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information. - * - * @author Robert Varga - */ -final class SequencedQueueEntry { - private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class); - - private final Request request; - private final RequestCallback callback; - private final long enqueuedTicks; - - private TxDetails txDetails; - - SequencedQueueEntry(final Request request, final RequestCallback callback, - final long now) { - this.request = Preconditions.checkNotNull(request); - this.callback = Preconditions.checkNotNull(callback); - this.enqueuedTicks = now; - } - - Request getRequest() { - return request; - } - - @Nullable TxDetails getTxDetails() { - return txDetails; - } - - ClientActorBehavior complete(final Response response) { - LOG.debug("Completing request {} with {}", request, response); - return callback.complete(response); - } - - void poison(final RequestException cause) { - LOG.trace("Poisoning request {}", request, cause); - callback.complete(request.toRequestFailure(cause)); - } - - boolean isTimedOut(final long now, final long timeoutNanos) { - final long elapsed; - - if (txDetails != null) { - elapsed = now - txDetails.getTimeTicks(); - } else { - elapsed = now - enqueuedTicks; - } - - if (elapsed >= timeoutNanos) { - LOG.debug("Request {} timed out after {}ns", request, elapsed); - return true; - } else { - return false; - } - } - - void retransmit(final BackendInfo backend, final long txSequence, final long now) { - final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), - backend.getSessionId(), txSequence); - - final ActorRef actor = backend.getActor(); - LOG.trace("Transmitting request {} as {} to {}", request, toSend, actor); - actor.tell(toSend, ActorRef.noSender()); - txDetails = new TxDetails(backend.getSessionId(), txSequence, now); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString(); - } - -} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java new file mode 100644 index 0000000000..a8ab7c4e23 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +// Simple forwarder which just pushes the entry to the successor +final class SimpleReconnectForwarder extends ReconnectForwarder { + SimpleReconnectForwarder(final AbstractReceivingClientConnection successor) { + super(successor); + } + + @Override + protected void forwardEntry(final ConnectionEntry entry) { + successor().enqueueEntry(entry); + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java new file mode 100644 index 0000000000..34cbd49536 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +/** + * A {@link ConnectionEntry} which has been transmitted. It holds additional information about the last transmission. + * + * @author Robert Varga + */ +final class TransmittedConnectionEntry extends ConnectionEntry { + private final long sessionId; + private final long txSequence; + private final long txTicks; + + TransmittedConnectionEntry(final ConnectionEntry entry, final long sessionId, final long txSequence, + final long now) { + super(entry); + this.sessionId = sessionId; + this.txSequence = txSequence; + this.txTicks = now; + } + + long getSessionId() { + return sessionId; + } + + long getTxSequence() { + return txSequence; + } + + long getTxTicks() { + return txTicks; + } + +} diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java similarity index 57% rename from opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java rename to opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java index 72be005664..dec343fce0 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java @@ -14,6 +14,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -22,9 +23,9 @@ import akka.actor.ActorRef; 
import akka.actor.ActorSystem; import akka.testkit.TestProbe; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -41,16 +42,18 @@ import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.common.actor.TestTicker; import org.opendaylight.yangtools.concepts.WritableIdentifier; import scala.concurrent.duration.FiniteDuration; /** - * Test suite covering logic contained in {@link SequencedQueue}. It assumes {@link SequencedQueueEntryTest} passes. + * Test suite covering logic contained in {@link ConnectingClientConnection}. It assumes {@link ConnectionEntryTest} + * passes. * * @author Robert Varga */ -public class SequencedQueueTest { +public class ConnectingClientConnectionTest { private static class MockFailure extends RequestFailure { private static final long serialVersionUID = 1L; @@ -100,9 +103,11 @@ public class SequencedQueueTest { @Mock private RequestException mockCause; @Mock - private RequestCallback mockCallback; + private Consumer> mockCallback; @Mock - private ClientActorBehavior mockBehavior; + private ClientActorBehavior mockBehavior; + @Mock + private ClientActorContext mockContext; private TestTicker ticker; private BackendInfo mockBackendInfo; @@ -115,7 +120,7 @@ public class SequencedQueueTest { private static ActorSystem actorSystem; private TestProbe mockActor; - private SequencedQueue queue; + private AbstractClientConnection queue; @BeforeClass public static void setupClass() { @@ -131,10 +136,11 @@ public class SequencedQueueTest { public void setup() { MockitoAnnotations.initMocks(this); - doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class)); + doNothing().when(mockCallback).accept(any(MockFailure.class)); ticker = new TestTicker(); ticker.increment(ThreadLocalRandom.current().nextLong()); + doReturn(ticker).when(mockContext).ticker(); mockActor = TestProbe.apply(actorSystem); mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5); @@ -144,7 +150,7 @@ public class SequencedQueueTest { mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0); mockCookie = ThreadLocalRandom.current().nextLong(); - queue = new SequencedQueue(mockCookie, ticker); + queue = new ConnectingClientConnection<>(mockContext, mockCookie); } @After @@ -153,38 +159,17 @@ public class SequencedQueueTest { } @Test - public void testGetCookie() { - assertSame(mockCookie, queue.getCookie()); - } - - @Test - public void testEmptyClose() { - assertFalse(queue.hasCompleted()); - queue.close(); - assertTrue(queue.hasCompleted()); - } - - @Test(expected = IllegalStateException.class) - public void testClosedEnqueueRequest() { - queue.close(); - - // Kaboom - queue.enqueueRequest(mockRequest, mockCallback); - } - - @Test - public void testCloseIdempotent() { - queue.close(); - queue.close(); + public void testCookie() { + assertSame(mockCookie, queue.cookie()); } @Test public void testPoison() { - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, 
mockCallback); queue.poison(mockCause); final ArgumentCaptor captor = ArgumentCaptor.forClass(MockFailure.class); - verify(mockCallback).complete(captor.capture()); + verify(mockCallback).accept(captor.capture()); assertSame(mockCause, captor.getValue().getCause()); } @@ -194,7 +179,7 @@ public class SequencedQueueTest { queue.poison(mockCause); // Kaboom - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); } @Test @@ -204,47 +189,11 @@ public class SequencedQueueTest { } @Test - public void testEnqueueRequestNeedsBackend() { - final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); - + public void testSendRequestNeedsBackend() { + queue.sendRequest(mockRequest, mockCallback); + final Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); - assertFalse(ret.isPresent()); - } - - @Test - public void testExpectProof() { - final CompletableFuture proof = new CompletableFuture<>(); - assertTrue(queue.expectProof(proof)); - assertFalse(queue.expectProof(proof)); - } - - @Test(expected = NullPointerException.class) - public void testSetBackendNull() { - final CompletableFuture proof = new CompletableFuture<>(); - assertTrue(queue.expectProof(proof)); - queue.setBackendInfo(proof, null); - } - - @Test - public void testSetBackendWithNoResolution() { - queue.enqueueRequest(mockRequest, mockCallback); - - final CompletableFuture proof = new CompletableFuture<>(); - final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); - assertNotNull(ret); - assertFalse(ret.isPresent()); - } - - @Test - public void testSetBackendWithWrongProof() { - queue.enqueueRequest(mockRequest, mockCallback); - - final CompletableFuture proof = new CompletableFuture<>(); - assertTrue(queue.expectProof(proof)); - - final Optional ret = queue.setBackendInfo(new CompletableFuture<>(), mockBackendInfo); - assertNotNull(ret); - assertFalse(ret.isPresent()); + assertTrue(ret.isPresent()); } @Test @@ -254,135 +203,111 @@ public class SequencedQueueTest { } @Test - public void testSetBackendWithRequestsNoTimer() { - queue.enqueueRequest(mockRequest, mockCallback); - - final CompletableFuture proof = new CompletableFuture<>(); - assertTrue(queue.expectProof(proof)); - assertFalse(mockActor.msgAvailable()); - - final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); - assertNotNull(ret); - assertTrue(ret.isPresent()); - - assertTransmit(mockRequest, 0); - } - - @Test - public void testEnqueueRequestNeedsTimer() { + public void testSendRequestNeedsTimer() { setupBackend(); - final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); + final Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertTrue(ret.isPresent()); assertTransmit(mockRequest, 0); } - @Test - public void testEnqueueRequestWithoutTimer() { - setupBackend(); - - // First request - Optional ret = queue.enqueueRequest(mockRequest, mockCallback); - assertNotNull(ret); - assertTrue(ret.isPresent()); - assertTransmit(mockRequest, 0); - - // Second request, no timer fired - ret = queue.enqueueRequest(mockRequest2, mockCallback); - assertNull(ret); - assertTransmit(mockRequest2, 1); - } - @Test public void testRunTimeoutEmpty() throws NoProgressException { - final boolean ret = queue.runTimeout(); - assertFalse(ret); + Optional ret = queue.checkTimeout(ticker.read()); + assertNotNull(ret); + assertFalse(ret.isPresent()); } @Test public void testRunTimeoutWithoutShift() throws NoProgressException { - 
queue.enqueueRequest(mockRequest, mockCallback); - final boolean ret = queue.runTimeout(); - assertFalse(ret); + queue.sendRequest(mockRequest, mockCallback); + Optional ret = queue.checkTimeout(ticker.read()); + assertNotNull(ret); + assertTrue(ret.isPresent()); } @Test public void testRunTimeoutWithTimeoutLess() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); - ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1); + ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS - 1); - final boolean ret = queue.runTimeout(); - assertFalse(ret); + Optional ret = queue.checkTimeout(ticker.read()); + assertNotNull(ret); + assertTrue(ret.isPresent()); } @Test public void testRunTimeoutWithTimeoutExact() throws NoProgressException { setupBackend(); - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); - ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS); + ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS); - final boolean ret = queue.runTimeout(); - assertTrue(ret); + Optional ret = queue.checkTimeout(ticker.read()); + assertNull(ret); } @Test public void testRunTimeoutWithTimeoutMore() throws NoProgressException { setupBackend(); - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); - ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1); + ticker.increment(AbstractClientConnection.REQUEST_TIMEOUT_NANOS + 1); - final boolean ret = queue.runTimeout(); - assertTrue(ret); + Optional ret = queue.checkTimeout(ticker.read()); + assertNull(ret); } - @Test(expected = NoProgressException.class) + @SuppressWarnings({ "rawtypes", "unchecked" }) public void testRunTimeoutWithoutProgressExact() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); - ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS); + ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS); // Kaboom - queue.runTimeout(); + queue.runTimer((ClientActorBehavior) mockBehavior); + assertNotNull(queue.poisoned()); } - @Test(expected = NoProgressException.class) + @SuppressWarnings({ "rawtypes", "unchecked" }) public void testRunTimeoutWithoutProgressMore() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); - ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1); + ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1); // Kaboom - queue.runTimeout(); + queue.runTimer((ClientActorBehavior) mockBehavior); + assertNotNull(queue.poisoned()); } @Test public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException { - ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS); + ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS); // No problem - final boolean ret = queue.runTimeout(); - assertFalse(ret); + Optional ret = queue.checkTimeout(ticker.read()); + assertNotNull(ret); + assertFalse(ret.isPresent()); } @Test public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException { - ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1); + ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1); // No problem - final boolean ret = queue.runTimeout(); - assertFalse(ret); + Optional ret = queue.checkTimeout(ticker.read()); + assertNotNull(ret); + assertFalse(ret.isPresent()); } 
@Test public void testCompleteEmpty() { - final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope); - assertSame(mockBehavior, ret); + queue.receiveResponse(mockResponseEnvelope); verifyNoMoreInteractions(mockCallback); } @@ -390,14 +315,12 @@ public class SequencedQueueTest { public void testCompleteSingle() { setupBackend(); - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); - ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope); - verify(mockCallback).complete(mockResponse); - assertSame(mockBehavior, ret); + queue.receiveResponse(mockResponseEnvelope); + verify(mockCallback).accept(mockResponse); - ret = queue.complete(mockBehavior, mockResponseEnvelope); - assertSame(mockBehavior, ret); + queue.receiveResponse(mockResponseEnvelope); verifyNoMoreInteractions(mockCallback); } @@ -405,36 +328,35 @@ public class SequencedQueueTest { public void testCompleteNull() { setupBackend(); - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); - doReturn(null).when(mockCallback).complete(mockResponse); + doNothing().when(mockCallback).accept(mockResponse); - ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope); - verify(mockCallback).complete(mockResponse); - assertNull(ret); + queue.receiveResponse(mockResponseEnvelope); + verify(mockCallback).accept(mockResponse); } @Test public void testProgressRecord() throws NoProgressException { setupBackend(); - queue.enqueueRequest(mockRequest, mockCallback); + queue.sendRequest(mockRequest, mockCallback); ticker.increment(10); - queue.enqueueRequest(mockRequest2, mockCallback); - queue.complete(mockBehavior, mockResponseEnvelope); + queue.sendRequest(mockRequest2, mockCallback); + queue.receiveResponse(mockResponseEnvelope); - ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11); - assertTrue(queue.runTimeout()); + ticker.increment(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11); + + Optional ret = queue.checkTimeout(ticker.read()); + assertNull(ret); } private void setupBackend() { - final CompletableFuture proof = new CompletableFuture<>(); - assertTrue(queue.expectProof(proof)); - final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); - assertNotNull(ret); - assertFalse(ret.isPresent()); - assertFalse(mockActor.msgAvailable()); + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(mockContext, mockCookie, + mockBackendInfo); + queue.setForwarder(new SimpleReconnectForwarder(newConn)); + queue = newConn; } private void assertTransmit(final Request expected, final long sequence) { diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryTest.java similarity index 55% rename from opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java rename to opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryTest.java index 8cc990ce7c..a772ddcc7b 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java +++ 
b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryTest.java @@ -7,46 +7,38 @@ */ package org.opendaylight.controller.cluster.access.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.TestProbe; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy; import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy; import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.common.actor.TestTicker; import org.opendaylight.yangtools.concepts.WritableIdentifier; -import scala.concurrent.duration.Duration; /** - * Test suite covering logic contained in {@link SequencedQueueEntry}. + * Test suite covering logic contained in {@link ConnectionEntry}. 
* * @author Robert Varga */ -public class SequencedQueueEntryTest { +public class ConnectionEntryTest { private static class MockFailure extends RequestFailure { private static final long serialVersionUID = 1L; @@ -96,19 +88,18 @@ public class SequencedQueueEntryTest { @Mock private RequestException mockCause; @Mock - private RequestCallback mockCallback; + private Consumer> mockCallback; @Mock - private ClientActorBehavior mockBehavior; + private ClientActorBehavior mockBehavior; private TestTicker ticker; - private BackendInfo mockBackendInfo; private Request mockRequest; private Response mockResponse; private static ActorSystem actorSystem; private TestProbe mockActor; - private SequencedQueueEntry entry; + private ConnectionEntry entry; @BeforeClass public static void setupClass() { @@ -124,17 +115,16 @@ public class SequencedQueueEntryTest { public void setup() { MockitoAnnotations.initMocks(this); - doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class)); + doNothing().when(mockCallback).accept(any(MockFailure.class)); ticker = new TestTicker(); ticker.increment(ThreadLocalRandom.current().nextLong()); mockActor = TestProbe.apply(actorSystem); - mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5); mockRequest = new MockRequest(mockIdentifier, mockReplyTo); mockResponse = mockRequest.toRequestFailure(mockCause); - entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read()); + entry = new ConnectionEntry(mockRequest, mockCallback, ticker.read()); } @After @@ -142,65 +132,9 @@ public class SequencedQueueEntryTest { actorSystem.stop(mockActor.ref()); } - @Test - public void testGetTxDetails() { - assertNull(entry.getTxDetails()); - entry.retransmit(mockBackendInfo, 0, ticker.read()); - assertEquals(0, entry.getTxDetails().getTxSequence()); - entry.retransmit(mockBackendInfo, 1, ticker.read()); - assertEquals(1, entry.getTxDetails().getTxSequence()); - entry.retransmit(mockBackendInfo, 3, ticker.read()); - assertEquals(3, entry.getTxDetails().getTxSequence()); - } - @Test public void testComplete() { entry.complete(mockResponse); - verify(mockCallback).complete(mockResponse); - } - - @Test - public void testPoison() { - entry.poison(mockCause); - - final ArgumentCaptor captor = ArgumentCaptor.forClass(MockFailure.class); - verify(mockCallback).complete(captor.capture()); - assertSame(mockCause, captor.getValue().getCause()); - } - - @Test - public void testIsTimedOut() { - assertTrue(entry.isTimedOut(ticker.read(), 0)); - assertFalse(entry.isTimedOut(ticker.read(), 1)); - - entry.retransmit(mockBackendInfo, 0, ticker.read()); - assertTrue(entry.isTimedOut(ticker.read(), 0)); - ticker.increment(10); - assertTrue(entry.isTimedOut(ticker.read(), 10)); - assertFalse(entry.isTimedOut(ticker.read(), 20)); - - entry.retransmit(mockBackendInfo, 1, ticker.read()); - assertTrue(entry.isTimedOut(ticker.read(), 0)); - ticker.increment(10); - assertTrue(entry.isTimedOut(ticker.read(), 10)); - assertFalse(entry.isTimedOut(ticker.read(), 11)); - } - - @Test - public void testRetransmit() { - assertFalse(mockActor.msgAvailable()); - entry.retransmit(mockBackendInfo, 0, ticker.read()); - - assertTrue(mockActor.msgAvailable()); - assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS))); - } - - private static void assertRequestEquals(final Request expected, final Object obj) { - assertTrue(obj instanceof RequestEnvelope); - - final RequestEnvelope actual = (RequestEnvelope) obj; - assertEquals(0, 
actual.getSessionId()); - assertEquals(0, actual.getTxSequence()); - assertEquals(expected.getTarget(), actual.getMessage().getTarget()); + verify(mockCallback).accept(mockResponse); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java index ae845a727f..ea6948f859 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java @@ -9,7 +9,7 @@ package org.opendaylight.controller.cluster.databroker; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction; /** * Abstract base class for concrete {@link DOMStoreTransaction} implementations. It holds a reference to the associated diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java index 1badccb862..a52254e35f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java @@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.databroker; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java index 55139d3440..9144f9b091 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java @@ -8,8 +8,8 @@ package org.opendaylight.controller.cluster.databroker; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import 
org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java index 97ed6ba4fb..6b7237e803 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java @@ -10,10 +10,10 @@ package org.opendaylight.controller.cluster.databroker; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; /** * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. 
It wraps diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java index 4c804d762b..0e13ac49f3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java @@ -8,8 +8,8 @@ package org.opendaylight.controller.cluster.databroker; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index ce2c164b56..951b540f1d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -8,16 +8,22 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.InversibleLockException; +import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +51,8 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia @GuardedBy("this") private final Map readyTransactions = new HashMap<>(); - private final Map histories = new ConcurrentHashMap<>(); - private final DistributedDataStoreClientBehavior client; + private final Map histories = new ConcurrentHashMap<>(); + private final AbstractDataStoreClientBehavior client; private final 
LocalHistoryIdentifier identifier; // Used via NEXT_TX_UPDATER @@ -55,7 +61,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia private volatile State state = State.IDLE; - AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { + AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { this.client = Preconditions.checkNotNull(client); this.identifier = Preconditions.checkNotNull(identifier); Preconditions.checkArgument(identifier.getCookie() == 0); @@ -68,6 +74,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia final void updateState(final State expected, final State next) { final boolean success = STATE_UPDATER.compareAndSet(this, expected, next); Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state); + LOG.debug("Client history {} changed state from {} to {}", this, expected, next); } @Override @@ -75,14 +82,14 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia return identifier; } - final DistributedDataStoreClientBehavior getClient() { - return client; - } - final long nextTx() { return NEXT_TX_UPDATER.getAndIncrement(this); } + final Long resolveShardForPath(final YangInstanceIdentifier path) { + return client.resolveShardForPath(path); + } + @Override final void localAbort(final Throwable cause) { final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED); @@ -99,17 +106,43 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } } - private AbstractProxyHistory createHistoryProxy(final Long shard) { - return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(), - identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard)); + /** + * Create a new history proxy for a given shard. + * + * @throws InversibleLockException if the shard is being reconnected + */ + private ProxyHistory createHistoryProxy(final Long shard) { + final AbstractClientConnection connection = client.getConnection(shard); + final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(), + identifier.getHistoryId(), shard), connection); + + // Request creation of the history. 
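The @throws note above only hints at how callers are expected to react when a shard is in the middle of a reconnect: they block until the lock flips back and then retry, as the createTransactionProxy() loop below does. A minimal, self-contained sketch of that retry pattern follows; BusyException and retryUntilResolved are invented stand-ins for illustration, not the InversibleLockException API shipped in cds-access-client.

import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Stand-in for InversibleLockException: it carries a latch that is released once the
// reconnect completes, letting blocked callers retry their lookup.
final class BusyException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    private final CountDownLatch resolved = new CountDownLatch(1);

    void resolve() {
        resolved.countDown();
    }

    void awaitResolution() {
        try {
            resolved.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reconnect", e);
        }
    }
}

final class RetryingLookup {
    // Keep retrying the supplier until it no longer observes a reconnect in progress.
    static <T> T retryUntilResolved(final Supplier<T> factory) {
        while (true) {
            try {
                return factory.get();
            } catch (BusyException e) {
                e.awaitResolution();
            }
        }
    }
}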
+ connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()), + this::createHistoryCallback); + return ret; } - abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, - final Optional backendInfo); + abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, + final AbstractClientConnection connection); + + private void createHistoryCallback(final Response response) { + LOG.debug("Create history response {}", response); + } final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { - final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy); - return history.createTransactionProxy(transactionId); + while (true) { + final ProxyHistory history; + try { + history = histories.computeIfAbsent(shard, this::createHistoryProxy); + } catch (InversibleLockException e) { + LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard); + e.awaitResolution(); + LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard); + continue; + } + + return history.createTransactionProxy(transactionId); + } } public final ClientTransaction createTransaction() { @@ -140,6 +173,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous); + LOG.debug("Local history {} readied transaction {}", this, txId); return cohort; } @@ -166,4 +200,34 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.warn("Could not find completed transaction {}", txId); } } + + HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { + final ProxyHistory oldProxy = histories.get(newConn.cookie()); + if (oldProxy == null) { + return null; + } + + final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn)); + return new HistoryReconnectCohort() { + @Override + ProxyReconnectCohort getProxy() { + return proxy; + } + + @Override + void replaySuccessfulRequests() { + proxy.replaySuccessfulRequests(); + } + + @Override + public void close() { + LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn); + final ProxyHistory newProxy = proxy.finishReconnect(); + if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) { + LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy, + AbstractClientHistory.this); + } + } + }; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java new file mode 100644 index 0000000000..a84715c843 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import akka.actor.Status; +import com.google.common.base.Throwables; +import com.google.common.base.Verify; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; +import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; +import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore + * frontend. + * + *

+ * This class is not visible outside of this package because it breaks the actor containment. Services provided to + * the Java world outside of actor containment are captured in {@link DistributedDataStoreClient}. + * + *

+ * IMPORTANT: this class breaks actor containment via methods implementing the {@link DistributedDataStoreClient} contract. + * When touching internal state, be mindful of the execution context, Actor or POJO, from which the state + * is being accessed or modified. + * + *

+ * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application + * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in + * doubt, feel free to synchronize on this object. + * + *

+ * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but + * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed + * for correctness and performance impact. + * + *

+ * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal + * for performing work and charging applications for it. That has two positive effects: + * - CPU usage is distributed across applications, minimizing work done in the actor thread + * - CPU usage provides back-pressure towards the application. + * + * @author Robert Varga + */ +abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior + implements DistributedDataStoreClient { + private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class); + + private final Map histories = new ConcurrentHashMap<>(); + private final AtomicLong nextHistoryId = new AtomicLong(1); + private final SingleClientHistory singleHistory; + + private volatile Throwable aborted; + + AbstractDataStoreClientBehavior(final ClientActorContext context, + final BackendInfoResolver resolver) { + super(context, resolver); + singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); + } + + // + // + // Methods below are invoked from the client actor thread + // + // + + @Override + protected final void haltClient(final Throwable cause) { + // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up + // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor) + // thread. + if (aborted != null) { + abortOperations(cause); + } + } + + private void abortOperations(final Throwable cause) { + // This acts as a barrier, application threads check this after they have added an entry in the maps, + // and if they observe aborted being non-null, they will perform their cleanup and not return the handle. + aborted = cause; + + for (ClientLocalHistory h : histories.values()) { + h.localAbort(cause); + } + histories.clear(); + } + + private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) { + abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down")); + return null; + } + + @Override + protected final AbstractDataStoreClientBehavior onCommand(final Object command) { + if (command instanceof GetClientRequest) { + ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender()); + } else { + LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command); + } + + return this; + } + + /* + * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more + * involved, as the messages need to be replayed to the individual proxies. + */ + @Override + @GuardedBy("connectionsLock") + protected final ConnectedClientConnection connectionUp( + final AbstractClientConnection conn, final ShardBackendInfo backend) { + + // Step 0: create a new connected connection + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), + conn.cookie(), backend); + + LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + + final Collection cohorts = new ArrayList<>(); + try { + // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. 
This indirectly means that no + // further TransactionProxies can be created and we can safely traverse maps without risking + // missing an entry + startReconnect(singleHistory, newConn, cohorts); + for (ClientLocalHistory h : histories.values()) { + startReconnect(h, newConn, cohorts); + } + + // Step 2: Collect previous successful requests from the cohorts. We do not want to expose + // the non-throttling interface to the connection, hence we use a wrapper consumer + for (HistoryReconnectCohort c : cohorts) { + c.replaySuccessfulRequests(); + } + + // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding + // requests will be immediately sent to it and requests being sent concurrently will get forwarded + // once they hit the new connection. + conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts)); + } finally { + // Step 4: Complete switchover of the connection. The cohorts can resume normal operations. + for (HistoryReconnectCohort c : cohorts) { + c.close(); + } + } + + return newConn; + } + + private static void startReconnect(final AbstractClientHistory history, + final ConnectedClientConnection newConn, + final Collection cohorts) { + final HistoryReconnectCohort cohort = history.startReconnect(newConn); + if (cohort != null) { + cohorts.add(cohort); + } + } + + // + // + // Methods below are invoked from application threads + // + // + + @Override + public final ClientLocalHistory createLocalHistory() { + final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), + nextHistoryId.getAndIncrement()); + final ClientLocalHistory history = new ClientLocalHistory(this, historyId); + LOG.debug("{}: creating a new local history {}", persistenceId(), history); + + Verify.verify(histories.put(historyId, history) == null); + + final Throwable a = aborted; + if (a != null) { + history.localAbort(a); + histories.remove(historyId, history); + throw Throwables.propagate(a); + } + + return history; + } + + @Override + public final ClientTransaction createTransaction() { + return singleHistory.createTransaction(); + } + + @Override + public final void close() { + context().executeInActor(this::shutdown); + } + + abstract Long resolveShardForPath(final YangInstanceIdentifier path); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java deleted file mode 100644 index b8493eae82..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
- * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker.actors.dds; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; - -abstract class AbstractLocalProxyHistory extends AbstractProxyHistory { - private final DataTree dataTree; - - AbstractLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier, - final DataTree dataTree) { - super(client, identifier); - this.dataTree = Preconditions.checkNotNull(dataTree); - } - - final DataTreeSnapshot takeSnapshot() { - return dataTree.takeSnapshot(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java deleted file mode 100644 index d9f3b5f557..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker.actors.dds; - -import akka.actor.ActorRef; -import com.google.common.base.Preconditions; -import java.util.Optional; -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; - -/** - * Per-connection representation of a local history. This class handles state replication across a single connection. - * - * @author Robert Varga - */ -abstract class AbstractProxyHistory implements Identifiable { - // FIXME: this should really be ClientConnection - private final DistributedDataStoreClientBehavior client; - private final LocalHistoryIdentifier identifier; - - AbstractProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { - this.client = Preconditions.checkNotNull(client); - this.identifier = Preconditions.checkNotNull(identifier); - } - - static AbstractProxyHistory createClient(final DistributedDataStoreClientBehavior client, - final Optional backendInfo, final LocalHistoryIdentifier identifier) { - final Optional dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? 
new ClientLocalProxyHistory(client, identifier, dataTree.get()) - : new RemoteProxyHistory(client, identifier); - } - - static AbstractProxyHistory createSingle(final DistributedDataStoreClientBehavior client, - final Optional backendInfo, final LocalHistoryIdentifier identifier) { - final Optional dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new SingleLocalProxyHistory(client, identifier, dataTree.get()) - : new RemoteProxyHistory(client, identifier); - } - - @Override - public LocalHistoryIdentifier getIdentifier() { - return identifier; - } - - final ActorRef localActor() { - return client.self(); - } - - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { - return doCreateTransactionProxy(client, new TransactionIdentifier(identifier, txId.getTransactionId())); - } - - abstract AbstractProxyTransaction doCreateTransactionProxy(DistributedDataStoreClientBehavior clientBehavior, - TransactionIdentifier txId); -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 9235f25e6e..adfc0df876 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -15,6 +15,7 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -23,13 +24,16 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class translating transaction operations towards a particular backend shard. 
@@ -45,17 +49,20 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * @author Robert Varga */ abstract class AbstractProxyTransaction implements Identifiable { - private final DistributedDataStoreClientBehavior client; + private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class); + private final ProxyHistory parent; + + private AbstractProxyTransaction successor; private long sequence; private boolean sealed; - AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) { - this.client = Preconditions.checkNotNull(client); + AbstractProxyTransaction(final ProxyHistory parent) { + this.parent = Preconditions.checkNotNull(parent); } final ActorRef localActor() { - return client.self(); + return parent.localActor(); } final long nextSequence() { @@ -87,17 +94,19 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> completer) { - client.sendRequest(request, completer); + final void sendRequest(final TransactionRequest request, final Consumer> callback) { + LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback); + parent.sendRequest(request, callback); } /** - * Seals this transaction when ready. + * Seal this transaction before it is either committed or aborted. */ final void seal() { checkNotSealed(); doSeal(); sealed = true; + parent.onTransactionSealed(this); } private void checkNotSealed() { @@ -115,12 +124,13 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { + final void abort(final VotingFuture ret) { checkSealed(); - sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> { + sendAbort(t -> { if (t instanceof TransactionAbortSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -128,9 +138,15 @@ abstract class AbstractProxyTransaction implements Identifiable> callback) { + sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback); + } + /** * Commit this transaction, possibly in a coordinated fashion. * @@ -141,7 +157,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret = SettableFuture.create(); - sendRequest(Verify.verifyNotNull(doCommit(false)), t -> { + sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> { if (t instanceof TransactionCommitSuccess) { ret.set(Boolean.TRUE); } else if (t instanceof RequestFailure) { @@ -149,14 +165,17 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - sendRequest(Verify.verifyNotNull(doCommit(true)), t -> { + sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> { if (t instanceof TransactionCanCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -192,10 +211,35 @@ abstract class AbstractProxyTransaction implements Identifiable doCommit(boolean coordinated); + void replaySuccessfulRequests(final AbstractProxyTransaction successor) { + this.successor = Preconditions.checkNotNull(successor); + } + + /** + * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted + * and forwarded to the successor connection. 
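The abort() and canCommit() bodies above report the outcome to a VotingFuture via ret.voteYes() on success; VotingFuture itself is not part of this hunk. Purely as an illustration of what such a vote-collecting future might look like, here is a hypothetical sketch built on Guava's AbstractFuture. The class name, the voteNo() counterpart, the constructor and the exact semantics are assumptions, not the project's actual implementation.

import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.AbstractFuture;

// Hypothetical vote aggregator: succeeds once every expected voter has said yes,
// fails immediately on the first no vote.
final class VoteFuture<T> extends AbstractFuture<T> {
    private final AtomicInteger remaining;
    private final T result;

    VoteFuture(final T result, final int voters) {
        this.result = result;
        this.remaining = new AtomicInteger(voters);
    }

    void voteYes() {
        if (remaining.decrementAndGet() == 0) {
            set(result);
        }
    }

    void voteNo(final Throwable cause) {
        setException(cause);
    }
}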
+ * + * @param request Request to be forwarded + * @param callback Original callback + * @throws RequestException when the request is unhandled by the successor + */ + final void replayRequest(final TransactionRequest request, final Consumer> callback) + throws RequestException { + Preconditions.checkState(successor != null, "%s does not have a successor set", this); + + if (successor instanceof LocalProxyTransaction) { + forwardToLocal((LocalProxyTransaction)successor, request, callback); + } else if (successor instanceof RemoteProxyTransaction) { + forwardToRemote((RemoteProxyTransaction)successor, request, callback); + } else { + throw new IllegalStateException("Unhandled successor " + successor); + } + } abstract void doDelete(final YangInstanceIdentifier path); @@ -211,4 +255,32 @@ abstract class AbstractProxyTransaction implements Identifiable commitRequest(boolean coordinated); + + /** + * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is + * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all + * operations are packaged in the message. + * + *

+ * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes + */ + abstract void handleForwardedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback); + + /** + * Replay a request originating in this proxy to a successor remote proxy. + */ + abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest request, + Consumer> callback) throws RequestException; + + /** + * Replay a request originating in this proxy to a successor local proxy. + */ + abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest request, + Consumer> callback) throws RequestException; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java index bd6cb64352..789309cf8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java @@ -12,7 +12,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; /** * Base class for internal {@link DOMStoreThreePhaseCommitCohort} implementation. It contains utility constants for diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java new file mode 100644 index 0000000000..c6564b66fb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Collections2; +import com.google.common.collect.Maps; +import java.util.Collection; +import java.util.Map; +import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.client.ReconnectForwarder; +import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// Cohort aware forwarder, which forwards the request to the cohort, giving it a reference to the successor +// connection +final class BouncingReconnectForwarder extends ReconnectForwarder { + private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class); + + private static final RequestException FAILED_TO_REPLAY_EXCEPTION = new RequestException("Cohort not found") { + private static final long serialVersionUID = 1L; + + @Override + public boolean isRetriable() { + return false; + } + }; + + private final Map cohorts; + + private BouncingReconnectForwarder(final ConnectedClientConnection successor, + final Map cohorts) { + super(successor); + this.cohorts = Preconditions.checkNotNull(cohorts); + } + + static ReconnectForwarder forCohorts(final ConnectedClientConnection successor, + final Collection cohorts) { + return new BouncingReconnectForwarder(successor, Maps.uniqueIndex(Collections2.transform(cohorts, + HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier)); + } + + + @Override + protected void forwardEntry(final ConnectionEntry entry) { + final Request request = entry.getRequest(); + + final LocalHistoryIdentifier historyId; + if (request instanceof TransactionRequest) { + historyId = ((TransactionRequest) request).getTarget().getHistoryId(); + } else if (request instanceof LocalHistoryRequest) { + historyId = ((LocalHistoryRequest) request).getTarget(); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + + try { + final ProxyReconnectCohort cohort = cohorts.get(historyId); + if (cohort == null) { + LOG.warn("Cohort for request {} not found, aborting it", request); + throw FAILED_TO_REPLAY_EXCEPTION; + } + + // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the + // period required to get into the queue. 
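The forCohorts() factory above builds its lookup table with Guava's Collections2.transform and Maps.uniqueIndex. For readers less familiar with those helpers, a roughly equivalent construction in plain Java streams is sketched below; the generic type parameters and the CohortIndex class name are illustrative only.

import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class CohortIndex {
    // Equivalent of Maps.uniqueIndex(Collections2.transform(cohorts, getProxy), getIdentifier):
    // map each reconnect cohort to its proxy and index the proxies by history identifier,
    // failing loudly if two proxies ever share an identifier.
    static <C, P, I> Map<I, P> index(final Collection<C> cohorts, final Function<C, P> getProxy,
            final Function<P, I> getIdentifier) {
        return cohorts.stream()
                .map(getProxy)
                .collect(Collectors.toMap(getIdentifier, Function.identity(), (first, second) -> {
                    throw new IllegalArgumentException("Duplicate identifier for " + first + " and " + second);
                }));
    }
}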
+ cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor); + } catch (RequestException e) { + entry.complete(request.toRequestFailure(e)); + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index 102d050617..b6c274628c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -9,8 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import java.util.Optional; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -20,14 +19,14 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier * *

* This interface is used by the world outside of the actor system and in the actor system it is manifested via - * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to + * its client actor. That requires some state transfer with {@link AbstractDataStoreClientBehavior}. In order to * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor. * * @author Robert Varga */ @Beta public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable { - ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) { + ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) { super(client, historyId); } @@ -52,17 +51,25 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A @Override AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, final AbstractTransactionCommitCohort cohort) { - // FIXME: deal with CLOSED here final State local = state(); - Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local); - updateState(local, State.IDLE); + switch (local) { + case CLOSED: + return super.onTransactionReady(txId, cohort); + case IDLE: + throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s", + this, txId)); + case TX_OPEN: + updateState(local, State.IDLE); + return super.onTransactionReady(txId, cohort); + default: + throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local)); - return super.onTransactionReady(txId, cohort); + } } @Override - AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, - final Optional backendInfo) { - return AbstractProxyHistory.createClient(getClient(), backendInfo, historyId); + ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, + final AbstractClientConnection connection) { + return ProxyHistory.createClient(connection, historyId); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java deleted file mode 100644 index f06e57cc36..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
- * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker.actors.dds; - -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; - -final class ClientLocalProxyHistory extends AbstractLocalProxyHistory { - ClientLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier, - final DataTree dataTree) { - super(client, identifier, dataTree); - } - - @Override - AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier txId) { - // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure - // causality - return new LocalProxyTransaction(client, txId, takeSnapshot()); - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 81d00ee8bc..8450c67224 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -12,12 +12,12 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -62,7 +62,7 @@ public final class ClientTransaction extends LocalAbortable implements Identifia private static final int OPEN_STATE = 0; private static final int CLOSED_STATE = 1; - private final Map proxies = new HashMap<>(); + private final Map proxies = new ConcurrentHashMap<>(); private final TransactionIdentifier transactionId; private final AbstractClientHistory parent; @@ -84,8 +84,7 @@ public final class ClientTransaction extends LocalAbortable implements Identifia private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) { checkNotClosed(); - final ModuleShardBackendResolver resolver = parent.getClient().resolver(); - final Long shard = resolver.resolveShardForPath(path); + final Long shard = parent.resolveShardForPath(path); return 
proxies.computeIfAbsent(shard, this::createProxy); } @@ -116,13 +115,13 @@ public final class ClientTransaction extends LocalAbortable implements Identifia private boolean ensureClosed() { final int local = state; - if (local != CLOSED_STATE) { - final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE); - Preconditions.checkState(success, "Transaction %s raced during close", this); - return true; - } else { + if (local == CLOSED_STATE) { return false; } + + final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE); + Preconditions.checkState(success, "Transaction %s raced during close", this); + return true; } public DOMStoreThreePhaseCommitCohort ready() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index 9940ae57f3..bc393a4c0f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -7,167 +7,31 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; -import akka.actor.Status; -import com.google.common.base.Throwables; -import com.google.common.base.Verify; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; +import java.util.function.Function; import org.opendaylight.controller.cluster.access.client.ClientActorContext; -import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; /** - * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore - * frontend. - * - *

- * This class is not visible outside of this package because it breaks the actor containment. Services provided to - * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}. - * - *

- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract. - * When touching internal state, be mindful of the execution context from which execution context, Actor - * or POJO, is the state being accessed or modified. - * - *

- * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application - * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in - * doubt, feel free to synchronize on this object. - * - *

- * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but - * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed - * for correctness and performance impact. - * - *

- * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal - * for performing work and charging applications for it. That has two positive effects: - * - CPU usage is distributed across applications, minimizing work done in the actor thread - * - CPU usage provides back-pressure towards the application. + * {@link AbstractDataStoreClientBehavior} which performs module-based sharding. * * @author Robert Varga */ -final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient { - private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class); - - private final Map histories = new ConcurrentHashMap<>(); - private final AtomicLong nextHistoryId = new AtomicLong(1); - private final ModuleShardBackendResolver resolver; - private final SingleClientHistory singleHistory; - - private volatile Throwable aborted; - - DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { - super(context); - resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext); - singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); - } - - // - // - // Methods below are invoked from the client actor thread - // - // - - @Override - protected void haltClient(final Throwable cause) { - // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up - // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor) - // thread. - if (aborted != null) { - abortOperations(cause); - } - } - - private void abortOperations(final Throwable cause) { - // This acts as a barrier, application threads check this after they have added an entry in the maps, - // and if they observe aborted being non-null, they will perform their cleanup and not return the handle. 
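The barrier comment above survives the refactor: the new AbstractDataStoreClientBehavior.createLocalHistory() earlier in this patch follows the same publish-then-check discipline (histories.put first, then a re-read of the volatile aborted field, then an undo on failure). A stripped-down, self-contained sketch of that ordering is shown below; Registry and its members are invented names, not types from this patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Publish-then-check barrier: register the handle first, then re-read the abort marker,
// so a concurrent abort either sees the new entry in the map or the caller observes the
// abort, removes its own entry and refuses to hand the handle out.
final class Registry {
    private final Map<Long, Object> handles = new ConcurrentHashMap<>();
    private volatile Throwable aborted;

    Object create(final long id) {
        final Object handle = new Object();
        handles.put(id, handle);

        final Throwable cause = aborted;
        if (cause != null) {
            handles.remove(id, handle);
            throw new IllegalStateException("Client has been aborted", cause);
        }
        return handle;
    }

    void abort(final Throwable cause) {
        aborted = cause;
        handles.clear();
    }
}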
- aborted = cause; - - for (ClientLocalHistory h : histories.values()) { - h.localAbort(cause); - } - histories.clear(); - } - - private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) { - abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down")); - return null; - } +final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior { + private final Function pathToShard; - @Override - protected DistributedDataStoreClientBehavior onCommand(final Object command) { - if (command instanceof GetClientRequest) { - ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender()); - } else { - LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command); - } - - return this; - } - - // - // - // Methods below are invoked from application threads - // - // - - @SuppressWarnings("checkstyle:IllegalCatch") - private static V returnIfOperational(final Map map, final K key, final V value, - final Throwable aborted) { - Verify.verify(map.put(key, value) == null); - - if (aborted != null) { - try { - value.localAbort(aborted); - } catch (Exception e) { - LOG.debug("Close of {} failed", value, e); - } - map.remove(key, value); - throw Throwables.propagate(aborted); - } - - return value; + private DistributedDataStoreClientBehavior(final ClientActorContext context, + final ModuleShardBackendResolver resolver) { + super(context, resolver); + pathToShard = resolver::resolveShardForPath; } - @Override - public ClientLocalHistory createLocalHistory() { - final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), - nextHistoryId.getAndIncrement()); - final ClientLocalHistory history = new ClientLocalHistory(this, historyId); - LOG.debug("{}: creating a new local history {}", persistenceId(), history); - - return returnIfOperational(histories, historyId, history, aborted); - } - - @Override - public ClientTransaction createTransaction() { - return singleHistory.createTransaction(); - } - - @Override - public void close() { - context().executeInActor(this::shutdown); + DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { + this(context, new ModuleShardBackendResolver(context.getIdentifier(), actorContext)); } @Override - protected ModuleShardBackendResolver resolver() { - return resolver; - } - - void sendRequest(final TransactionRequest request, final Consumer> completer) { - sendRequest(request, response -> { - completer.accept(response); - return this; - }); + Long resolveShardForPath(final YangInstanceIdentifier path) { + return pathToShard.apply(path); } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java index b21b46dab8..7c8bd9211e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java @@ -12,7 +12,7 @@ import com.google.common.base.Preconditions; import java.util.function.Supplier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import 
org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; /** @@ -21,7 +21,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification * * @author Robert Varga */ -final class FailedDataTreeModification implements DataTreeModification { +final class FailedDataTreeModification implements CursorAwareDataTreeModification { private final Supplier supplier; FailedDataTreeModification(final Supplier supplier) { @@ -34,7 +34,7 @@ final class FailedDataTreeModification implements DataTreeModification { } @Override - public DataTreeModification newModification() { + public CursorAwareDataTreeModification newModification() { throw supplier.get(); } @@ -62,4 +62,9 @@ final class FailedDataTreeModification implements DataTreeModification { public void applyToCursor(final DataTreeModificationCursor cursor) { throw supplier.get(); } + + @Override + public DataTreeModificationCursor createCursor(final YangInstanceIdentifier path) { + throw supplier.get(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java new file mode 100644 index 0000000000..ab961f1045 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +/** + * Interface exposed by {@link AbstractClientHistory} to {@link DistributedDataStoreClientBehavior} for the sole + * purpose of performing a connection switchover. 
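HistoryReconnectCohort is easiest to read side by side with the Step 0-4 comments in connectionUp() earlier in this patch: cohorts are collected, their already-successful requests replayed, a forwarder installed on the retired connection, and each cohort closed in a finally block. The compressed driver below illustrates that life cycle; the Cohort interface and ReconnectDriver class are simplified stand-ins, not the actual types.

import java.util.List;

// Simplified life cycle of a reconnect cohort: replay first, close last, with close()
// in a finally block so the switchover always completes even if a replay fails.
interface Cohort extends AutoCloseable {
    void replaySuccessfulRequests();

    @Override
    void close();
}

final class ReconnectDriver {
    static void switchover(final List<Cohort> cohorts) {
        try {
            for (Cohort c : cohorts) {
                c.replaySuccessfulRequests();   // Step 2: replay requests that already succeeded
            }
            // Step 3 (not shown here): install the forwarder on the retired connection
        } finally {
            for (Cohort c : cohorts) {
                c.close();                      // Step 4: finish the switchover
            }
        }
    }
}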
+ * + * @author Robert Varga + */ +abstract class HistoryReconnectCohort implements AutoCloseable { + abstract ProxyReconnectCohort getProxy(); + + abstract void replaySuccessfulRequests(); + + @Override + public abstract void close(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 576fa67ed4..7b652f474b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -9,18 +9,32 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.function.Consumer; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionMerge; +import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,18 +57,16 @@ import org.slf4j.LoggerFactory; @NotThreadSafe final class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); - private static final Consumer> ABORT_COMPLETER = response -> { - LOG.debug("Abort completed with {}", response); - }; private final TransactionIdentifier identifier; - 
private DataTreeModification modification; - LocalProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) { - super(client); + private CursorAwareDataTreeModification modification; + + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final CursorAwareDataTreeModification modification) { + super(parent); this.identifier = Preconditions.checkNotNull(identifier); - this.modification = snapshot.newModification(); + this.modification = Preconditions.checkNotNull(modification); } @Override @@ -87,17 +99,26 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { return Futures.immediateCheckedFuture(modification.readNode(path)); } + private RuntimeException abortedException() { + return new IllegalStateException("Tracker " + identifier + " has been aborted"); + } + + private RuntimeException submittedException() { + return new IllegalStateException("Tracker " + identifier + " has been submitted"); + } + @Override void doAbort() { - sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted")); + sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { + LOG.debug("Transaction {} abort completed with {}", identifier, response); + }); } @Override - CommitLocalTransactionRequest doCommit(final boolean coordinated) { + CommitLocalTransactionRequest commitRequest(final boolean coordinated) { final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(), modification, coordinated); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted")); + modification = new FailedDataTreeModification(this::submittedException); return ret; } @@ -105,4 +126,121 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { void doSeal() { modification.ready(); } + + DataTreeSnapshot getSnapshot() { + return modification; + } + + private void applyModifyTransactionRequest(final ModifyTransactionRequest request, + final @Nullable Consumer> callback) { + for (TransactionModification mod : request.getModifications()) { + if (mod instanceof TransactionWrite) { + modification.write(mod.getPath(), ((TransactionWrite)mod).getData()); + } else if (mod instanceof TransactionMerge) { + modification.merge(mod.getPath(), ((TransactionMerge)mod).getData()); + } else if (mod instanceof TransactionDelete) { + modification.delete(mod.getPath()); + } else { + throw new IllegalArgumentException("Unsupported modification " + mod); + } + } + + final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); + if (maybeProtocol.isPresent()) { + seal(); + Verify.verify(callback != null, "Request %s has null callback", request); + + switch (maybeProtocol.get()) { + case ABORT: + sendAbort(callback); + break; + case SIMPLE: + sendRequest(commitRequest(false), callback); + break; + case THREE_PHASE: + sendRequest(commitRequest(true), callback); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); + } + } + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback) { + LOG.debug("Applying forwarded request {}", request); + + if (request instanceof ModifyTransactionRequest) {
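+ // A forwarded ModifyTransactionRequest is replayed directly onto the local modification; if it + // also carries a persistence protocol, the transaction is sealed and committed or aborted on the + // original requester's behalf.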
applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + if (request instanceof CommitLocalTransactionRequest) { + final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; + final DataTreeModification mod = req.getModification(); + + LOG.debug("Applying modification {} to successor {}", mod, successor); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + + successor.seal(); + + final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); + successor.sendRequest(successorReq, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + LOG.debug("Forwarding abort {} to successor {}", request, successor); + successor.abort(); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + @Override + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + if (request instanceof AbortLocalTransactionRequest) { + successor.sendAbort(request, callback); + } else if (request instanceof CommitLocalTransactionRequest) { + successor.sendCommit((CommitLocalTransactionRequest)request, callback); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + + LOG.debug("Forwarded request {} to successor {}", request, successor); + } + + private void sendAbort(final TransactionRequest request, final Consumer> callback) { + sendRequest(request, callback); + modification = new FailedDataTreeModification(this::abortedException); + } + + private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { + // Rebase old modification on new data tree.
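+ // The forwarded request's modification is replayed through a cursor onto this transaction's + // modification, which is based on the current backend snapshot, after which the transaction is + // sealed and a fresh commit request is sent via this proxy's connection.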
+ try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) { + request.getModification().applyToCursor(cursor); + } + + seal(); + sendRequest(commitRequest(request.isCoordinated()), callback); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index a1018967e7..9e6485b296 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -18,11 +18,15 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.ABIVersion; -import org.opendaylight.controller.cluster.access.client.BackendInfo; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; @@ -46,7 +50,35 @@ import scala.compat.java8.FutureConverters; */ @SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION", justification = "Pertains to the NULL_FUTURE field below. 
Null is allowed and is intended") +@ThreadSafe final class ModuleShardBackendResolver extends BackendInfoResolver { + private static final class Entry { + private final CompletionStage stage; + @GuardedBy("this") + private ShardBackendInfo result; + + Entry(final CompletionStage stage) { + this.stage = Preconditions.checkNotNull(stage); + stage.whenComplete(this::onStageResolved); + } + + @Nonnull CompletionStage getStage() { + return stage; + } + + synchronized @Nullable ShardBackendInfo getResult() { + return result; + } + + private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) { + if (failure == null) { + this.result = Preconditions.checkNotNull(result); + } else { + LOG.warn("Failed to resolve shard", failure); + } + } + } + private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); @@ -58,10 +90,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver backends = new ConcurrentHashMap<>(); private final AtomicLong nextSessionId = new AtomicLong(); private final Function1 connectFunction; + private final ActorContext actorContext; @GuardedBy("this") private long nextShard = 1; @@ -75,18 +107,6 @@ final class ModuleShardBackendResolver extends BackendInfoResolver info) { - LOG.trace("Initiated invalidation of backend information {}", info); - info.thenAccept(this::invalidate); - } - - private void invalidate(final BackendInfo result) { - Preconditions.checkArgument(result instanceof ShardBackendInfo); - LOG.debug("Invalidating backend information {}", result); - actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName()); - } - Long resolveShardForPath(final YangInstanceIdentifier path) { final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path); Long cookie = shards.get(shardName); @@ -107,17 +127,16 @@ final class ModuleShardBackendResolver extends BackendInfoResolver resolveBackendInfo(final Long cookie) { + private CompletionStage resolveBackendInfo(final Long cookie) { final String shardName = shards.inverse().get(cookie); if (shardName == null) { LOG.warn("Failing request for non-existent cookie {}", cookie); return NULL_FUTURE; } - final CompletableFuture ret = new CompletableFuture<>(); + LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); - FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { + return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { LOG.debug("Looking up primary info for {} from {}", shardName, info); return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); }).thenApply(response -> { @@ -135,15 +154,30 @@ final class ModuleShardBackendResolver extends BackendInfoResolver { - if (throwablw != null) { - ret.completeExceptionally(throwablw); - } else { - ret.complete(info); - } }); + } - LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); - return ret; + @Override + public CompletionStage getBackendInfo(final Long cookie) { + return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage(); + } + + @Override + public CompletionStage refreshBackendInfo(final Long cookie, + final ShardBackendInfo staleInfo) { + final Entry existing = backends.get(cookie); + if (existing != null) { + if (!staleInfo.equals(existing.getResult())) 
{ + return existing.getStage(); + } + + LOG.debug("Invalidating backend information {}", staleInfo); + actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName()); + + LOG.trace("Invalidated cache {} -> {}", Long.toUnsignedString(cookie), staleInfo); + backends.remove(cookie, existing); + } + + return getBackendInfo(cookie); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java new file mode 100644 index 0000000000..ae55379155 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -0,0 +1,351 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Per-connection representation of a local history. This class handles state replication across a single connection.
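+ * + * <p>Concrete implementations come in local and remote flavours, picked by the {@code createClient()} and + * {@code createSingle()} factory methods: when the resolved backend exposes its {@link DataTree}, transactions + * are backed by a {@link CursorAwareDataTreeModification} taken from that tree, otherwise requests are sent to + * the backend via {@code RemoteProxyTransaction}.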
+ * + * @author Robert Varga + */ +abstract class ProxyHistory implements Identifiable { + private abstract static class AbstractLocal extends ProxyHistory { + private final DataTree dataTree; + + AbstractLocal(final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier, final DataTree dataTree) { + super(connection, identifier); + this.dataTree = Preconditions.checkNotNull(dataTree); + } + + final DataTreeSnapshot takeSnapshot() { + return dataTree.takeSnapshot(); + } + } + + private abstract static class AbstractRemote extends ProxyHistory { + AbstractRemote(final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + super(connection, identifier); + } + + @Override + final AbstractProxyTransaction doCreateTransactionProxy( + final AbstractClientConnection connection, final TransactionIdentifier txId) { + return new RemoteProxyTransaction(this, txId); + } + } + + private static final class Local extends AbstractLocal { + private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed"); + + // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting + // the open one and attempts to create a new transaction again. + private LocalProxyTransaction lastOpen; + + private volatile LocalProxyTransaction lastSealed; + + Local(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, + final DataTree dataTree) { + super(connection, identifier, dataTree); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId) { + Preconditions.checkState(lastOpen == null, "Proxy %s is currently open", lastOpen); + + // onTransactionCompleted() runs concurrently + final LocalProxyTransaction localSealed = lastSealed; + final DataTreeSnapshot baseSnapshot; + if (localSealed != null) { + baseSnapshot = localSealed.getSnapshot(); + } else { + baseSnapshot = takeSnapshot(); + } + + lastOpen = new LocalProxyTransaction(this, txId, + (CursorAwareDataTreeModification) baseSnapshot.newModification()); + return lastOpen; + } + + @Override + ProxyHistory createSuccessor(final AbstractClientConnection connection) { + return createClient(connection, getIdentifier()); + } + + @Override + void onTransactionAborted(final AbstractProxyTransaction tx) { + Preconditions.checkState(tx.equals(lastOpen)); + lastOpen = null; + } + + @Override + void onTransactionCompleted(final AbstractProxyTransaction tx) { + Verify.verify(tx instanceof LocalProxyTransaction); + + if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) { + LOG.debug("Completed last sealed transaction {}", tx); + } + } + + @Override + void onTransactionSealed(final AbstractProxyTransaction tx) { + Preconditions.checkState(tx.equals(lastOpen)); + lastSealed = lastOpen; + lastOpen = null; + } + } + + private static final class LocalSingle extends AbstractLocal { + LocalSingle(final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier, final DataTree dataTree) { + super(connection, identifier, dataTree); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId) { + return new LocalProxyTransaction(this, txId, + (CursorAwareDataTreeModification) takeSnapshot().newModification()); + } + + @Override + ProxyHistory
createSuccessor(final AbstractClientConnection connection) { + return createSingle(connection, getIdentifier()); + } + } + + private static final class Remote extends AbstractRemote { + Remote(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { + super(connection, identifier); + } + + @Override + ProxyHistory createSuccessor(final AbstractClientConnection connection) { + return createClient(connection, getIdentifier()); + } + } + + private static final class RemoteSingle extends AbstractRemote { + RemoteSingle(final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + super(connection, identifier); + } + + @Override + ProxyHistory createSuccessor(final AbstractClientConnection connection) { + return createSingle(connection, getIdentifier()); + } + } + + private static final class RequestReplayException extends RequestException { + private static final long serialVersionUID = 1L; + + RequestReplayException(final String format, final Object... args) { + super(String.format(format, args)); + } + + @Override + public boolean isRetriable() { + return false; + } + } + + private final class ReconnectCohort extends ProxyReconnectCohort { + @Override + public LocalHistoryIdentifier getIdentifier() { + return identifier; + } + + @GuardedBy("lock") + @Override + void replaySuccessfulRequests() { + for (AbstractProxyTransaction t : proxies.values()) { + final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); + LOG.debug("{} created successor transaction proxy {} for {}", identifier, newProxy, t); + t.replaySuccessfulRequests(newProxy); + } + } + + @GuardedBy("lock") + @Override + ProxyHistory finishReconnect() { + final ProxyHistory ret = Verify.verifyNotNull(successor); + LOG.debug("Finished reconnecting proxy history {}", this); + lock.unlock(); + return ret; + } + + @Override + void replayRequest(final Request request, final Consumer> callback, + final BiConsumer, Consumer>> replayTo) throws RequestException { + if (request instanceof TransactionRequest) { + replayTransactionRequest((TransactionRequest) request, callback); + } else if (request instanceof LocalHistoryRequest) { + replayTo.accept(request, callback); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + private void replayTransactionRequest(final TransactionRequest request, + final Consumer> callback) throws RequestException { + + final AbstractProxyTransaction proxy; + lock.lock(); + try { + proxy = proxies.get(request.getTarget()); + } finally { + lock.unlock(); + } + if (proxy == null) { + throw new RequestReplayException("Failed to find proxy for %s", request); + } + + proxy.replayRequest(request, callback); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class); + + private final Lock lock = new ReentrantLock(); + private final LocalHistoryIdentifier identifier; + private final AbstractClientConnection connection; + + @GuardedBy("lock") + private final Map proxies = new LinkedHashMap<>(); + @GuardedBy("lock") + private ProxyHistory successor; + + private ProxyHistory(final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + this.connection = Preconditions.checkNotNull(connection); + this.identifier = Preconditions.checkNotNull(identifier); + } + + static ProxyHistory createClient(final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + final Optional dataTree = 
connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); + return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get()) + : new Remote(connection, identifier); + } + + static ProxyHistory createSingle(final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); + return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get()) + : new RemoteSingle(connection, identifier); + } + + @Override + public LocalHistoryIdentifier getIdentifier() { + return identifier; + } + + final ActorRef localActor() { + return connection.localActor(); + } + + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { + final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); + + lock.lock(); + try { + final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId); + proxies.put(proxyId, ret); + LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); + return ret; + } finally { + lock.unlock(); + } + } + + final void abortTransaction(final AbstractProxyTransaction tx) { + lock.lock(); + try { + proxies.remove(tx.getIdentifier()); + } finally { + lock.unlock(); + } + } + + final void completeTransaction(final AbstractProxyTransaction tx) { + lock.lock(); + try { + proxies.remove(tx.getIdentifier()); + } finally { + lock.unlock(); + } + } + + final void sendRequest(final TransactionRequest request, final Consumer> callback) { + connection.sendRequest(request, callback); + } + + @GuardedBy("lock") + abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, + TransactionIdentifier txId); + + abstract ProxyHistory createSuccessor(AbstractClientConnection connection); + + @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort") + ProxyReconnectCohort startReconnect(final ConnectedClientConnection newConnection) { + lock.lock(); + if (successor != null) { + lock.unlock(); + throw new IllegalStateException("Proxy history " + this + " already has a successor"); + } + + successor = createSuccessor(newConnection); + return new ReconnectCohort(); + } + + @GuardedBy("lock") + void onTransactionAborted(final AbstractProxyTransaction tx) { + // No-op for most implementations + } + + @GuardedBy("lock") + void onTransactionCompleted(final AbstractProxyTransaction tx) { + // No-op for most implementations + } + + void onTransactionSealed(final AbstractProxyTransaction tx) { + // No-op on most implementations + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java new file mode 100644 index 0000000000..7c37d9d4a0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.yangtools.concepts.Identifiable; + +abstract class ProxyReconnectCohort implements Identifiable { + + abstract void replaySuccessfulRequests(); + + abstract ProxyHistory finishReconnect(); + + abstract void replayRequest(Request request, Consumer> callback, + BiConsumer, Consumer>> replayTo) throws RequestException; +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java deleted file mode 100644 index c596d31f84..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker.actors.dds; - -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; - -final class RemoteProxyHistory extends AbstractProxyHistory { - RemoteProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier) { - super(client, identifier); - } - - @Override - AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier txId) { - return new RemoteProxyTransaction(client, txId); - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 26d718def5..347c7eac03 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -8,27 +8,34 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import 
com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Collection; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -55,15 +62,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // FIXME: make this tuneable private static final int REQUEST_MAX_MODIFICATIONS = 1000; + private final Collection> successfulRequests = new ArrayList<>(); private final ModifyTransactionRequestBuilder builder; private boolean builderBusy; private volatile Exception operationFailure; - RemoteProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier) { - super(client); + RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { + super(parent); builder = new ModifyTransactionRequestBuilder(identifier, localActor()); } @@ -136,10 +143,38 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void flushBuilder() { - final ModifyTransactionRequest message = builder.build(); + final ModifyTransactionRequest request = builder.build(); builderBusy = false; - sendRequest(message, this::completeModify); + sendModification(request); + } + + private void sendModification(final TransactionRequest request) { + sendRequest(request, response -> completeModify(request, response)); + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback) { + nextSequence(); + + if (callback == null) { + sendModification(request); + return; + } + + /* + * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null + * and reports 
NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below. + */ + final Consumer> findBugsIsStupid = callback; + + // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the + // period required to get into the queue. + sendRequest(request, response -> { + findBugsIsStupid.accept(Preconditions.checkNotNull(response)); + completeModify(request, response); + }); } private void appendModification(final TransactionModification modification) { @@ -155,11 +190,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } - private void completeModify(final Response response) { - LOG.debug("Modification request completed with {}", response); + private void completeModify(final TransactionRequest request, final Response response) { + LOG.debug("Modification request {} completed with {}", request, response); if (response instanceof TransactionSuccess) { - // Happy path no-op + // Happy path + successfulRequests.add(request); } else { recordFailedResponse(response); } @@ -207,7 +243,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - ModifyTransactionRequest doCommit(final boolean coordinated) { + ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuider(); builder.setCommit(coordinated); @@ -220,4 +256,57 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { void doSeal() { // No-op } + + @Override + void replaySuccessfulRequests(final AbstractProxyTransaction successor) { + super.replaySuccessfulRequests(successor); + + for (TransactionRequest req : successfulRequests) { + LOG.debug("Forwarding request {} to successor {}", req, successor); + successor.handleForwardedRemoteRequest(req, null); + } + successfulRequests.clear(); + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + successor.handleForwardedRequest(request, callback); + } + + private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) + throws RequestException { + if (request instanceof ModifyTransactionRequest) { + final ModifyTransactionRequest req = (ModifyTransactionRequest) request; + + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + seal(); + + switch (maybeProto.get()) { + case ABORT: + sendAbort(callback); + break; + case SIMPLE: + sendRequest(commitRequest(false), callback); + break; + case THREE_PHASE: + sendRequest(commitRequest(true), callback); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + @Override + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + successor.handleForwardedRemoteRequest(request, callback); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java index 6fa3cdf2a9..c04c9c5071 100644 ---
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java @@ -7,7 +7,7 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import java.util.Optional; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.slf4j.Logger; @@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory; final class SingleClientHistory extends AbstractClientHistory { private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class); - SingleClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { + SingleClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { super(client, identifier); } @@ -34,8 +34,8 @@ final class SingleClientHistory extends AbstractClientHistory { } @Override - AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, - final Optional backendInfo) { - return AbstractProxyHistory.createSingle(getClient(), backendInfo, historyId); + ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, + final AbstractClientConnection connection) { + return ProxyHistory.createSingle(connection, historyId); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java deleted file mode 100644 index f32fd59a74..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.databroker.actors.dds; - -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; - -final class SingleLocalProxyHistory extends AbstractLocalProxyHistory { - SingleLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier, - final DataTree dataTree) { - super(client, identifier, dataTree); - } - - @Override - AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier txId) { - return new LocalProxyTransaction(client, txId, takeSnapshot()); - } -} \ No newline at end of file -- 2.36.6
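A minimal sketch of how the reconnect cohort API above (startReconnect(), replaySuccessfulRequests(), replayRequest(), finishReconnect()) is meant to be driven, assuming a single ProxyHistory being migrated to a new connection. The driver itself and the names oldProxy and newConnection are illustrative assumptions only; the actual orchestration presumably lives in AbstractClientHistory and BouncingReconnectForwarder, which this patch also touches.

    // Illustrative only -- not part of the patch.
    final ProxyReconnectCohort cohort = oldProxy.startReconnect(newConnection); // acquires the history lock
    try {
        // Recreate per-transaction proxies on the successor history and replay the requests which
        // have already succeeded, so the new backend converges to the same state.
        cohort.replaySuccessfulRequests();
        // Requests still queued on the old connection are then re-routed one by one through
        // cohort.replayRequest(request, callback, replayTo): TransactionRequests are dispatched to
        // the matching successor proxy, LocalHistoryRequests are handed to the replayTo consumer.
    } finally {
        // Publish the successor and release the lock taken by startReconnect().
        final ProxyHistory successor = cohort.finishReconnect();
    }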