X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FMigratedMessagesTest.java;h=460dd4a445a8306731499e205b1207c563e1e110;hb=37238e4339ec7bfbfb0c7e57bf1545543f27a6cf;hp=08967d2b3f61f86c4dd71c83295a42fc121c9bcc;hpb=e1eca73a5ae2ffae8dd78c6fe5281cd2f45d5ef3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java index 08967d2b3f..460dd4a445 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java @@ -14,18 +14,31 @@ import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; +import java.io.OutputStream; +import java.io.Serializable; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -100,7 +113,7 @@ public class MigratedMessagesTest extends AbstractActorTest { doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> { assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor()); assertEquals("getElectionTerm", 5, snapshot.getElectionTerm()); - }); + }, ByteState.empty()); TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending"); } @@ -119,7 +132,7 @@ public class MigratedMessagesTest extends AbstractActorTest { doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> { assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor()); assertEquals("getElectionTerm", 5, snapshot.getElectionTerm()); - }); + }, ByteState.empty()); TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending"); } @@ -131,7 +144,7 @@ public class MigratedMessagesTest extends AbstractActorTest { String persistenceId = factory.generateActorId("test-actor-"); InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId)); - InMemoryJournal.addEntry(persistenceId, 2, new ReplicatedLogImplEntry(0, 1, + InMemoryJournal.addEntry(persistenceId, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); InMemoryJournal.addEntry(persistenceId, 3, new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0)); @@ -142,25 +155,36 @@ public class MigratedMessagesTest extends AbstractActorTest { assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); assertEquals("getLastIndex", 0, snapshot.getLastIndex()); assertEquals("getLastTerm", 1, snapshot.getLastTerm()); - }); + }, ByteState.empty()); TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending"); } @Test public void testNoSnapshotAfterStartupWithNoMigratedMessages() { + TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting"); String id = factory.generateActorId("test-actor-"); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id)); + InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); + InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0)); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { @Override - public void createSnapshot(ActorRef actorRef) { - actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef); + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef); } @Override - public void applySnapshot(byte[] snapshotBytes) { + public void applySnapshot(Snapshot.State snapshotState) { + } + + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); } }; @@ -175,6 +199,29 @@ public class MigratedMessagesTest extends AbstractActorTest { List snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class); assertEquals("Snapshots", 0, snapshots.size()); + + TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending"); + } + + @Test + public void testSnapshotAfterStartupWithMigratedReplicatedLogEntry() { + TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry starting"); + + String persistenceId = factory.generateActorId("test-actor-"); + + InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId)); + MockRaftActorContext.MockPayload expPayload = new MockRaftActorContext.MockPayload("A"); + InMemoryJournal.addEntry(persistenceId, 2, new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry( + 0, 1, expPayload)); + + doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> { + assertEquals("Unapplied entries size", 1, snapshot.getUnAppliedEntries().size()); + assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm()); + assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex()); + assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData()); + }, ByteState.empty()); + + TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending"); } private TestActorRef doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) { @@ -191,7 +238,7 @@ public class MigratedMessagesTest extends AbstractActorTest { new ServerInfo(persistenceId, true), new ServerInfo("downNode", true))); InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId)); - InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig)); + InMemoryJournal.addEntry(persistenceId, 3, new SimpleReplicatedLogEntry(0, 1, persistedServerConfig)); TestActorRef actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, persistent, snapshot -> { @@ -199,14 +246,53 @@ public class MigratedMessagesTest extends AbstractActorTest { assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()), new HashSet<>(snapshot.getServerConfiguration().getServerConfig())); - }); + }, ByteState.empty()); return actor; } + @Test + public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception { + TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting"); + + String persistenceId = factory.generateActorId("test-actor-"); + + List snapshotData = Arrays.asList(new MockPayload("1")); + final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData); + + org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot + .create(SerializationUtils.serialize((Serializable) snapshotData), + Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))), + 6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(persistenceId, true), new ServerInfo("2", false)))); + InMemorySnapshotStore.addSnapshot(persistenceId, legacy); + + doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> { + assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex()); + assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm()); + assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex()); + assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm()); + assertEquals("getState", snapshotState, snapshot.getState()); + assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(), + snapshot.getUnAppliedEntries().size()); + assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(), + snapshot.getUnAppliedEntries().get(0).getTerm()); + assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(), + snapshot.getUnAppliedEntries().get(0).getIndex()); + assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(), + snapshot.getUnAppliedEntries().get(0).getData()); + assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor()); + assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm()); + assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()), + Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig())); + }, snapshotState); + + TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending"); + } + @SuppressWarnings("checkstyle:IllegalCatch") private TestActorRef doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent, - Consumer snapshotVerifier) { + Consumer snapshotVerifier, final State snapshotState) { InMemorySnapshotStore.addSnapshotSavedLatch(id); InMemoryJournal.addDeleteMessagesCompleteLatch(id); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -214,17 +300,23 @@ public class MigratedMessagesTest extends AbstractActorTest { RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { @Override - public void createSnapshot(ActorRef actorRef) { - actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef); + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef); + } + + @Override + public void applySnapshot(State newState) { } @Override - public void applySnapshot(byte[] snapshotBytes) { + public State deserializeSnapshot(ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); } }; TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id) - .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props() + .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)) + .peerAddresses(ImmutableMap.of("peer", "")).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), id); MockRaftActor mockRaftActor = raftActorRef.underlyingActor();