X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=46af74a5630999e0c75f430955875754c274dcb5;hp=83ba07b69ac8f6e3ed253d18e420315065069e4d;hb=17e4759c7561e09786a22210e43b5b32db45149e;hpb=7da03519bcdb9a9ff3b0c73064eb48dd9393f013;ds=sidebyside diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 83ba07b69a..46af74a563 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -13,6 +13,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.base.Verify; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -229,6 +230,12 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + final long enqueuedTicks) { + LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback); + parent.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback); parent.sendRequest(request, callback); @@ -324,10 +331,15 @@ abstract class AbstractProxyTransaction implements Identifiable> callback, final long enqueuedTicks) { + enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } + final void sendAbort(final Consumer> callback) { sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback); } @@ -357,7 +369,7 @@ abstract class AbstractProxyTransaction implements Identifiable req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); @@ -465,6 +477,16 @@ abstract class AbstractProxyTransaction implements Identifiable req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); + enqueueRequest(req, t -> { + LOG.debug("Transaction {} purge completed", this); + parent.completeTransaction(this); + }, enqueuedTicks); + } + // Called with the connection unlocked final synchronized void startReconnect() { // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous @@ -489,17 +511,26 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, response -> { }); - } else { - Verify.verify(obj instanceof IncrementSequence); - successor.incrementSequence(((IncrementSequence) obj).getDelta()); + if (!successfulRequests.isEmpty()) { + // We need to find a good timestamp to use for successful requests, as we do not want to time them out + // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue + // time. We will pick the time of the first entry available. If there is none, we will just use current + // time, as all other requests will get enqueued afterwards. + final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null); + final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime(); + + for (Object obj : successfulRequests) { + if (obj instanceof TransactionRequest) { + LOG.debug("Forwarding successful request {} to successor {}", obj, successor); + successor.replayRequest((TransactionRequest) obj, resp -> { }, now); + } else { + Verify.verify(obj instanceof IncrementSequence); + successor.incrementSequence(((IncrementSequence) obj).getDelta()); + } } + LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); + successfulRequests.clear(); } - LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); - successfulRequests.clear(); // Now replay whatever is in the connection final Iterator it = enqueuedEntries.iterator(); @@ -509,8 +540,8 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback()); + LOG.debug("Replaying queued request {} to successor {}", req, successor); + successor.replayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); } } @@ -537,12 +568,14 @@ abstract class AbstractProxyTransaction implements Identifiable request, Consumer> callback) { + private void replayRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { if (request instanceof AbstractLocalTransactionRequest) { - handleForwardedLocalRequest((AbstractLocalTransactionRequest) request, callback); + handleReplayedLocalRequest((AbstractLocalTransactionRequest) request, callback, enqueuedTicks); } else { - handleForwardedRemoteRequest(request, callback); + handleReplayedRemoteRequest(request, callback, enqueuedTicks); } } @@ -563,8 +596,11 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback) { - final AbstractProxyTransaction successor = getSuccessorState().getSuccessor(); + forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback); + } + final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { if (successor instanceof LocalProxyTransaction) { forwardToLocal((LocalProxyTransaction)successor, request, callback); } else if (successor instanceof RemoteProxyTransaction) { @@ -615,9 +651,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, - @Nullable Consumer> callback); + abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); /** * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. @@ -627,9 +664,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, - @Nullable Consumer> callback); + abstract void handleReplayedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); @Override public final String toString() {