X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=2661ea82e9fb30a65ea6fb656cc03fa944f96a1b;hp=27f9b47cd9211386c9d43bb080aaea4b78cc3c0d;hb=4dc70853503713e7ed729815e2ce7bfd750b2bd3;hpb=9a6ca7bd6458fa76642d07b41c34c40cb7ff6a20 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 27f9b47cd9..2661ea82e9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -13,6 +13,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.base.Verify; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -28,6 +29,8 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -68,12 +71,21 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + final long enqueuedTicks) { + LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback); + parent.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback); parent.sendRequest(request, callback); @@ -290,12 +308,12 @@ abstract class AbstractProxyTransaction implements Identifiable response) { final Object last = successfulRequests.peekLast(); if (last instanceof IncrementSequence) { ((IncrementSequence) last).incrementDelta(); } else { - successfulRequests.addLast(new IncrementSequence()); + successfulRequests.addLast(new IncrementSequence(response.getSequence())); } } @@ -316,17 +334,22 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } // This is a terminal request, hence we do not need to record it LOG.debug("Transaction {} abort completed", this); - purge(); + sendPurge(); }); } + final void enqueueAbort(final Consumer> callback, final long enqueuedTicks) { + enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } + final void sendAbort(final Consumer> callback) { sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback); } @@ -349,14 +372,14 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.setException(((RequestFailure) t).getCause().unwrap()); } else { ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); } // This is a terminal request, hence we do not need to record it LOG.debug("Transaction {} directCommit completed", this); - purge(); + sendPurge(); }); return ret; @@ -380,7 +403,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } @@ -411,7 +434,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } @@ -444,17 +467,17 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } LOG.debug("Transaction {} doCommit completed", this); - purge(); + sendPurge(); }); } - void purge() { + final void sendPurge() { successfulRequests.clear(); final TransactionRequest req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); @@ -464,6 +487,16 @@ abstract class AbstractProxyTransaction implements Identifiable req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); + enqueueRequest(req, t -> { + LOG.debug("Transaction {} purge completed", this); + parent.completeTransaction(this); + }, enqueuedTicks); + } + // Called with the connection unlocked final synchronized void startReconnect() { // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous @@ -488,17 +521,30 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, response -> { }); - } else { - Verify.verify(obj instanceof IncrementSequence); - successor.incrementSequence(((IncrementSequence) obj).getDelta()); + if (!successfulRequests.isEmpty()) { + // We need to find a good timestamp to use for successful requests, as we do not want to time them out + // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue + // time. We will pick the time of the first entry available. If there is none, we will just use current + // time, as all other requests will get enqueued afterwards. + final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null); + final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime(); + + for (Object obj : successfulRequests) { + if (obj instanceof TransactionRequest) { + LOG.debug("Forwarding successful request {} to successor {}", obj, successor); + successor.replayRequest((TransactionRequest) obj, resp -> { }, now); + } else { + Verify.verify(obj instanceof IncrementSequence); + final IncrementSequence increment = (IncrementSequence) obj; + successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), + increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { }, + now); + LOG.debug("Incrementing sequence {} to successor {}", obj, successor); + } } + LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); + successfulRequests.clear(); } - LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); - successfulRequests.clear(); // Now replay whatever is in the connection final Iterator it = enqueuedEntries.iterator(); @@ -508,8 +554,8 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback()); + LOG.debug("Replaying queued request {} to successor {}", req, successor); + successor.replayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); } } @@ -527,6 +573,26 @@ abstract class AbstractProxyTransaction implements Identifiable + * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes + * @param enqueuedTicks ticker-based time stamp when the request was enqueued + */ + private void replayRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + if (request instanceof AbstractLocalTransactionRequest) { + handleReplayedLocalRequest((AbstractLocalTransactionRequest) request, callback, enqueuedTicks); + } else { + handleReplayedRemoteRequest(request, callback, enqueuedTicks); + } + } + // Called with the connection locked final void finishReconnect() { final SuccessorState local = getSuccessorState(); @@ -543,9 +609,12 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback) { - final AbstractProxyTransaction successor = getSuccessorState().getSuccessor(); + final void forwardRequest(final TransactionRequest request, final Consumer> callback) { + forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback); + } + final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { if (successor instanceof LocalProxyTransaction) { forwardToLocal((LocalProxyTransaction)successor, request, callback); } else if (successor instanceof RemoteProxyTransaction) { @@ -576,20 +645,6 @@ abstract class AbstractProxyTransaction implements Identifiable commitRequest(boolean coordinated); - /** - * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is - * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all - * operations are packaged in the message. - * - *

- * Note: this method is invoked by the predecessor on the successor. - * - * @param request Request which needs to be forwarded - * @param callback Callback to be invoked once the request completes - */ - abstract void handleForwardedRemoteRequest(TransactionRequest request, - @Nullable Consumer> callback); - /** * Replay a request originating in this proxy to a successor remote proxy. */ @@ -602,6 +657,32 @@ abstract class AbstractProxyTransaction implements Identifiable request, Consumer> callback); + /** + * Invoked from {@link LocalProxyTransaction} when it replays its successful requests to its successor. + * + *

+ * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes + * @param enqueuedTicks Time stamp to use for enqueue time + */ + abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); + + /** + * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. + * + *

+ * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes + * @param enqueuedTicks Time stamp to use for enqueue time + */ + abstract void handleReplayedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); + @Override public final String toString() { return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();