BUG-8538: do not invoke read callbacks during replay. 04/58204/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 24 May 2017 10:01:20 +0000 (12:01 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:45:59 +0000 (09:45 +0200)
As evidenced by a ConcurrentModificationException happening reliably
in face of aborted read-only transactions, there are avenues how
our state can be modified eventhough we hold the locks.

One such avenue is listeners hanging on read operations, which
can enqueue further requests in the context of calling thread. That
thread must not be performing replay, hence delay request completion
into a separate actor message by using executeInActor().

Change-Id: Ibcd0ac788156011ec3a4cc573dc7fb249ebf93a2
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 8123d0fc56a28324fed48e3027edf090e8149b9b)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java

index 2661ea82e9fb30a65ea6fb656cc03fa944f96a1b..8dfdf34adfebae0a255d5f0f98c69886a3c9182d 100644 (file)
@@ -197,6 +197,13 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         this.parent = Preconditions.checkNotNull(parent);
     }
 
+    final void executeInActor(final Runnable command) {
+        parent.context().executeInActor(behavior -> {
+            command.run();
+            return behavior;
+        });
+    }
+
     final ActorRef localActor() {
         return parent.localActor();
     }
index 9f4b18eaaa73e07d91d12d2f3639c202a1eee436..5b47f22971c9af76298e82032202871bb986b96b 100644 (file)
@@ -104,15 +104,27 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
 
     private boolean handleReadRequest(final TransactionRequest<?> request,
             final @Nullable Consumer<Response<?, ?>> callback) {
+        // Note we delay completion of read requests to limit the scope at which the client can run, as they have
+        // listeners, which we do not want to execute while we are reconnecting.
         if (request instanceof ReadTransactionRequest) {
             final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
             final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
-            callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            if (callback != null) {
+                // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+                final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+                executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(),
+                    request.getSequence(), result)));
+            }
             return true;
         } else if (request instanceof ExistsTransactionRequest) {
             final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
             final boolean result = readOnlyView().readNode(path).isPresent();
-            callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            if (callback != null) {
+                // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+                final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+                executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(),
+                    request.getSequence(), result)));
+            }
             return true;
         } else {
             return false;
index 7c3b2010c2ce809307646f852ee473e9d6c9cbf4..b529d94c2b226bb879b9c0f3cf4baa07a7414359 100644 (file)
@@ -23,6 +23,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
@@ -338,6 +339,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return identifier;
     }
 
+    final ClientActorContext context() {
+        return connection.context();
+    }
+
     final long currentTime() {
         return connection.currentTime();
     }
index 987b90c57272dd9af821b175657462fbb6a0dc18..a43dc55fdcd532beca3f2d1e51d2de70ce4e6e9f 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
+import static org.mockito.Mockito.spy;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import java.util.function.Consumer;
@@ -22,7 +24,7 @@ public class AccessClientUtil {
 
     public static ClientActorContext createClientActorContext(final ActorSystem system, final ActorRef actor,
                                                               final ClientIdentifier id, final String persistenceId) {
-        return new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id);
+        return spy(new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id));
     }
 
     public static <T extends BackendInfo> ConnectedClientConnection<T> createConnectedConnection(
index dfbd11fc1e2d37cfbf213a2298de075d2d2ffbb4..92c996407a6d607c5e1e413705467f68f1d1d419 100644 (file)
@@ -93,6 +93,7 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
     private TestProbe backendProbe;
     private TestProbe clientContextProbe;
     private TransactionTester<T> tester;
+    protected ClientActorContext context;
     protected T transaction;
 
     @Before
@@ -101,8 +102,8 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         system = ActorSystem.apply();
         clientContextProbe = new TestProbe(system, "clientContext");
         backendProbe = new TestProbe(system, "backend");
-        final ClientActorContext context =
-                AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+        context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID,
+                PERSISTENCE_ID);
         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
         final AbstractClientConnection<ShardBackendInfo> connection =
index 3cf1b03493b2085bdf501a38127bac08e602cf70..e536a0677208c6a6aaec97d494d1b3807b626b20 100644 (file)
@@ -21,6 +21,8 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.access.client.InternalCommand;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
@@ -58,12 +60,22 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
     }
 
+    @SuppressWarnings("unchecked")
+    private void setupExecuteInActor() {
+        doAnswer(inv -> {
+            inv.getArgumentAt(0, InternalCommand.class).execute(mock(ClientActorBehavior.class));
+            return null;
+        }).when(context).executeInActor(any(InternalCommand.class));
+    }
+
     @Test
     public void testHandleForwardedRemoteReadRequest() throws Exception {
         final TestProbe probe = createProbe();
         final ReadTransactionRequest request =
                 new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
         final Consumer<Response<?, ?>> callback = createCallbackMock();
+        setupExecuteInActor();
+
         transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
         final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
         verify(callback).accept(captor.capture());
@@ -80,6 +92,8 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         final ExistsTransactionRequest request =
                 new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
         final Consumer<Response<?, ?>> callback = createCallbackMock();
+        setupExecuteInActor();
+
         transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
         final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
         verify(callback).accept(captor.capture());