Do not break actor containment 82/81482/7
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 9 Apr 2019 08:46:18 +0000 (10:46 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 13 Apr 2019 15:58:11 +0000 (17:58 +0200)
During async persistence the actor containment is broken which
results in applyState being called directly from the persistence actor.
This means that the ClientRequestTracker might be missing an entry
for the transaction and the transaction being applied as a foreign candidate
in ShardDataTree.
If this happens there will be a transaction stuck in COMMIT_PENDING state,
blocking any further progress in the shard until its restarted.

JIRA: CONTROLLER-1890
Change-Id: I944b233d13103df08b68baeaf3907c064d1d526e
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
opendaylight/md-sal/sal-akka-raft/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java

index 52a8c1548bf65a37e5b63fa1322b0da17abf5203..89c32f645aa715bc7276a4f58fab810cfd33a325 100644 (file)
     </dependency>
 
     <!-- Test Dependencies -->
     </dependency>
 
     <!-- Test Dependencies -->
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
index 775ab0cdef376061df6677cdd2b7b760fa393139..cdffef3a7af0f63494b441fea3d96d032781d8df 100755 (executable)
@@ -133,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
-            delegatingPersistenceProvider, this::handleApplyState, LOG);
+            delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
         context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
 
         context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
index b5f3afdc16839252f2f2069986e3ef5955c995ae..e971ed4f6de95d5d53d02e7eeb9d3d6c89628db6 100644 (file)
@@ -15,6 +15,7 @@ import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Optional;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import org.eclipse.jdt.annotation.NonNull;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import org.eclipse.jdt.annotation.NonNull;
@@ -64,6 +65,13 @@ public interface RaftActorContext {
      */
     ActorRef getActor();
 
      */
     ActorRef getActor();
 
+    /**
+     * Return an Executor which is guaranteed to run tasks in the context of {@link #getActor()}.
+     *
+     * @return An executor.
+     */
+    @NonNull Executor getExecutor();
+
     /**
      * The akka Cluster singleton for the actor system if one is configured.
      *
     /**
      * The akka Cluster singleton for the actor system if one is configured.
      *
index 634bbd4343fbba9219a8ab7a81c22c96c1ebf16c..8ba0f48d72e9ca8c5825f657bdc5ec9940365c17 100644 (file)
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import org.eclipse.jdt.annotation.NonNull;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import org.eclipse.jdt.annotation.NonNull;
@@ -49,6 +50,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final ActorContext context;
 
 
     private final ActorContext context;
 
+    private final @NonNull Executor executor;
+
     private final String id;
 
     private final ElectionTerm termInformation;
     private final String id;
 
     private final ElectionTerm termInformation;
@@ -96,11 +99,13 @@ public class RaftActorContextImpl implements RaftActorContext {
             final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied,
             final @NonNull Map<String, String> peerAddresses,
             final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider,
             final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied,
             final @NonNull Map<String, String> peerAddresses,
             final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider,
-            final @NonNull Consumer<ApplyState> applyStateConsumer, final @NonNull Logger logger) {
+            final @NonNull Consumer<ApplyState> applyStateConsumer, final @NonNull Logger logger,
+            final @NonNull Executor executor) {
         this.actor = actor;
         this.context = context;
         this.id = id;
         this.termInformation = requireNonNull(termInformation);
         this.actor = actor;
         this.context = context;
         this.id = id;
         this.termInformation = requireNonNull(termInformation);
+        this.executor = requireNonNull(executor);
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
         this.configParams = requireNonNull(configParams);
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
         this.configParams = requireNonNull(configParams);
@@ -150,6 +155,11 @@ public class RaftActorContextImpl implements RaftActorContext {
         return actor;
     }
 
         return actor;
     }
 
+    @Override
+    public final Executor getExecutor() {
+        return executor;
+    }
+
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     public Optional<Cluster> getCluster() {
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     public Optional<Cluster> getCluster() {
index 095e85cbe568ca3935c9a977af73c0564dea9370..a05dbc662fd6506af1ed84f31c29cf99c0fbcd84 100644 (file)
@@ -91,7 +91,8 @@ public interface ReplicatedLog {
      * @param callback the Procedure to be notified when persistence is complete (optional).
      * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
      *        call and the execution of the associated callback. If false, subsequent messages are stashed and get
      * @param callback the Procedure to be notified when persistence is complete (optional).
      * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
      *        call and the execution of the associated callback. If false, subsequent messages are stashed and get
-     *        delivered after persistence is complete and the associated callback is executed.
+     *        delivered after persistence is complete and the associated callback is executed. In either case the
+     *        callback is guaranteed to execute in the context of the actor associated with this log.
      * @return true if the entry was successfully appended, false otherwise.
      */
     boolean appendAndPersist(@NonNull ReplicatedLogEntry replicatedLogEntry,
      * @return true if the entry was successfully appended, false otherwise.
      */
     boolean appendAndPersist(@NonNull ReplicatedLogEntry replicatedLogEntry,
index 45b0b3898e20cc3b3a5acbe523944bbfc0ee6c6a..c53cad68c5def14c0dae3718ffc01144235db5fe 100644 (file)
@@ -101,22 +101,35 @@ final class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
             return false;
         }
 
             return false;
         }
 
-        Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
-            context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
-
-            dataSizeSinceLastSnapshot += persistedLogEntry.size();
-
-            if (callback != null) {
-                callback.apply(persistedLogEntry);
-            }
-        };
-
         if (doAsync) {
         if (doAsync) {
-            context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+            context.getPersistenceProvider().persistAsync(replicatedLogEntry,
+                entry -> persistCallback(entry, callback));
         } else {
         } else {
-            context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+            context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback));
         }
 
         return true;
     }
         }
 
         return true;
     }
+
+    private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback) {
+        context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
+    }
+
+    @SuppressWarnings("checkstyle:illegalCatch")
+    private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback) {
+        context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+        dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+        if (callback != null) {
+            try {
+                callback.apply(persistedLogEntry);
+            } catch (Exception e) {
+                context.getLogger().error("{}: persist callback failed", context.getId(), e);
+                throw new IllegalStateException("Persist callback failed", e);
+            }
+        }
+    }
 }
 }
index 82017fa99f97eb48e874c7544efbeb066222978e..d8d0ce57721d0c5e11876910e1f721c493628a4f 100644 (file)
@@ -91,6 +91,34 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
     }
 
         }
     }
 
+    /**
+     * Message intended for testing to allow triggering persistData via the mailbox.
+     */
+    public static final class TestPersist {
+
+        private final ActorRef actorRef;
+        private final Identifier identifier;
+        private final Payload payload;
+
+        TestPersist(final ActorRef actorRef, final Identifier identifier, final Payload payload) {
+            this.actorRef = actorRef;
+            this.identifier = identifier;
+            this.payload = payload;
+        }
+
+        public ActorRef getActorRef() {
+            return actorRef;
+        }
+
+        public Identifier getIdentifier() {
+            return identifier;
+        }
+
+        public Payload getPayload() {
+            return payload;
+        }
+    }
+
     public static class TestRaftActor extends MockRaftActor {
 
         private final ActorRef collectorActor;
     public static class TestRaftActor extends MockRaftActor {
 
         private final ActorRef collectorActor;
@@ -137,6 +165,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
                 return;
             }
 
                 return;
             }
 
+            if (message instanceof TestPersist) {
+                persistData(((TestPersist) message).getActorRef(), ((TestPersist) message).getIdentifier(),
+                        ((TestPersist) message).getPayload(), false);
+                return;
+            }
+
             try {
                 Predicate drop = dropMessages.get(message.getClass());
                 if (drop == null || !drop.test(message)) {
             try {
                 Predicate drop = dropMessages.get(message.getClass());
                 if (drop == null || !drop.test(message)) {
index 9c58e6b7ca10400614a7eec6a3a2585e97522153..61a673057d48dd19379905571e5724f83ad2407f 100644 (file)
@@ -51,7 +51,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
     protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
-        state = new ArrayList<>();
+        state = Collections.synchronizedList(new ArrayList<>());
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
 
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
 
index 2e05a7e5708b49381082353518b81cd299521f4f..1935e2650398e74cd4ea33e071e2387b975ec7bd 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Procedure;
 import com.google.common.io.ByteSource;
 import akka.actor.Props;
 import akka.japi.Procedure;
 import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -74,13 +75,15 @@ public class MockRaftActorContext extends RaftActorContextImpl {
 
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
 
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
+                MoreExecutors.directExecutor());
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
     public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
     public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
+            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
+            MoreExecutors.directExecutor());
 
         this.system = system;
 
 
         this.system = system;
 
index 54661b67fc7a12e24fbf6bf2144bcb39f6d066c8..cf09ab59e6bd5f10cb5d7890e84c74493245e4c2 100644 (file)
@@ -21,6 +21,7 @@ import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -60,7 +61,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
-                peerMap, configParams, createProvider(), applyState -> { }, LOG);
+                peerMap, configParams, createProvider(), applyState -> { }, LOG,  MoreExecutors.directExecutor());
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
@@ -85,7 +86,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                createProvider(), applyState -> { }, LOG);
+                createProvider(), applyState -> { }, LOG,  MoreExecutors.directExecutor());
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@@ -99,7 +100,8 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "self", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "self", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
-                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
+                MoreExecutors.directExecutor());
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
index 494ca11d6837616169bb0f778b6c7a2926e6cffa..4855f42931da658d3d6f7912edb930fd78498bf8 100644 (file)
@@ -22,6 +22,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Arrays;
 import java.util.Collections;
 import org.junit.Before;
 import java.util.Arrays;
 import java.util.Collections;
 import org.junit.Before;
@@ -81,7 +82,7 @@ public class RaftActorRecoverySupportTest {
 
         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
                 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
 
         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
                 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
-                mockPersistence, applyState -> { }, LOG);
+                mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor());
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
index 8207fd76a89aea7a6dda483ddaef33b26891f3e2..e17d9faae5d85ce9904d042c355208614a8cbb8f 100644 (file)
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteSource;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.OutputStream;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.io.OutputStream;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -1474,7 +1475,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
-                noPersistence, applyState -> actor.tell(applyState, actor), LOG);
+                noPersistence, applyState -> actor.tell(applyState, actor), LOG,  MoreExecutors.directExecutor());
     }
 
     abstract static class AbstractMockRaftActor extends MockRaftActor {
     }
 
     abstract static class AbstractMockRaftActor extends MockRaftActor {
index 11b5000ad5816220a32dfb6be554702ddd761957..fe1480d0cf6110a3696bb59458d989b118c834c4 100644 (file)
@@ -18,6 +18,7 @@ import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.Optional;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.Optional;
@@ -69,7 +70,7 @@ public class RaftActorSnapshotMessageSupportTest {
 
         context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
                 new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
 
         context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
                 new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
-                configParams, mockPersistence, applyState -> { }, LOG) {
+                configParams, mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor()) {
             @Override
             public SnapshotManager getSnapshotManager() {
                 return mockSnapshotManager;
             @Override
             public SnapshotManager getSnapshotManager() {
                 return mockSnapshotManager;
index 33f918119a63702d833b7fb86960491aff5d44f0..3b58cb01d13f846692bde656eb7c1e2f74173900 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
@@ -48,6 +49,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.junit.After;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.junit.After;
@@ -59,6 +62,8 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestPersist;
+import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestRaftActor;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -1350,4 +1355,68 @@ public class RaftActorTest extends AbstractActorTest {
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
     }
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
     }
+
+    @Test
+    @SuppressWarnings("checkstyle:illegalcatch")
+    public void testApplyStateRace() throws Exception {
+        final String leaderId = factory.generateActorId("leader-");
+        final String followerId = factory.generateActorId("follower-");
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        ActorRef mockFollowerActorRef = factory.createActor(MessageCollectorActor.props());
+
+        TestRaftActor.Builder builder = TestRaftActor.newBuilder()
+                .id(leaderId)
+                .peerAddresses(ImmutableMap.of(followerId,
+                        mockFollowerActorRef.path().toString()))
+                .config(config)
+                .collectorActor(factory.createActor(
+                        MessageCollectorActor.props(), factory.generateActorId(leaderId + "-collector")));
+
+        TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+                builder.props(), leaderId);
+        MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+        leaderActor.waitForInitializeBehaviorComplete();
+
+        leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+        Leader leader = new Leader(leaderActor.getRaftActorContext());
+        leaderActor.setCurrentBehavior(leader);
+
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        leaderActor.setPersistence(new PersistentDataProvider(leaderActor) {
+            @Override
+            public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+                // needs to be executed from another thread to simulate the persistence actor calling this callback
+                executorService.submit(() -> {
+                    try {
+                        procedure.apply(entry);
+                    } catch (Exception e) {
+                        TEST_LOG.info("Fail during async persist callback", e);
+                    }
+                }, "persistence-callback");
+            }
+        });
+
+        leader.getFollower(followerId).setNextIndex(0);
+        leader.getFollower(followerId).setMatchIndex(-1);
+
+        // hitting this is flimsy so run multiple times to improve the chance of things
+        // blowing up while breaking actor containment
+        final TestPersist message =
+                new TestPersist(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"));
+        for (int i = 0; i < 100; i++) {
+            leaderActorRef.tell(message, null);
+
+            AppendEntriesReply reply =
+                    new AppendEntriesReply(followerId, 1, true, i, 1, (short) 5);
+            leaderActorRef.tell(reply, mockFollowerActorRef);
+        }
+
+        await("Persistence callback.").atMost(5, TimeUnit.SECONDS).until(() -> leaderActor.getState().size() == 100);
+        executorService.shutdown();
+    }
 }
 }
index 6ae5731800049ec42d252bc22a1a9ed74c47adb9..276ace15b533eb8d653e128268aeabee56a9ee9b 100644 (file)
@@ -17,6 +17,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import akka.japi.Procedure;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import akka.japi.Procedure;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +57,7 @@ public class ReplicatedLogImplTest {
 
         context = new RaftActorContextImpl(null, null, "test",
                 new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
 
         context = new RaftActorContextImpl(null, null, "test",
                 new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
-                configParams, mockPersistence, applyState -> { }, LOG);
+                configParams, mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor());
     }
 
     private void verifyPersist(Object message) throws Exception {
     }
 
     private void verifyPersist(Object message) throws Exception {
index 6dd5336716c93419ee39ae5caa5d7ef76f3cbcd7..b04c9e39715acbb3566e2c5f11e77a0f132cbcf4 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.ActorRef;
 import akka.dispatch.Dispatchers;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Stopwatch;
 import akka.dispatch.Dispatchers;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -178,7 +179,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
-                new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
+                new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG,  MoreExecutors.directExecutor());
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);