From 023402c7c80372260b6c5c82f120093a73806717 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Sat, 26 Sep 2015 07:27:52 -0400 Subject: [PATCH] Always persist and recover election term info With data persistence disabled, this also disabled persistence/recovery of election term info. This was an oversight - we need to persist and recover election term info regardless. Change-Id: I48d33ca5d3b7d95e2aeb8ed7f9c8d5f1aa401ece Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 17 ++--- .../raft/RaftActorRecoverySupport.java | 45 ++++++++++--- .../cluster/raft/MockRaftActor.java | 14 ++++ .../raft/RaftActorRecoverySupportTest.java | 57 ++++++++++++++-- .../cluster/raft/RaftActorTest.java | 67 +++++++++++++++---- .../cluster/raft/TestActorFactory.java | 27 +++++--- .../cluster/raft/utils/InMemoryJournal.java | 3 +- .../cluster/datastore/ShardTest.java | 11 ++- 8 files changed, 185 insertions(+), 56 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 4e0f770291..a8c32cd469 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,7 +12,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.japi.Procedure; -import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Optional; @@ -111,6 +110,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null); + private final PersistentDataProvider persistentProvider; + private RaftActorRecoverySupport raftRecovery; private RaftActorSnapshotMessageSupport snapshotSupport; @@ -122,8 +123,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { public RaftActor(String id, Map peerAddresses, Optional configParams, short payloadVersion) { + persistentProvider = new PersistentDataProvider(this); context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG), + this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), delegatingPersistenceProvider, LOG); @@ -161,17 +163,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { raftRecovery = newRaftActorRecoverySupport(); } - boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message); + boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider); if(recoveryComplete) { - if(!persistence().isRecoveryApplicable()) { - // Delete all the messages from the akka journal so that we do not end up with consistency issues - // Note I am not using the dataPersistenceProvider and directly using the akka api here - deleteMessages(lastSequenceNr()); - - // Delete all the akka snapshots as they will not be needed - deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); - } - onRecoveryComplete(); initializeBehavior(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 8cf01f11eb..85de4dac75 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -9,7 +9,10 @@ package org.opendaylight.controller.cluster.raft; import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import com.google.common.base.Stopwatch; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; @@ -28,6 +31,7 @@ class RaftActorRecoverySupport { private final RaftActorRecoveryCohort cohort; private int currentRecoveryBatchCount; + private boolean dataRecoveredWithPersistenceDisabled; private Stopwatch recoveryTimer; private final Logger log; @@ -40,9 +44,20 @@ class RaftActorRecoverySupport { this.log = context.getLogger(); } - boolean handleRecoveryMessage(Object message) { + boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) { + log.trace("handleRecoveryMessage: {}", message); + boolean recoveryComplete = false; - if(context.getPersistenceProvider().isRecoveryApplicable()) { + DataPersistenceProvider persistence = context.getPersistenceProvider(); + if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) { + // Handle this message for backwards compatibility with pre-Lithium versions. + org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update = + (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message; + context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if(persistence.isRecoveryApplicable()) { if (message instanceof SnapshotOffer) { onRecoveredSnapshot((SnapshotOffer) message); } else if (message instanceof ReplicatedLogEntry) { @@ -57,20 +72,30 @@ class RaftActorRecoverySupport { } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) { // Handle this message for backwards compatibility with pre-Lithium versions. replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex()); - } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) { - // Handle this message for backwards compatibility with pre-Lithium versions. - org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update = - (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message; - context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor()); - } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), - ((UpdateElectionTerm) message).getVotedFor()); } else if (message instanceof RecoveryCompleted) { onRecoveryCompletedMessage(); recoveryComplete = true; } } else if (message instanceof RecoveryCompleted) { recoveryComplete = true; + + if(dataRecoveredWithPersistenceDisabled) { + // Data persistence is disabled but we recovered some data entries so we must have just + // transitioned to disabled or a persistence backup was restored. Either way, delete all the + // messages from the akka journal for efficiency and so that we do not end up with consistency + // issues in case persistence is -re-enabled. + persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber()); + + // Delete all the akka snapshots as they will not be needed + persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), + scala.Long.MaxValue())); + + // Since we cleaned out the journal, we need to re-write the current election info. + context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(), + context.getTermInformation().getVotedFor()); + } + } else { + dataRecoveredWithPersistenceDisabled = true; } return recoveryComplete; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index c1aa75a12d..741c75ee4b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { @@ -243,6 +244,19 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return this.getId(); } + protected void newBehavior(RaftActorBehavior newBehavior) { + self().tell(newBehavior, ActorRef.noSender()); + } + + @Override + public void handleCommand(final Object message) { + if(message instanceof RaftActorBehavior) { + super.changeCurrentBehavior((RaftActorBehavior)message); + } else { + super.handleCommand(message); + } + } + public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index 31287e01c6..b4c6cab8ca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -8,22 +8,29 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotMetadata; import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import java.util.Arrays; import java.util.Collections; +import org.hamcrest.Description; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; import org.mockito.InOrder; +import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; @@ -50,6 +57,9 @@ public class RaftActorRecoverySupportTest { @Mock private RaftActorRecoveryCohort mockCohort; + @Mock + PersistentDataProvider mockPersistentProvider; + private RaftActorRecoverySupport support; private RaftActorContext context; @@ -59,7 +69,7 @@ public class RaftActorRecoverySupportTest { public void setup() { MockitoAnnotations.initMocks(this); - context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG), + context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1, Collections.emptyMap(), configParams, mockPersistence, LOG); support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort); @@ -74,7 +84,7 @@ public class RaftActorRecoverySupportTest { } private void sendMessageToSupport(Object message, boolean expComplete) { - boolean complete = support.handleRecoveryMessage(message); + boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider); assertEquals("complete", expComplete, complete); } @@ -281,9 +291,13 @@ public class RaftActorRecoverySupportTest { assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor()); } + @SuppressWarnings("unchecked") @Test - public void testRecoveryWithPersistenceDisabled() { + public void testDataRecoveredWithPersistenceDisabled() { doReturn(false).when(mockPersistence).isRecoveryApplicable(); + doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber(); + + sendMessageToSupport(new UpdateElectionTerm(5, "member2")); Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.emptyList(), 3, 1, 3, 1); SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot); @@ -308,13 +322,44 @@ public class RaftActorRecoverySupportTest { assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm()); assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor()); + + sendMessageToSupport(RecoveryCompleted.getInstance(), true); + + verifyNoMoreInteractions(mockCohort); + + verify(mockPersistentProvider).deleteMessages(10L); + verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class)); + } + + static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) { + return Matchers.argThat(new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + UpdateElectionTerm other = (UpdateElectionTerm) argument; + return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()); + } + + @Override + public void describeTo(Description description) { + description.appendValue(new UpdateElectionTerm(term, votedFor)); + } + }); + } + + @Test + public void testNoDataRecoveredWithPersistenceDisabled() { + doReturn(false).when(mockPersistence).isRecoveryApplicable(); + sendMessageToSupport(new UpdateElectionTerm(5, "member2")); - assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm()); - assertEquals("Voted For", null, context.getTermInformation().getVotedFor()); + assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm()); + assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor()); sendMessageToSupport(RecoveryCompleted.getInstance(), true); - verifyNoMoreInteractions(mockCohort); + verifyNoMoreInteractions(mockCohort, mockPersistentProvider); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index c73344c44a..a2382379f2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -18,12 +18,12 @@ import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; +import akka.dispatch.Dispatchers; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; @@ -50,6 +50,7 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; @@ -218,6 +219,48 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("follower-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(1); + + InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1); + + TestActorRef ref = factory.createTestActor(MockRaftActor.props(persistenceId, + ImmutableMap.builder().put("member1", "address").build(), + Optional.of(config), new NonPersistentDataProvider()). + withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); + + InMemoryJournal.waitForWriteMessagesComplete(persistenceId); + List entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class); + assertEquals("UpdateElectionTerm entries", 1, entries.size()); + UpdateElectionTerm updateEntry = entries.get(0); + + factory.killActor(ref, this); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + ref = factory.createTestActor(MockRaftActor.props(persistenceId, + ImmutableMap.builder().put("member1", "address").build(), + Optional.of(config), new NonPersistentDataProvider()). + withDispatcher(Dispatchers.DefaultDispatcherId()), + factory.generateActorId("follower-")); + + MockRaftActor actor = ref.underlyingActor(); + actor.waitForRecoveryComplete(); + + RaftActorContext newContext = actor.getRaftActorContext(); + assertEquals("electionTerm", updateEntry.getCurrentTerm(), + newContext.getTermInformation().getCurrentTerm()); + assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor()); + + entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class); + assertEquals("UpdateElectionTerm entries", 1, entries.size()); + }}; + } + @Test public void testRaftActorForwardsToRaftActorRecoverySupport() { String persistenceId = factory.generateActorId("leader-"); @@ -265,14 +308,14 @@ public class RaftActorTest extends AbstractActorTest { new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3"); mockRaftActor.handleRecover(deprecatedUpdateElectionTerm); - verify(mockSupport).handleRecoveryMessage(same(snapshotOffer)); - verify(mockSupport).handleRecoveryMessage(same(logEntry)); - verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries)); - verify(mockSupport).handleRecoveryMessage(same(applyLogEntries)); - verify(mockSupport).handleRecoveryMessage(same(deleteEntries)); - verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries)); - verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm)); - verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm)); + verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class)); + verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class)); + verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class)); + verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class)); + verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class)); + verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class)); + verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class)); + verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class)); } @Test @@ -347,7 +390,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); - verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class)); } @@ -401,7 +444,7 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config), notifierActor, - new NonPersistentDataProvider()), persistenceId); + new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId); List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); @@ -444,7 +487,7 @@ public class RaftActorTest extends AbstractActorTest { } }; - raftActor.changeCurrentBehavior(follower); + raftActor.newBehavior(follower); leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); assertEquals(persistenceId, leaderStateChange.getMemberId()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java index b47df13fed..82a5b498d2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java @@ -108,15 +108,26 @@ public class TestActorFactory implements AutoCloseable { return prefix + actorCount++; } + public void killActor(ActorRef actor, JavaTestKit kit) { + killActor(actor, kit, true); + } + + private void killActor(ActorRef actor, JavaTestKit kit, boolean remove) { + LOG.info("Killing actor {}", actor); + kit.watch(actor); + actor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + kit.expectTerminated(JavaTestKit.duration("5 seconds"), actor); + + if(remove) { + createdActors.remove(actor); + } + } + @Override public void close() { - new JavaTestKit(system) {{ - for(ActorRef actor : createdActors) { - watch(actor); - LOG.info("Killing actor {}", actor); - actor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - expectTerminated(duration("5 seconds"), actor); - } - }}; + JavaTestKit kit = new JavaTestKit(system); + for(ActorRef actor : createdActors) { + killActor(actor, kit, false); + } } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index 7142c47829..e1af7db7ee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -173,10 +173,9 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { // Akka calls this during recovery. - Map journal = journals.get(persistenceId); if(journal == null) { - return Futures.successful(-1L); + return Futures.successful(fromSequenceNr); } synchronized (journal) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 4f4162f607..99606e751a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; - import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; @@ -229,7 +228,7 @@ public class ShardTest extends AbstractShardTest { "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener"); final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)), + Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterChangeListenerWhenNotLeaderInitially"); // Write initial data into the in-memory store. @@ -340,7 +339,7 @@ public class ShardTest extends AbstractShardTest { "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)), + Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); final YangInstanceIdentifier path = TestModel.TEST_PATH; @@ -2563,8 +2562,8 @@ public class ShardTest extends AbstractShardTest { "testDataChangeListenerOnFollower-DataChangeListener"); final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)), - "testDataChangeListenerOnFollower"); + Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()). + withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower"); assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); @@ -2634,7 +2633,7 @@ public class ShardTest extends AbstractShardTest { member1ShardID.toString()); final TestActorRef shardLeader = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(leaderShardCreator)), + Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()), member2ShardID.toString()); // Sleep to let election happen Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); -- 2.36.6