X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=bf56376fcee3d538d483a063d652ae6d3d5a5654;hb=refs%2Fchanges%2F22%2F57822%2F3;hp=83ba07b69ac8f6e3ed253d18e420315065069e4d;hpb=3ee40198347cfb53bd0ce12ffd625cff8ed2383b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 83ba07b69a..bf56376fce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -13,6 +13,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.base.Verify; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -29,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -69,12 +71,21 @@ abstract class AbstractProxyTransaction implements Identifiable { + command.run(); + return behavior; + }); + } + final ActorRef localActor() { return parent.localActor(); } - private void incrementSequence(final long delta) { + final void incrementSequence(final long delta) { sequence += delta; LOG.debug("Transaction {} incremented sequence to {}", this, sequence); } @@ -229,6 +247,12 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + final long enqueuedTicks) { + LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback); + parent.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback); parent.sendRequest(request, callback); @@ -291,12 +315,12 @@ abstract class AbstractProxyTransaction implements Identifiable response) { final Object last = successfulRequests.peekLast(); if (last instanceof IncrementSequence) { ((IncrementSequence) last).incrementDelta(); } else { - successfulRequests.addLast(new IncrementSequence()); + successfulRequests.addLast(new IncrementSequence(response.getSequence())); } } @@ -306,14 +330,18 @@ abstract class AbstractProxyTransaction implements Identifiable { + LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp); + enqueuePurge(); + }); } final void abort(final VotingFuture ret) { checkSealed(); - sendAbort(t -> { + sendDoAbort(t -> { if (t instanceof TransactionAbortSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -324,11 +352,29 @@ abstract class AbstractProxyTransaction implements Identifiable> callback) { + final void enqueueAbort(final Consumer> callback, final long enqueuedTicks) { + checkNotSealed(); + parent.abortTransaction(this); + + enqueueRequest(abortRequest(), resp -> { + LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp); + // Purge will be sent by the predecessor's callback + if (callback != null) { + callback.accept(resp); + } + }, enqueuedTicks); + } + + final void enqueueDoAbort(final Consumer> callback, final long enqueuedTicks) { + enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } + + final void sendDoAbort(final Consumer> callback) { sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback); } @@ -357,7 +403,7 @@ abstract class AbstractProxyTransaction implements Identifiable req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); - sendRequest(req, t -> { + final void enqueuePurge(final Consumer> callback) { + // Purge request are dispatched internally, hence should not wait + enqueuePurge(callback, parent.currentTime()); + } + + final void enqueuePurge(final Consumer> callback, final long enqueuedTicks) { + enqueueRequest(purgeRequest(), resp -> { LOG.debug("Transaction {} purge completed", this); parent.completeTransaction(this); - }); + if (callback != null) { + callback.accept(resp); + } + }, enqueuedTicks); + } + + private TransactionPurgeRequest purgeRequest() { + successfulRequests.clear(); + return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); } // Called with the connection unlocked @@ -489,17 +549,30 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, response -> { }); - } else { - Verify.verify(obj instanceof IncrementSequence); - successor.incrementSequence(((IncrementSequence) obj).getDelta()); + if (!successfulRequests.isEmpty()) { + // We need to find a good timestamp to use for successful requests, as we do not want to time them out + // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue + // time. We will pick the time of the first entry available. If there is none, we will just use current + // time, as all other requests will get enqueued afterwards. + final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null); + final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime(); + + for (Object obj : successfulRequests) { + if (obj instanceof TransactionRequest) { + LOG.debug("Forwarding successful request {} to successor {}", obj, successor); + successor.replayRequest((TransactionRequest) obj, resp -> { }, now); + } else { + Verify.verify(obj instanceof IncrementSequence); + final IncrementSequence increment = (IncrementSequence) obj; + successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), + increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { }, + now); + LOG.debug("Incrementing sequence {} to successor {}", obj, successor); + } } + LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); + successfulRequests.clear(); } - LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); - successfulRequests.clear(); // Now replay whatever is in the connection final Iterator it = enqueuedEntries.iterator(); @@ -509,8 +582,8 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback()); + LOG.debug("Replaying queued request {} to successor {}", req, successor); + successor.replayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); } } @@ -537,12 +610,14 @@ abstract class AbstractProxyTransaction implements Identifiable request, Consumer> callback) { + private void replayRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { if (request instanceof AbstractLocalTransactionRequest) { - handleForwardedLocalRequest((AbstractLocalTransactionRequest) request, callback); + handleReplayedLocalRequest((AbstractLocalTransactionRequest) request, callback, enqueuedTicks); } else { - handleForwardedRemoteRequest(request, callback); + handleReplayedRemoteRequest(request, callback, enqueuedTicks); } } @@ -563,8 +638,11 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback) { - final AbstractProxyTransaction successor = getSuccessorState().getSuccessor(); + forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback); + } + final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { if (successor instanceof LocalProxyTransaction) { forwardToLocal((LocalProxyTransaction)successor, request, callback); } else if (successor instanceof RemoteProxyTransaction) { @@ -588,11 +666,11 @@ abstract class AbstractProxyTransaction implements Identifiable abortRequest(); + abstract TransactionRequest commitRequest(boolean coordinated); /** @@ -615,9 +693,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, - @Nullable Consumer> callback); + abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); /** * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. @@ -627,9 +706,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, - @Nullable Consumer> callback); + abstract void handleReplayedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); @Override public final String toString() {