As evidenced by a ConcurrentModificationException happening reliably
in face of aborted read-only transactions, there are avenues how
our state can be modified eventhough we hold the locks.
One such avenue is listeners hanging on read operations, which
can enqueue further requests in the context of calling thread. That
thread must not be performing replay, hence delay request completion
into a separate actor message by using executeInActor().
Change-Id: Ibcd0ac788156011ec3a4cc573dc7fb249ebf93a2
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
8123d0fc56a28324fed48e3027edf090e8149b9b)
this.parent = Preconditions.checkNotNull(parent);
}
+ final void executeInActor(final Runnable command) {
+ parent.context().executeInActor(behavior -> {
+ command.run();
+ return behavior;
+ });
+ }
+
final ActorRef localActor() {
return parent.localActor();
}
private boolean handleReadRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
+ // Note we delay completion of read requests to limit the scope at which the client can run, as they have
+ // listeners, which we do not want to execute while we are reconnecting.
if (request instanceof ReadTransactionRequest) {
final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
- callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ if (callback != null) {
+ // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+ final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+ executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(),
+ request.getSequence(), result)));
+ }
return true;
} else if (request instanceof ExistsTransactionRequest) {
final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
final boolean result = readOnlyView().readNode(path).isPresent();
- callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ if (callback != null) {
+ // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+ final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+ executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(),
+ request.getSequence(), result)));
+ }
return true;
} else {
return false;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
return identifier;
}
+ final ClientActorContext context() {
+ return connection.context();
+ }
+
final long currentTime() {
return connection.currentTime();
}
*/
package org.opendaylight.controller.cluster.access.client;
+import static org.mockito.Mockito.spy;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.util.function.Consumer;
public static ClientActorContext createClientActorContext(final ActorSystem system, final ActorRef actor,
final ClientIdentifier id, final String persistenceId) {
- return new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id);
+ return spy(new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id));
}
public static <T extends BackendInfo> ConnectedClientConnection<T> createConnectedConnection(
private TestProbe backendProbe;
private TestProbe clientContextProbe;
private TransactionTester<T> tester;
+ protected ClientActorContext context;
protected T transaction;
@Before
system = ActorSystem.apply();
clientContextProbe = new TestProbe(system, "clientContext");
backendProbe = new TestProbe(system, "backend");
- final ClientActorContext context =
- AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+ context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID,
+ PERSISTENCE_ID);
final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
"default", UnsignedLong.ZERO, Optional.empty(), 3);
final AbstractClientConnection<ShardBackendInfo> connection =
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.access.client.InternalCommand;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
}
+ @SuppressWarnings("unchecked")
+ private void setupExecuteInActor() {
+ doAnswer(inv -> {
+ inv.getArgumentAt(0, InternalCommand.class).execute(mock(ClientActorBehavior.class));
+ return null;
+ }).when(context).executeInActor(any(InternalCommand.class));
+ }
+
@Test
public void testHandleForwardedRemoteReadRequest() throws Exception {
final TestProbe probe = createProbe();
final ReadTransactionRequest request =
new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
final Consumer<Response<?, ?>> callback = createCallbackMock();
+ setupExecuteInActor();
+
transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(callback).accept(captor.capture());
final ExistsTransactionRequest request =
new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
final Consumer<Response<?, ?>> callback = createCallbackMock();
+ setupExecuteInActor();
+
transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(callback).accept(captor.capture());