X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=51f528150d22a717034cc4281618591645cca92f;hb=7991491f2854dde2ec625ed6c08b44df7d258795;hp=cb568647afe0b21bb5527cd1c8e6e1413c350be2;hpb=31316f39aecc6bad171de539292ff5d7f4743419;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index cb568647af..51f528150d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -11,7 +11,6 @@ import akka.actor.ActorRef; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.base.Verify; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; @@ -30,7 +29,9 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -137,7 +138,7 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + forwardToSuccessor(successor, optState.get(), null); + } + successor.predecessorSealed(); } - final void ensureSealed() { - if (SEALED_UPDATER.compareAndSet(this, 0, 1)) { - internalSeal(); + private void predecessorSealed() { + if (markSealed() && !sealAndSend(Optional.absent())) { + sealSuccessor(); } } - private void internalSeal() { - doSeal(); + void sealOnly() { + parent.onTransactionSealed(this); + final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED); + Verify.verify(success, "Attempted to replay seal on %s", this); + } + + /** + * Seal this transaction and potentially send it out towards the backend. If this method reports false, the caller + * needs to deal with propagating the seal operation towards the successor. + * + * @param enqueuedTicks Enqueue ticks when this is invoked from replay path. + * @return True if seal operation was successful, false if this proxy has a successor. + */ + boolean sealAndSend(final Optional enqueuedTicks) { parent.onTransactionSealed(this); - // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit(). - if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) { - // Slow path: wait for the successor to complete - final AbstractProxyTransaction successor = awaitSuccessor(); + // Transition internal state to sealed and detect presence of a successor + return STATE_UPDATER.compareAndSet(this, OPEN, SEALED); + } - // At this point the successor has completed transition and is possibly visible by the user thread, which is - // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed. - // Propagate state and seal the successor. - flushState(successor); - successor.ensureSealed(); - } + /** + * Mark this proxy as having been sealed. + * + * @return True if this call has transitioned to sealed state. + */ + final boolean markSealed() { + return SEALED_UPDATER.compareAndSet(this, 0, 1); } private void checkNotSealed() { @@ -461,7 +498,14 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); + final Throwable cause = ((RequestFailure) t).getCause().unwrap(); + if (cause instanceof ClosedTransactionException) { + // This is okay, as it indicates the transaction has been completed. It can happen + // when we lose connectivity with the backend after it has received the request. + ret.set(Boolean.TRUE); + } else { + ret.setException(cause); + } } else { ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); } @@ -626,10 +670,8 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, resp -> { }, now); + successor.doReplayRequest((TransactionRequest) obj, resp -> { }, now); } else { Verify.verify(obj instanceof IncrementSequence); final IncrementSequence increment = (IncrementSequence) obj; - successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), + successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { }, now); LOG.debug("Incrementing sequence {} to successor {}", obj, successor); @@ -668,7 +710,7 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback(), e.getEnqueuedTicks()); + successor.doReplayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); } } @@ -680,8 +722,14 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks); + } + if (successor.markSealed()) { + successor.sealAndSend(Optional.of(enqueuedTicks)); + } } } @@ -696,7 +744,7 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + private void doReplayRequest(final TransactionRequest request, final Consumer> callback, final long enqueuedTicks) { if (request instanceof AbstractLocalTransactionRequest) { handleReplayedLocalRequest((AbstractLocalTransactionRequest) request, callback, enqueuedTicks); @@ -736,6 +784,11 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + final long enqueuedTicks) { + getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks); + } + abstract boolean isSnapshotOnly(); abstract void doDelete(YangInstanceIdentifier path); @@ -748,10 +801,8 @@ abstract class AbstractProxyTransaction implements Identifiable>, ReadFailedException> doRead(YangInstanceIdentifier path); - abstract void doSeal(); - @GuardedBy("this") - abstract void flushState(AbstractProxyTransaction successor); + abstract java.util.Optional flushState(); abstract TransactionRequest abortRequest();