import akka.actor.ActorRef;
import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.io.OutputStream;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
-import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
}
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
factory.close();
InMemoryJournal.clear();
InMemorySnapshotStore.clear();
}
- @Test
- public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
- doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
- }
-
- @Test
- public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
-
- TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
- MockRaftActor mockRaftActor = actor.underlyingActor();
- String id = mockRaftActor.persistenceId();
- ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
-
- factory.killActor(actor, new JavaTestKit(getSystem()));
-
- actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config)
- .persistent(Optional.of(false)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
- mockRaftActor = actor.underlyingActor();
- mockRaftActor.waitForRecoveryComplete();
-
- assertEquals("electionTerm", 1,
- mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
- assertEquals("votedFor", id,
- mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
-
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
- }
-
- @Test
- public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
-
- String persistenceId = factory.generateActorId("test-actor-");
-
- org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
- new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
-
- InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
-
- doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
- assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
- assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
- });
-
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
- }
-
- @Test
- public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
-
- String persistenceId = factory.generateActorId("test-actor-");
-
- org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
- new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
-
- InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
-
- doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
- assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
- assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
- });
-
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
- }
-
- @Test
- public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
-
- String persistenceId = factory.generateActorId("test-actor-");
-
- InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
- InMemoryJournal.addEntry(persistenceId, 2, new SimpleReplicatedLogEntry(0, 1,
- new MockRaftActorContext.MockPayload("A")));
- InMemoryJournal.addEntry(persistenceId, 3,
- new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
-
-
- doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
- assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
- assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
- assertEquals("getLastIndex", 0, snapshot.getLastIndex());
- assertEquals("getLastTerm", 1, snapshot.getLastTerm());
- });
-
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
- }
-
@Test
public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef) {
- actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
+ actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
+ }
+
+ @Override
+ public void applySnapshot(final Snapshot.State snapshotState) {
}
@Override
- public void applySnapshot(byte[] snapshotBytes) {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) {
+ throw new UnsupportedOperationException();
}
};
TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
}
- @Test
- public void testSnapshotAfterStartupWithMigratedReplicatedLogEntry() {
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry starting");
-
- String persistenceId = factory.generateActorId("test-actor-");
-
- InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
- MockRaftActorContext.MockPayload expPayload = new MockRaftActorContext.MockPayload("A");
- InMemoryJournal.addEntry(persistenceId, 2, new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry(
- 0, 1, expPayload));
-
- doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
- assertEquals("Unapplied entries size", 1, snapshot.getUnAppliedEntries().size());
- assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
- assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
- assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
- });
-
- TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
- }
-
- private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
- String persistenceId = factory.generateActorId("test-actor-");
-
- org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
- new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
- new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
- persistenceId, true),
- new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
- "downNode", true)));
-
- ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
- new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
-
- InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
- InMemoryJournal.addEntry(persistenceId, 3, new SimpleReplicatedLogEntry(0, 1, persistedServerConfig));
-
- TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
- persistent, snapshot -> {
- assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
- assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
- assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
- new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
- });
-
- return actor;
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
- private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
- Consumer<Snapshot> snapshotVerifier) {
+ private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(final String id,
+ final boolean persistent, final Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef) {
- actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
+ actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
+ }
+
+ @Override
+ public void applySnapshot(final State newState) {
}
@Override
- public void applySnapshot(byte[] snapshotBytes) {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) {
+ throw new UnsupportedOperationException();
}
};