From 9cab91f5204c0f55ee269a507269f02d5fe5e90b Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 24 May 2017 12:01:20 +0200 Subject: [PATCH] BUG-8538: do not invoke read callbacks during replay. As evidenced by a ConcurrentModificationException happening reliably in face of aborted read-only transactions, there are avenues how our state can be modified eventhough we hold the locks. One such avenue is listeners hanging on read operations, which can enqueue further requests in the context of calling thread. That thread must not be performing replay, hence delay request completion into a separate actor message by using executeInActor(). Change-Id: Ibcd0ac788156011ec3a4cc573dc7fb249ebf93a2 Signed-off-by: Robert Varga (cherry picked from commit 8123d0fc56a28324fed48e3027edf090e8149b9b) --- .../actors/dds/AbstractProxyTransaction.java | 7 +++++++ .../actors/dds/LocalProxyTransaction.java | 16 ++++++++++++++-- .../databroker/actors/dds/ProxyHistory.java | 5 +++++ .../cluster/access/client/AccessClientUtil.java | 4 +++- .../actors/dds/AbstractProxyTransactionTest.java | 5 +++-- .../actors/dds/LocalProxyTransactionTest.java | 14 ++++++++++++++ 6 files changed, 46 insertions(+), 5 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 2661ea82e9..8dfdf34adf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -197,6 +197,13 @@ abstract class AbstractProxyTransaction implements Identifiable { + command.run(); + return behavior; + }); + } + final ActorRef localActor() { return parent.localActor(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 9f4b18eaaa..5b47f22971 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -104,15 +104,27 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { private boolean handleReadRequest(final TransactionRequest request, final @Nullable Consumer> callback) { + // Note we delay completion of read requests to limit the scope at which the client can run, as they have + // listeners, which we do not want to execute while we are reconnecting. if (request instanceof ReadTransactionRequest) { final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); final Optional> result = readOnlyView().readNode(path); - callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } return true; } else if (request instanceof ExistsTransactionRequest) { final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); final boolean result = readOnlyView().readNode(path).isPresent(); - callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } return true; } else { return false; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 7c3b2010c2..b529d94c2b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -23,6 +23,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; @@ -338,6 +339,10 @@ abstract class ProxyHistory implements Identifiable { return identifier; } + final ClientActorContext context() { + return connection.context(); + } + final long currentTime() { return connection.currentTime(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java index 987b90c572..a43dc55fdc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.access.client; +import static org.mockito.Mockito.spy; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import java.util.function.Consumer; @@ -22,7 +24,7 @@ public class AccessClientUtil { public static ClientActorContext createClientActorContext(final ActorSystem system, final ActorRef actor, final ClientIdentifier id, final String persistenceId) { - return new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id); + return spy(new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id)); } public static ConnectedClientConnection createConnectedConnection( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java index dfbd11fc1e..92c996407a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java @@ -93,6 +93,7 @@ public abstract class AbstractProxyTransactionTest tester; + protected ClientActorContext context; protected T transaction; @Before @@ -101,8 +102,8 @@ public abstract class AbstractProxyTransactionTest connection = diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java index 3cf1b03493..e536a06772 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java @@ -21,6 +21,8 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; +import org.opendaylight.controller.cluster.access.client.InternalCommand; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; @@ -58,12 +60,22 @@ public abstract class LocalProxyTransactionTest getTester().expectTransactionRequest(AbortLocalTransactionRequest.class); } + @SuppressWarnings("unchecked") + private void setupExecuteInActor() { + doAnswer(inv -> { + inv.getArgumentAt(0, InternalCommand.class).execute(mock(ClientActorBehavior.class)); + return null; + }).when(context).executeInActor(any(InternalCommand.class)); + } + @Test public void testHandleForwardedRemoteReadRequest() throws Exception { final TestProbe probe = createProbe(); final ReadTransactionRequest request = new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true); final Consumer> callback = createCallbackMock(); + setupExecuteInActor(); + transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read()); final ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(callback).accept(captor.capture()); @@ -80,6 +92,8 @@ public abstract class LocalProxyTransactionTest final ExistsTransactionRequest request = new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true); final Consumer> callback = createCallbackMock(); + setupExecuteInActor(); + transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read()); final ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(callback).accept(captor.capture()); -- 2.36.6