From 7daddd72031b33ed686abe18a0813e41263aac8d Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Tue, 9 Apr 2019 10:46:18 +0200 Subject: [PATCH] Do not break actor containment During async persistence the actor containment is broken which results in applyState being called directly from the persistence actor. This means that the ClientRequestTracker might be missing an entry for the transaction and the transaction being applied as a foreign candidate in ShardDataTree. If this happens there will be a transaction stuck in COMMIT_PENDING state, blocking any further progress in the shard until its restarted. JIRA: CONTROLLER-1890 Change-Id: I944b233d13103df08b68baeaf3907c064d1d526e Signed-off-by: Tomas Cere Signed-off-by: Robert Varga --- opendaylight/md-sal/sal-akka-raft/pom.xml | 5 ++ .../controller/cluster/raft/RaftActor.java | 2 +- .../cluster/raft/RaftActorContext.java | 8 +++ .../cluster/raft/RaftActorContextImpl.java | 12 +++- .../cluster/raft/ReplicatedLog.java | 3 +- .../cluster/raft/ReplicatedLogImpl.java | 37 ++++++---- .../AbstractRaftActorIntegrationTest.java | 34 +++++++++ .../cluster/raft/MockRaftActor.java | 2 +- .../cluster/raft/MockRaftActorContext.java | 7 +- .../raft/RaftActorContextImplTest.java | 8 ++- .../raft/RaftActorRecoverySupportTest.java | 3 +- ...ftActorServerConfigurationSupportTest.java | 3 +- .../RaftActorSnapshotMessageSupportTest.java | 3 +- .../cluster/raft/RaftActorTest.java | 69 +++++++++++++++++++ .../cluster/raft/ReplicatedLogImplTest.java | 3 +- .../cluster/raft/behaviors/CandidateTest.java | 3 +- 16 files changed, 176 insertions(+), 26 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/pom.xml b/opendaylight/md-sal/sal-akka-raft/pom.xml index 52a8c1548b..89c32f645a 100644 --- a/opendaylight/md-sal/sal-akka-raft/pom.xml +++ b/opendaylight/md-sal/sal-akka-raft/pom.xml @@ -66,6 +66,11 @@ + + org.awaitility + awaitility + test + org.mockito mockito-core diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 775ab0cdef..cdffef3a7a 100755 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -133,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, this::handleApplyState, LOG); + delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index b5f3afdc16..e971ed4f6d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -15,6 +15,7 @@ import akka.cluster.Cluster; import com.google.common.annotations.VisibleForTesting; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.LongSupplier; import org.eclipse.jdt.annotation.NonNull; @@ -64,6 +65,13 @@ public interface RaftActorContext { */ ActorRef getActor(); + /** + * Return an Executor which is guaranteed to run tasks in the context of {@link #getActor()}. + * + * @return An executor. + */ + @NonNull Executor getExecutor(); + /** * The akka Cluster singleton for the actor system if one is configured. * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 634bbd4343..8ba0f48d72 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.LongSupplier; import org.eclipse.jdt.annotation.NonNull; @@ -49,6 +50,8 @@ public class RaftActorContextImpl implements RaftActorContext { private final ActorContext context; + private final @NonNull Executor executor; + private final String id; private final ElectionTerm termInformation; @@ -96,11 +99,13 @@ public class RaftActorContextImpl implements RaftActorContext { final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied, final @NonNull Map peerAddresses, final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider, - final @NonNull Consumer applyStateConsumer, final @NonNull Logger logger) { + final @NonNull Consumer applyStateConsumer, final @NonNull Logger logger, + final @NonNull Executor executor) { this.actor = actor; this.context = context; this.id = id; this.termInformation = requireNonNull(termInformation); + this.executor = requireNonNull(executor); this.commitIndex = commitIndex; this.lastApplied = lastApplied; this.configParams = requireNonNull(configParams); @@ -150,6 +155,11 @@ public class RaftActorContextImpl implements RaftActorContext { return actor; } + @Override + public final Executor getExecutor() { + return executor; + } + @Override @SuppressWarnings("checkstyle:IllegalCatch") public Optional getCluster() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 095e85cbe5..a05dbc662f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -91,7 +91,8 @@ public interface ReplicatedLog { * @param callback the Procedure to be notified when persistence is complete (optional). * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist * call and the execution of the associated callback. If false, subsequent messages are stashed and get - * delivered after persistence is complete and the associated callback is executed. + * delivered after persistence is complete and the associated callback is executed. In either case the + * callback is guaranteed to execute in the context of the actor associated with this log. * @return true if the entry was successfully appended, false otherwise. */ boolean appendAndPersist(@NonNull ReplicatedLogEntry replicatedLogEntry, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 45b0b3898e..c53cad68c5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -101,22 +101,35 @@ final class ReplicatedLogImpl extends AbstractReplicatedLogImpl { return false; } - Procedure persistCallback = persistedLogEntry -> { - context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry); - - dataSizeSinceLastSnapshot += persistedLogEntry.size(); - - if (callback != null) { - callback.apply(persistedLogEntry); - } - }; - if (doAsync) { - context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback); + context.getPersistenceProvider().persistAsync(replicatedLogEntry, + entry -> persistCallback(entry, callback)); } else { - context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback); + context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback)); } return true; } + + private void persistCallback(final ReplicatedLogEntry persistedLogEntry, + final Procedure callback) { + context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback)); + } + + @SuppressWarnings("checkstyle:illegalCatch") + private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry, + final Procedure callback) { + context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry); + + dataSizeSinceLastSnapshot += persistedLogEntry.size(); + + if (callback != null) { + try { + callback.apply(persistedLogEntry); + } catch (Exception e) { + context.getLogger().error("{}: persist callback failed", context.getId(), e); + throw new IllegalStateException("Persist callback failed", e); + } + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 82017fa99f..d8d0ce5772 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -91,6 +91,34 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } } + /** + * Message intended for testing to allow triggering persistData via the mailbox. + */ + public static final class TestPersist { + + private final ActorRef actorRef; + private final Identifier identifier; + private final Payload payload; + + TestPersist(final ActorRef actorRef, final Identifier identifier, final Payload payload) { + this.actorRef = actorRef; + this.identifier = identifier; + this.payload = payload; + } + + public ActorRef getActorRef() { + return actorRef; + } + + public Identifier getIdentifier() { + return identifier; + } + + public Payload getPayload() { + return payload; + } + } + public static class TestRaftActor extends MockRaftActor { private final ActorRef collectorActor; @@ -137,6 +165,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest return; } + if (message instanceof TestPersist) { + persistData(((TestPersist) message).getActorRef(), ((TestPersist) message).getIdentifier(), + ((TestPersist) message).getPayload(), false); + return; + } + try { Predicate drop = dropMessages.get(message.getClass()); if (drop == null || !drop.test(message)) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 9c58e6b7ca..61a673057d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -51,7 +51,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, protected MockRaftActor(final AbstractBuilder builder) { super(builder.id, builder.peerAddresses != null ? builder.peerAddresses : Collections.emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION); - state = new ArrayList<>(); + state = Collections.synchronizedList(new ArrayList<>()); this.actorDelegate = mock(RaftActor.class); this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 2e05a7e570..1935e26503 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -14,6 +14,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.Procedure; import com.google.common.io.ByteSource; +import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.io.OutputStream; import java.io.Serializable; @@ -74,13 +75,15 @@ public class MockRaftActorContext extends RaftActorContextImpl { public MockRaftActorContext() { super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(), - new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG); + new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG, + MoreExecutors.directExecutor()); setReplicatedLog(new MockReplicatedLogBuilder().build()); } public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) { super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(), - new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG); + new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG, + MoreExecutors.directExecutor()); this.system = system; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java index 54661b67fc..cf09ab59e6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java @@ -21,6 +21,7 @@ import akka.actor.Props; import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -60,7 +61,7 @@ public class RaftActorContextImplTest extends AbstractActorTest { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1, - peerMap, configParams, createProvider(), applyState -> { }, LOG); + peerMap, configParams, createProvider(), applyState -> { }, LOG, MoreExecutors.directExecutor()); assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1")); assertEquals("getPeerAddress", null, context.getPeerAddress("peer2")); @@ -85,7 +86,7 @@ public class RaftActorContextImplTest extends AbstractActorTest { RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), "test", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1, Maps.newHashMap(ImmutableMap.of("peer1", "peerAddress1")), configParams, - createProvider(), applyState -> { }, LOG); + createProvider(), applyState -> { }, LOG, MoreExecutors.directExecutor()); context.setPeerAddress("peer1", "peerAddress1_1"); assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1")); @@ -99,7 +100,8 @@ public class RaftActorContextImplTest extends AbstractActorTest { RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), "self", new ElectionTermImpl(createProvider(), "test", LOG), -1, -1, Maps.newHashMap(ImmutableMap.of("peer1", "peerAddress1")), - new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG); + new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG, + MoreExecutors.directExecutor()); context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false), new ServerInfo("peer2", true), new ServerInfo("peer3", false)))); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index 494ca11d68..4855f42931 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -22,6 +22,7 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotOffer; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Arrays; import java.util.Collections; import org.junit.Before; @@ -81,7 +82,7 @@ public class RaftActorRecoverySupportTest { context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1, Collections.emptyMap(), configParams, - mockPersistence, applyState -> { }, LOG); + mockPersistence, applyState -> { }, LOG, MoreExecutors.directExecutor()); support = new RaftActorRecoverySupport(context, mockCohort); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 8207fd76a8..e17d9faae5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.ByteSource; +import com.google.common.util.concurrent.MoreExecutors; import java.io.OutputStream; import java.time.Duration; import java.util.ArrayList; @@ -1474,7 +1475,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { termInfo.update(1, LEADER_ID); return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, - noPersistence, applyState -> actor.tell(applyState, actor), LOG); + noPersistence, applyState -> actor.tell(applyState, actor), LOG, MoreExecutors.directExecutor()); } abstract static class AbstractMockRaftActor extends MockRaftActor { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index 11b5000ad5..fe1480d0cf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -18,6 +18,7 @@ import akka.actor.ActorRef; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotMetadata; +import com.google.common.util.concurrent.MoreExecutors; import java.io.OutputStream; import java.util.Collections; import java.util.Optional; @@ -69,7 +70,7 @@ public class RaftActorSnapshotMessageSupportTest { context = new RaftActorContextImpl(mockRaftActorRef, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.emptyMap(), - configParams, mockPersistence, applyState -> { }, LOG) { + configParams, mockPersistence, applyState -> { }, LOG, MoreExecutors.directExecutor()) { @Override public SnapshotManager getSnapshotManager() { return mockSnapshotManager; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 33f918119a..3b58cb01d1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; @@ -48,6 +49,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.After; @@ -59,6 +62,8 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestPersist; +import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestRaftActor; import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; @@ -1350,4 +1355,68 @@ public class RaftActorTest extends AbstractActorTest { AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("AppendEntries size", 3, appendEntries.getEntries().size()); } + + @Test + @SuppressWarnings("checkstyle:illegalcatch") + public void testApplyStateRace() throws Exception { + final String leaderId = factory.generateActorId("leader-"); + final String followerId = factory.generateActorId("follower-"); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + ActorRef mockFollowerActorRef = factory.createActor(MessageCollectorActor.props()); + + TestRaftActor.Builder builder = TestRaftActor.newBuilder() + .id(leaderId) + .peerAddresses(ImmutableMap.of(followerId, + mockFollowerActorRef.path().toString())) + .config(config) + .collectorActor(factory.createActor( + MessageCollectorActor.props(), factory.generateActorId(leaderId + "-collector"))); + + TestActorRef leaderActorRef = factory.createTestActor( + builder.props(), leaderId); + MockRaftActor leaderActor = leaderActorRef.underlyingActor(); + leaderActor.waitForInitializeBehaviorComplete(); + + leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId); + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + leaderActor.setPersistence(new PersistentDataProvider(leaderActor) { + @Override + public void persistAsync(final T entry, final Procedure procedure) { + // needs to be executed from another thread to simulate the persistence actor calling this callback + executorService.submit(() -> { + try { + procedure.apply(entry); + } catch (Exception e) { + TEST_LOG.info("Fail during async persist callback", e); + } + }, "persistence-callback"); + } + }); + + leader.getFollower(followerId).setNextIndex(0); + leader.getFollower(followerId).setMatchIndex(-1); + + // hitting this is flimsy so run multiple times to improve the chance of things + // blowing up while breaking actor containment + final TestPersist message = + new TestPersist(leaderActorRef, new MockIdentifier("1"), new MockPayload("1")); + for (int i = 0; i < 100; i++) { + leaderActorRef.tell(message, null); + + AppendEntriesReply reply = + new AppendEntriesReply(followerId, 1, true, i, 1, (short) 5); + leaderActorRef.tell(reply, mockFollowerActorRef); + } + + await("Persistence callback.").atMost(5, TimeUnit.SECONDS).until(() -> leaderActor.getState().size() == 100); + executorService.shutdown(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java index 6ae5731800..276ace15b5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import akka.japi.Procedure; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Collections; import org.junit.Before; import org.junit.Test; @@ -56,7 +57,7 @@ public class ReplicatedLogImplTest { context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.emptyMap(), - configParams, mockPersistence, applyState -> { }, LOG); + configParams, mockPersistence, applyState -> { }, LOG, MoreExecutors.directExecutor()); } private void verifyPersist(Object message) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 6dd5336716..b04c9e3971 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -15,6 +15,7 @@ import akka.actor.ActorRef; import akka.dispatch.Dispatchers; import akka.testkit.TestActorRef; import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -178,7 +179,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm(); RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(), "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(), - new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG); + new NonPersistentDataProvider(Runnable::run), applyState -> { }, LOG, MoreExecutors.directExecutor()); raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING); raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING); -- 2.36.6