X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=34e8ba37a379669de09013099ad83a8e9ceaa38c;hb=refs%2Fchanges%2F70%2F57770%2F12;hp=88e86bc08985f00310444854b20a20e129bd1d50;hpb=5ab22a0bea0492dd8f1541b2f0cde4ea7618a786;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 88e86bc089..34e8ba37a3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -11,6 +11,8 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -21,6 +23,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; @@ -211,14 +214,17 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replaySuccessfulRequests(final Iterable previousEntries) { + void replayRequests(final Collection previousEntries) { // First look for our Create message - for (ConnectionEntry e : previousEntries) { + Iterator it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof CreateLocalHistoryRequest) { successor.connection.sendRequest(req, e.getCallback()); + it.remove(); break; } } @@ -233,11 +239,17 @@ abstract class ProxyHistory implements Identifiable { } // Now look for any finalizing messages - for (ConnectionEntry e : previousEntries) { + it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); - successor.connection.sendRequest(req, e.getCallback()); + if (req instanceof DestroyLocalHistoryRequest) { + successor.connection.sendRequest(req, e.getCallback()); + it.remove(); + break; + } } } } @@ -257,18 +269,20 @@ abstract class ProxyHistory implements Identifiable { } @Override - void replayRequest(final Request request, final Consumer> callback, - final BiConsumer, Consumer>> replayTo) throws RequestException { + void forwardRequest(final Request request, final Consumer> callback, + final BiConsumer, Consumer>> forwardTo) throws RequestException { + // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the + // period required to get into the queue. if (request instanceof TransactionRequest) { - replayTransactionRequest((TransactionRequest) request, callback); + forwardTransactionRequest((TransactionRequest) request, callback); } else if (request instanceof LocalHistoryRequest) { - replayTo.accept(request, callback); + forwardTo.accept(request, callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } } - private void replayTransactionRequest(final TransactionRequest request, + private void forwardTransactionRequest(final TransactionRequest request, final Consumer> callback) throws RequestException { final AbstractProxyTransaction proxy; @@ -282,7 +296,7 @@ abstract class ProxyHistory implements Identifiable { throw new RequestReplayException("Failed to find proxy for %s", request); } - proxy.replayRequest(request, callback); + proxy.forwardRequest(request, callback); } } @@ -325,6 +339,14 @@ abstract class ProxyHistory implements Identifiable { return identifier; } + final ClientActorContext context() { + return connection.context(); + } + + final long currentTime() { + return connection.currentTime(); + } + final ActorRef localActor() { return connection.localActor(); } @@ -354,8 +376,8 @@ abstract class ProxyHistory implements Identifiable { final void abortTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); - LOG.debug("Proxy {} aborting transaction {}", this, tx); + // Removal will be completed once purge completes + LOG.debug("Proxy {} aborted transaction {}", this, tx); onTransactionAborted(tx); } finally { lock.unlock(); @@ -389,6 +411,11 @@ abstract class ProxyHistory implements Identifiable { } } + final void enqueueRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + connection.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { connection.sendRequest(request, callback); }