From: Robert Varga Date: Thu, 6 Jul 2017 07:04:10 +0000 (+0200) Subject: BUG-8618: rework AbstractProxyTransaction.flushState() X-Git-Tag: release/nitrogen~43 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=7991491f2854dde2ec625ed6c08b44df7d258795 BUG-8618: rework AbstractProxyTransaction.flushState() Instead of directly forwarding state use ModifyTransactionRequest to encapsulate state and forward it separately to the successor. This eliminates sendRequest() from replay path, ensuring the replay thread is not blocked. Change-Id: Ice86791d417b7487b9d3b1df06341dd028cde7f8 Signed-off-by: Robert Varga (cherry picked from commit c525e5f25b951daa28d0cbde237ba3040b68f99f) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index da21e691a7..51f528150d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -350,7 +351,10 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + forwardToSuccessor(successor, optState.get(), null); + } successor.predecessorSealed(); } @@ -718,9 +722,13 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks); + } if (successor.markSealed()) { - successor.sealAndSend(Optional.of(parent.currentTime())); + successor.sealAndSend(Optional.of(enqueuedTicks)); } } } @@ -794,7 +802,7 @@ abstract class AbstractProxyTransaction implements Identifiable>, ReadFailedException> doRead(YangInstanceIdentifier path); @GuardedBy("this") - abstract void flushState(AbstractProxyTransaction successor); + abstract java.util.Optional flushState(); abstract TransactionRequest abortRequest(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 99806fd81f..f34abfff9c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -203,6 +203,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof TransactionPurgeRequest) { LOG.debug("Forwarding purge {} to successor {}", request, successor); successor.enqueuePurge(callback); + } else if (request instanceof ModifyTransactionRequest) { + successor.handleForwardedRequest(request, callback); } else { throw new IllegalArgumentException("Unhandled request" + request); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java index e85c86f5b5..00e294022b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import java.util.Optional; import java.util.function.Consumer; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; @@ -73,8 +74,9 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction { } @Override - void flushState(final AbstractProxyTransaction successor) { + Optional flushState() { // No-op + return Optional.empty(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index 77e2a6a6cc..76ad672255 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbortLocalTransaction import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; @@ -196,23 +197,28 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } @Override - void flushState(final AbstractProxyTransaction successor) { + Optional flushState() { + final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor()); + b.setSequence(0); + sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { @Override public void write(final PathArgument child, final NormalizedNode data) { - successor.write(current().node(child), data); + b.addModification(new TransactionWrite(current().node(child), data)); } @Override public void merge(final PathArgument child, final NormalizedNode data) { - successor.merge(current().node(child), data); + b.addModification(new TransactionMerge(current().node(child), data)); } @Override public void delete(final PathArgument child) { - successor.delete(current().node(child)); + b.addModification(new TransactionDelete(current().node(child))); } }); + + return Optional.of(b.build()); } DataTreeSnapshot getSnapshot() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 1ba96426df..5a6b539494 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -277,12 +277,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - void flushState(final AbstractProxyTransaction successor) { - if (builderBusy) { - final ModifyTransactionRequest request = builder.build(); - builderBusy = false; - forwardToSuccessor(successor, request, null); + java.util.Optional flushState() { + if (!builderBusy) { + return java.util.Optional.empty(); } + + final ModifyTransactionRequest request = builder.build(); + builderBusy = false; + return java.util.Optional.of(request); } @Override @@ -291,7 +293,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { successor.handleForwardedRequest(request, callback); } - private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { + void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { final ModifyTransactionRequest req = (ModifyTransactionRequest) request; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java index cbd9f3da3e..e1daac489e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java @@ -149,7 +149,8 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes final RemoteProxyTransaction successor = transactionTester.getTransaction(); doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any()); transaction.sealOnly(); - transaction.flushState(successor); + final TransactionRequest request = transaction.flushState().get(); + transaction.forwardToSuccessor(successor, request, null); verify(modification).applyToCursor(any()); transactionTester.getTransaction().seal(); transactionTester.getTransaction().directCommit();