X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=07b89e09230949da6c4849b3fb5dc03d4c3c36d8;hp=ac91f2d0413ad8aca71391ba86921c92dc1b008b;hb=18ddbfdc55a1faddf7aeb2df6b25481d34c820ab;hpb=87551f3a44856d7494ffef678b63bb01a1b4ab2e diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index ac91f2d041..07b89e0923 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; @@ -147,13 +148,15 @@ abstract class AbstractProxyTransaction implements Identifiable enqueuedTicks) { parent.onTransactionSealed(this); - // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit(). - if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) { - // Slow path: wait for the successor to complete - final AbstractProxyTransaction successor = awaitSuccessor(); + // Transition internal state to sealed and detect presence of a successor + return STATE_UPDATER.compareAndSet(this, OPEN, SEALED); + } - // At this point the successor has completed transition and is possibly visible by the user thread, which is - // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed. - // Propagate state and seal the successor. - flushState(successor); - successor.ensureSealed(); - } + /** + * Mark this proxy as having been sealed. + * + * @return True if this call has transitioned to sealed state. + */ + final boolean markSealed() { + return SEALED_UPDATER.compareAndSet(this, 0, 1); } private void checkNotSealed() { @@ -454,7 +495,14 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); + final Throwable cause = ((RequestFailure) t).getCause().unwrap(); + if (cause instanceof ClosedTransactionException) { + // This is okay, as it indicates the transaction has been completed. It can happen + // when we lose connectivity with the backend after it has received the request. + ret.set(Boolean.TRUE); + } else { + ret.setException(cause); + } } else { ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); } @@ -620,8 +668,8 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, resp -> { }, now); + successor.doReplayRequest((TransactionRequest) obj, resp -> { }, now); } else { Verify.verify(obj instanceof IncrementSequence); final IncrementSequence increment = (IncrementSequence) obj; - successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), + successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { }, now); LOG.debug("Incrementing sequence {} to successor {}", obj, successor); @@ -659,7 +707,7 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback(), e.getEnqueuedTicks()); + successor.doReplayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); } } @@ -672,7 +720,9 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + private void doReplayRequest(final TransactionRequest request, final Consumer> callback, final long enqueuedTicks) { if (request instanceof AbstractLocalTransactionRequest) { handleReplayedLocalRequest((AbstractLocalTransactionRequest) request, callback, enqueuedTicks); @@ -727,6 +777,11 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + final long enqueuedTicks) { + getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks); + } + abstract boolean isSnapshotOnly(); abstract void doDelete(YangInstanceIdentifier path); @@ -739,8 +794,6 @@ abstract class AbstractProxyTransaction implements Identifiable>, ReadFailedException> doRead(YangInstanceIdentifier path); - abstract void doSeal(); - @GuardedBy("this") abstract void flushState(AbstractProxyTransaction successor);