BUG-8618: rework AbstractProxyTransaction.flushState() 17/60417/2
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 6 Jul 2017 07:04:10 +0000 (09:04 +0200)
committerRobert Varga <nite@hq.sk>
Sat, 15 Jul 2017 08:47:57 +0000 (08:47 +0000)
Instead of directly forwarding state use ModifyTransactionRequest
to encapsulate state and forward it separately to the successor.

This eliminates sendRequest() from replay path, ensuring the replay
thread is not blocked.

Change-Id: Ice86791d417b7487b9d3b1df06341dd028cde7f8
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit c525e5f25b951daa28d0cbde237ba3040b68f99f)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java

index da21e691a78f5fca2328b6554d947e3b11dec0ae..51f528150d22a717034cc4281618591645cca92f 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -350,7 +351,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // At this point the successor has completed transition and is possibly visible by the user thread, which is
         // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
         // Propagate state and seal the successor.
         // At this point the successor has completed transition and is possibly visible by the user thread, which is
         // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
         // Propagate state and seal the successor.
-        flushState(successor);
+        final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+        if (optState.isPresent()) {
+            forwardToSuccessor(successor, optState.get(), null);
+        }
         successor.predecessorSealed();
     }
 
         successor.predecessorSealed();
     }
 
@@ -718,9 +722,13 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
          */
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
          */
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
-            flushState(successor);
+            final long enqueuedTicks = parent.currentTime();
+            final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+            if (optState.isPresent()) {
+                successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks);
+            }
             if (successor.markSealed()) {
             if (successor.markSealed()) {
-                successor.sealAndSend(Optional.of(parent.currentTime()));
+                successor.sealAndSend(Optional.of(enqueuedTicks));
             }
         }
     }
             }
         }
     }
@@ -794,7 +802,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
     @GuardedBy("this")
     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
     @GuardedBy("this")
-    abstract void flushState(AbstractProxyTransaction successor);
+    abstract java.util.Optional<ModifyTransactionRequest> flushState();
 
     abstract TransactionRequest<?> abortRequest();
 
 
     abstract TransactionRequest<?> abortRequest();
 
index 99806fd81f8bbaa27b97c9fc37563abe66bd0ee2..f34abfff9c665ccd38204758a94448781267c764 100644 (file)
@@ -203,6 +203,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
             successor.enqueuePurge(callback);
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
             successor.enqueuePurge(callback);
+        } else if (request instanceof ModifyTransactionRequest) {
+            successor.handleForwardedRequest(request, callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
index e85c86f5b53e985771d6f470f8d75d363eca0ab0..00e294022b7845889f8afe01ab83caeed1f6a2e8 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Optional;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
@@ -73,8 +74,9 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
     }
 
     @Override
-    void flushState(final AbstractProxyTransaction successor) {
+    Optional<ModifyTransactionRequest> flushState() {
         // No-op
         // No-op
+        return Optional.empty();
     }
 
     @Override
     }
 
     @Override
index 77e2a6a6ccd9e2e3085589f6a1a751df537a0245..76ad672255c77dba307971e24c1350c59ef34b34 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbortLocalTransaction
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
@@ -196,23 +197,28 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
     }
 
     @Override
-    void flushState(final AbstractProxyTransaction successor) {
+    Optional<ModifyTransactionRequest> flushState() {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor());
+        b.setSequence(0);
+
         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
             @Override
             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
             @Override
             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
-                successor.write(current().node(child), data);
+                b.addModification(new TransactionWrite(current().node(child), data));
             }
 
             @Override
             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
             }
 
             @Override
             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
-                successor.merge(current().node(child), data);
+                b.addModification(new TransactionMerge(current().node(child), data));
             }
 
             @Override
             public void delete(final PathArgument child) {
             }
 
             @Override
             public void delete(final PathArgument child) {
-                successor.delete(current().node(child));
+                b.addModification(new TransactionDelete(current().node(child)));
             }
         });
             }
         });
+
+        return Optional.of(b.build());
     }
 
     DataTreeSnapshot getSnapshot() {
     }
 
     DataTreeSnapshot getSnapshot() {
index 1ba96426df75feb7e38e4db0f217c8c4f98708b8..5a6b539494e3577f8d0642e95ec2bcc47a3f72a0 100644 (file)
@@ -277,12 +277,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
     }
 
     @Override
-    void flushState(final AbstractProxyTransaction successor) {
-        if (builderBusy) {
-            final ModifyTransactionRequest request = builder.build();
-            builderBusy = false;
-            forwardToSuccessor(successor, request, null);
+    java.util.Optional<ModifyTransactionRequest> flushState() {
+        if (!builderBusy) {
+            return java.util.Optional.empty();
         }
         }
+
+        final ModifyTransactionRequest request = builder.build();
+        builderBusy = false;
+        return java.util.Optional.of(request);
     }
 
     @Override
     }
 
     @Override
@@ -291,7 +293,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         successor.handleForwardedRequest(request, callback);
     }
 
         successor.handleForwardedRequest(request, callback);
     }
 
-    private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+    void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         if (request instanceof ModifyTransactionRequest) {
             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
 
         if (request instanceof ModifyTransactionRequest) {
             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
 
index cbd9f3da3ed6e0c803c2df15a57315bfdf09f951..e1daac489e8bd5a1194cfd71582c67c294f23520 100644 (file)
@@ -149,7 +149,8 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         final RemoteProxyTransaction successor = transactionTester.getTransaction();
         doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any());
         transaction.sealOnly();
         final RemoteProxyTransaction successor = transactionTester.getTransaction();
         doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any());
         transaction.sealOnly();
-        transaction.flushState(successor);
+        final TransactionRequest<?> request = transaction.flushState().get();
+        transaction.forwardToSuccessor(successor, request, null);
         verify(modification).applyToCursor(any());
         transactionTester.getTransaction().seal();
         transactionTester.getTransaction().directCommit();
         verify(modification).applyToCursor(any());
         transactionTester.getTransaction().seal();
         transactionTester.getTransaction().directCommit();