BUG-8618: rework AbstractProxyTransaction.flushState() 17/60417/2
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 6 Jul 2017 07:04:10 +0000 (09:04 +0200)
committerRobert Varga <nite@hq.sk>
Sat, 15 Jul 2017 08:47:57 +0000 (08:47 +0000)
Instead of directly forwarding state use ModifyTransactionRequest
to encapsulate state and forward it separately to the successor.

This eliminates sendRequest() from replay path, ensuring the replay
thread is not blocked.

Change-Id: Ice86791d417b7487b9d3b1df06341dd028cde7f8
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit c525e5f25b951daa28d0cbde237ba3040b68f99f)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java

index da21e69..51f5281 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -350,7 +351,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // At this point the successor has completed transition and is possibly visible by the user thread, which is
         // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
         // Propagate state and seal the successor.
-        flushState(successor);
+        final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+        if (optState.isPresent()) {
+            forwardToSuccessor(successor, optState.get(), null);
+        }
         successor.predecessorSealed();
     }
 
@@ -718,9 +722,13 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
          */
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
-            flushState(successor);
+            final long enqueuedTicks = parent.currentTime();
+            final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+            if (optState.isPresent()) {
+                successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks);
+            }
             if (successor.markSealed()) {
-                successor.sealAndSend(Optional.of(parent.currentTime()));
+                successor.sealAndSend(Optional.of(enqueuedTicks));
             }
         }
     }
@@ -794,7 +802,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
     @GuardedBy("this")
-    abstract void flushState(AbstractProxyTransaction successor);
+    abstract java.util.Optional<ModifyTransactionRequest> flushState();
 
     abstract TransactionRequest<?> abortRequest();
 
index 99806fd..f34abff 100644 (file)
@@ -203,6 +203,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
             successor.enqueuePurge(callback);
+        } else if (request instanceof ModifyTransactionRequest) {
+            successor.handleForwardedRequest(request, callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
index e85c86f..00e2940 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Optional;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
@@ -73,8 +74,9 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void flushState(final AbstractProxyTransaction successor) {
+    Optional<ModifyTransactionRequest> flushState() {
         // No-op
+        return Optional.empty();
     }
 
     @Override
index 77e2a6a..76ad672 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbortLocalTransaction
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
@@ -196,23 +197,28 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void flushState(final AbstractProxyTransaction successor) {
+    Optional<ModifyTransactionRequest> flushState() {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor());
+        b.setSequence(0);
+
         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
             @Override
             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
-                successor.write(current().node(child), data);
+                b.addModification(new TransactionWrite(current().node(child), data));
             }
 
             @Override
             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
-                successor.merge(current().node(child), data);
+                b.addModification(new TransactionMerge(current().node(child), data));
             }
 
             @Override
             public void delete(final PathArgument child) {
-                successor.delete(current().node(child));
+                b.addModification(new TransactionDelete(current().node(child)));
             }
         });
+
+        return Optional.of(b.build());
     }
 
     DataTreeSnapshot getSnapshot() {
index 1ba9642..5a6b539 100644 (file)
@@ -277,12 +277,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
-    void flushState(final AbstractProxyTransaction successor) {
-        if (builderBusy) {
-            final ModifyTransactionRequest request = builder.build();
-            builderBusy = false;
-            forwardToSuccessor(successor, request, null);
+    java.util.Optional<ModifyTransactionRequest> flushState() {
+        if (!builderBusy) {
+            return java.util.Optional.empty();
         }
+
+        final ModifyTransactionRequest request = builder.build();
+        builderBusy = false;
+        return java.util.Optional.of(request);
     }
 
     @Override
@@ -291,7 +293,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         successor.handleForwardedRequest(request, callback);
     }
 
-    private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+    void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         if (request instanceof ModifyTransactionRequest) {
             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
 
index cbd9f3d..e1daac4 100644 (file)
@@ -149,7 +149,8 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         final RemoteProxyTransaction successor = transactionTester.getTransaction();
         doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any());
         transaction.sealOnly();
-        transaction.flushState(successor);
+        final TransactionRequest<?> request = transaction.flushState().get();
+        transaction.forwardToSuccessor(successor, request, null);
         verify(modification).applyToCursor(any());
         transactionTester.getTransaction().seal();
         transactionTester.getTransaction().directCommit();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.