Do not break actor containment 80/81680/1
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 9 Apr 2019 08:46:18 +0000 (10:46 +0200)
committerTomas Cere <tomas.cere@pantheon.tech>
Wed, 17 Apr 2019 11:42:13 +0000 (13:42 +0200)
During async persistence the actor containment is broken which
results in applyState being called directly from the persistence actor.
This means that the ClientRequestTracker might be missing an entry
for the transaction and the transaction being applied as a foreign candidate
in ShardDataTree.
If this happens there will be a transaction stuck in COMMIT_PENDING state,
blocking any further progress in the shard until its restarted.

JIRA: CONTROLLER-1890
Change-Id: I944b233d13103df08b68baeaf3907c064d1d526e
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
opendaylight/md-sal/sal-akka-raft/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java

index 6a28f93bd6b472d8c05daf6ef69aff5bceb92082..d649bbbb286782b3d944404b39a90e0c37d38ad3 100644 (file)
     </dependency>
 
     <!-- Test Dependencies -->
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
index f9099a7b818a7e8b5004157a4b73da0f90e57ac9..1cd9eae0d6ef002c1015033fba0c8cc1672b4c51 100755 (executable)
@@ -133,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
-            delegatingPersistenceProvider, this::handleApplyState, LOG);
+            delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
         context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
index aed050c3c04ee4e15afa9bb907d9887d556bb33f..6e56f64801e66035e2ea8268d094661413cb2f78 100644 (file)
@@ -16,10 +16,12 @@ import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -65,6 +67,13 @@ public interface RaftActorContext {
      */
     ActorRef getActor();
 
+    /**
+     * Return an Executor which is guaranteed to run tasks in the context of {@link #getActor()}.
+     *
+     * @return An executor.
+     */
+    @NonNull Executor getExecutor();
+
     /**
      * The akka Cluster singleton for the actor system if one is configured.
      *
index 19d796c9363aa928e7f287e218807093fa93f2a3..8ba0f48d72e9ca8c5825f657bdc5ec9940365c17 100644 (file)
@@ -5,9 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -15,7 +16,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -24,10 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -50,6 +50,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final ActorContext context;
 
+    private final @NonNull Executor executor;
+
     private final String id;
 
     private final ElectionTerm termInformation;
@@ -94,25 +96,27 @@ public class RaftActorContextImpl implements RaftActorContext {
     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
 
     public RaftActorContextImpl(final ActorRef actor, final ActorContext context, final String id,
-            @Nonnull final ElectionTerm termInformation, final long commitIndex, final long lastApplied,
-            @Nonnull final Map<String, String> peerAddresses,
-            @Nonnull final ConfigParams configParams, @Nonnull final DataPersistenceProvider persistenceProvider,
-            @Nonnull final Consumer<ApplyState> applyStateConsumer, @Nonnull final Logger logger) {
+            final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied,
+            final @NonNull Map<String, String> peerAddresses,
+            final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider,
+            final @NonNull Consumer<ApplyState> applyStateConsumer, final @NonNull Logger logger,
+            final @NonNull Executor executor) {
         this.actor = actor;
         this.context = context;
         this.id = id;
-        this.termInformation = Preconditions.checkNotNull(termInformation);
+        this.termInformation = requireNonNull(termInformation);
+        this.executor = requireNonNull(executor);
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.configParams = Preconditions.checkNotNull(configParams);
-        this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
-        this.log = Preconditions.checkNotNull(logger);
-        this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
+        this.configParams = requireNonNull(configParams);
+        this.persistenceProvider = requireNonNull(persistenceProvider);
+        this.log = requireNonNull(logger);
+        this.applyStateConsumer = requireNonNull(applyStateConsumer);
 
         fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
                 configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
 
-        for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
+        for (Map.Entry<String, String> e : requireNonNull(peerAddresses).entrySet()) {
             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
         }
     }
@@ -151,6 +155,11 @@ public class RaftActorContextImpl implements RaftActorContext {
         return actor;
     }
 
+    @Override
+    public final Executor getExecutor() {
+        return executor;
+    }
+
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     public Optional<Cluster> getCluster() {
@@ -159,7 +168,7 @@ public class RaftActorContextImpl implements RaftActorContext {
                 cluster = Optional.of(Cluster.get(getActorSystem()));
             } catch (Exception e) {
                 // An exception means there's no cluster configured. This will only happen in unit tests.
-                log.debug("{}: Could not obtain Cluster: {}", getId(), e);
+                log.debug("{}: Could not obtain Cluster", getId(), e);
                 cluster = Optional.empty();
             }
         }
@@ -404,7 +413,7 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     void setCurrentBehavior(final RaftActorBehavior behavior) {
-        this.currentBehavior = Preconditions.checkNotNull(behavior);
+        this.currentBehavior = requireNonNull(behavior);
     }
 
     @Override
@@ -429,15 +438,13 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     @Override
-    @Nullable
     public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
         return leadershipTransferCohort;
     }
 
     @Override
     @SuppressWarnings("checkstyle:hiddenField")
-    public void setRaftActorLeadershipTransferCohort(
-            @Nullable final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
+    public void setRaftActorLeadershipTransferCohort(final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
         this.leadershipTransferCohort = leadershipTransferCohort;
     }
 }
index 71576f6d21b71fd02704e50d74e431f16a641f50..707c128098be446eb1422c0e03be90527d3a0743 100644 (file)
@@ -94,7 +94,8 @@ public interface ReplicatedLog {
      * @param callback the Procedure to be notified when persistence is complete (optional).
      * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
      *        call and the execution of the associated callback. If false, subsequent messages are stashed and get
-     *        delivered after persistence is complete and the associated callback is executed.
+     *        delivered after persistence is complete and the associated callback is executed. In either case the
+     *        callback is guaranteed to execute in the context of the actor associated with this log.
      * @return true if the entry was successfully appended, false otherwise.
      */
     boolean appendAndPersist(@Nonnull ReplicatedLogEntry replicatedLogEntry,
index 764bce1ddb97aa4af14620736a5e286d64d1a87a..e372d0072766239459db1ac49f9bbf79ec2df8d6 100644 (file)
@@ -102,22 +102,35 @@ final class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
             return false;
         }
 
-        Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
-            context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
-
-            dataSizeSinceLastSnapshot += persistedLogEntry.size();
-
-            if (callback != null) {
-                callback.apply(persistedLogEntry);
-            }
-        };
-
         if (doAsync) {
-            context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+            context.getPersistenceProvider().persistAsync(replicatedLogEntry,
+                entry -> persistCallback(entry, callback));
         } else {
-            context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+            context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback));
         }
 
         return true;
     }
+
+    private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback) {
+        context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
+    }
+
+    @SuppressWarnings("checkstyle:illegalCatch")
+    private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback) {
+        context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+        dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+        if (callback != null) {
+            try {
+                callback.apply(persistedLogEntry);
+            } catch (Exception e) {
+                context.getLogger().error("{}: persist callback failed", context.getId(), e);
+                throw new IllegalStateException("Persist callback failed", e);
+            }
+        }
+    }
 }
index 6e4d536d3d10bf4b32ad5452e55626788208bcc9..b194d38068173b0cd08fc62ad415e10334b8af95 100644 (file)
@@ -90,6 +90,34 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
     }
 
+    /**
+     * Message intended for testing to allow triggering persistData via the mailbox.
+     */
+    public static final class TestPersist {
+
+        private final ActorRef actorRef;
+        private final Identifier identifier;
+        private final Payload payload;
+
+        TestPersist(final ActorRef actorRef, final Identifier identifier, final Payload payload) {
+            this.actorRef = actorRef;
+            this.identifier = identifier;
+            this.payload = payload;
+        }
+
+        public ActorRef getActorRef() {
+            return actorRef;
+        }
+
+        public Identifier getIdentifier() {
+            return identifier;
+        }
+
+        public Payload getPayload() {
+            return payload;
+        }
+    }
+
     public static class TestRaftActor extends MockRaftActor {
 
         private final ActorRef collectorActor;
@@ -136,6 +164,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
                 return;
             }
 
+            if (message instanceof TestPersist) {
+                persistData(((TestPersist) message).getActorRef(), ((TestPersist) message).getIdentifier(),
+                        ((TestPersist) message).getPayload(), false);
+                return;
+            }
+
             try {
                 Predicate drop = dropMessages.get(message.getClass());
                 if (drop == null || !drop.test(message)) {
index 28cc9b3f81e9226e2baf5e7ecd4422d9e20413ee..5c5a3fbc9922ec87f5779f24cf4f3746b8868ce5 100644 (file)
@@ -52,7 +52,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
-        state = new ArrayList<>();
+        state = Collections.synchronizedList(new ArrayList<>());
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
 
index 2e05a7e5708b49381082353518b81cd299521f4f..1935e2650398e74cd4ea33e071e2387b975ec7bd 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Procedure;
 import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -74,13 +75,15 @@ public class MockRaftActorContext extends RaftActorContextImpl {
 
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
+                MoreExecutors.directExecutor());
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
     public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
+            new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
+            MoreExecutors.directExecutor());
 
         this.system = system;
 
index 2dd1c211af37d42e2c251d81db5cc329e1d9b153..5dd708cb3be97a28ad35bd2ea25805587742fde4 100644 (file)
@@ -21,6 +21,7 @@ import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -60,7 +61,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
-                peerMap, configParams, createProvider(), applyState -> { }, LOG);
+                peerMap, configParams, createProvider(), applyState -> { }, LOG,  MoreExecutors.directExecutor());
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
@@ -85,7 +86,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                createProvider(), applyState -> { }, LOG);
+                createProvider(), applyState -> { }, LOG,  MoreExecutors.directExecutor());
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@@ -99,7 +100,8 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "self", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
-                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
+                new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
+                MoreExecutors.directExecutor());
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
index 897294718f3ab78a9ee1dda4a5594c97dbab8fdd..92ff6d9f2d1e0a0eea4cea76ea9d95d7402f5768 100644 (file)
@@ -23,6 +23,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Arrays;
 import java.util.Collections;
 import org.hamcrest.Description;
@@ -84,7 +85,7 @@ public class RaftActorRecoverySupportTest {
 
         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
                 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
-                mockPersistence, applyState -> { }, LOG);
+                mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor());
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
index b1231d62d345b3477df95b4837da3c8ce2bcb46f..6de32f1598c336657e2c74ffe2797be53cf084b6 100644 (file)
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1478,7 +1479,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
-                noPersistence, applyState -> actor.tell(applyState, actor), LOG);
+                noPersistence, applyState -> actor.tell(applyState, actor), LOG,  MoreExecutors.directExecutor());
     }
 
     abstract static class AbstractMockRaftActor extends MockRaftActor {
index 495cc6d6af490df19ae44919af6f9b7a2bb8dacd..a210d71835b1b52135df9813235d4e12d59f3fba 100644 (file)
@@ -18,6 +18,7 @@ import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.Optional;
@@ -69,7 +70,7 @@ public class RaftActorSnapshotMessageSupportTest {
 
         context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
                 new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
-                configParams, mockPersistence, applyState -> { }, LOG) {
+                configParams, mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor()) {
             @Override
             public SnapshotManager getSnapshotManager() {
                 return mockSnapshotManager;
index 8a6b0ad61b81ee72a20db6f5069b36181cbd441e..679d2c28b4e70ef147e51fa74e116dafba713bac 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
@@ -49,6 +50,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.junit.After;
@@ -60,6 +63,8 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestPersist;
+import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestRaftActor;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -1351,4 +1356,68 @@ public class RaftActorTest extends AbstractActorTest {
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
     }
+
+    @Test
+    @SuppressWarnings("checkstyle:illegalcatch")
+    public void testApplyStateRace() throws Exception {
+        final String leaderId = factory.generateActorId("leader-");
+        final String followerId = factory.generateActorId("follower-");
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        ActorRef mockFollowerActorRef = factory.createActor(MessageCollectorActor.props());
+
+        TestRaftActor.Builder builder = TestRaftActor.newBuilder()
+                .id(leaderId)
+                .peerAddresses(ImmutableMap.of(followerId,
+                        mockFollowerActorRef.path().toString()))
+                .config(config)
+                .collectorActor(factory.createActor(
+                        MessageCollectorActor.props(), factory.generateActorId(leaderId + "-collector")));
+
+        TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+                builder.props(), leaderId);
+        MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+        leaderActor.waitForInitializeBehaviorComplete();
+
+        leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+        Leader leader = new Leader(leaderActor.getRaftActorContext());
+        leaderActor.setCurrentBehavior(leader);
+
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        leaderActor.setPersistence(new PersistentDataProvider(leaderActor) {
+            @Override
+            public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+                // needs to be executed from another thread to simulate the persistence actor calling this callback
+                executorService.submit(() -> {
+                    try {
+                        procedure.apply(entry);
+                    } catch (Exception e) {
+                        TEST_LOG.info("Fail during async persist callback", e);
+                    }
+                }, "persistence-callback");
+            }
+        });
+
+        leader.getFollower(followerId).setNextIndex(0);
+        leader.getFollower(followerId).setMatchIndex(-1);
+
+        // hitting this is flimsy so run multiple times to improve the chance of things
+        // blowing up while breaking actor containment
+        final TestPersist message =
+                new TestPersist(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"));
+        for (int i = 0; i < 100; i++) {
+            leaderActorRef.tell(message, null);
+
+            AppendEntriesReply reply =
+                    new AppendEntriesReply(followerId, 1, true, i, 1, (short) 5);
+            leaderActorRef.tell(reply, mockFollowerActorRef);
+        }
+
+        await("Persistence callback.").atMost(5, TimeUnit.SECONDS).until(() -> leaderActor.getState().size() == 100);
+        executorService.shutdown();
+    }
 }
index ff4d097010081de5fcf8814fc4e12a9b44fbf396..2a7eb069540ec658249a4dabbd71b51bcd384ef9 100644 (file)
@@ -15,6 +15,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import akka.japi.Procedure;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -58,7 +59,7 @@ public class ReplicatedLogImplTest {
 
         context = new RaftActorContextImpl(null, null, "test",
                 new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
-                configParams, mockPersistence, applyState -> { }, LOG);
+                configParams, mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor());
     }
 
     private void verifyPersist(Object message) throws Exception {
index 6dd5336716c93419ee39ae5caa5d7ef76f3cbcd7..b04c9e39715acbb3566e2c5f11e77a0f132cbcf4 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.ActorRef;
 import akka.dispatch.Dispatchers;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -178,7 +179,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
-                new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG);
+                new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG,  MoreExecutors.directExecutor());
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);