X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=15fca0cb8d80696393e30ea465d5263ba339fa91;hb=ddd479df27cfc49f353ceb66cd289694e891761a;hp=8cb914c2267b51375ef819ce26f9ff0818882347;hpb=913ae866cd0cc82991e1f66ac80f6a42b0daaa48;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 8cb914c226..15fca0cb8d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -29,8 +29,10 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -44,12 +46,11 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActor; import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.cluster.raft.RaftActorTest; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -63,9 +64,12 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -108,7 +112,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef) { MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); - context.setPayloadVersion(payloadVersion ); + context.setPayloadVersion(payloadVersion); return context; } @@ -809,7 +813,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - ByteString bsSnapshot = createSnapshot(); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -838,7 +842,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastAppliedIndex()); assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); - Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getState type", ByteState.class, snapshot.getState().getClass()); + Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes()); assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); applySnapshot.getCallback().onSuccess(); @@ -1126,6 +1131,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { InMemorySnapshotStore.addSnapshotSavedLatch(id); InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); List entries = Arrays.asList( newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two")); @@ -1140,6 +1146,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. // This is OK - on recovery it will be a no-op since index 1 has already been applied. @@ -1154,7 +1161,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex()); assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()), - MockRaftActor.toObject(snapshot.getState())); + MockRaftActor.fromState(snapshot.getState())); } @Test @@ -1179,6 +1186,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { InMemorySnapshotStore.addSnapshotSavedLatch(id); InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); List entries = Arrays.asList( newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), @@ -1194,6 +1202,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. // This is OK - on recovery it will be a no-op since index 2 has already been applied. @@ -1208,7 +1217,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), - entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState())); + entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState())); assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex()); @@ -1252,6 +1261,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { InMemorySnapshotStore.addSnapshotSavedLatch(id); InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); List entries = Arrays.asList( newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), @@ -1267,6 +1277,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. // This is OK - on recovery it will be a no-op since index 0 has already been applied. @@ -1283,24 +1294,29 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()), - MockRaftActor.toObject(snapshot.getState())); + MockRaftActor.fromState(snapshot.getState())); } @SuppressWarnings("checkstyle:IllegalCatch") private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference followerRaftActor) { RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { @Override - public void createSnapshot(ActorRef actorRef) { + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { try { - actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject( - followerRaftActor.get().getState()).toByteArray()), actorRef); + actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()), + installSnapshotStream), actorRef); } catch (Exception e) { Throwables.propagate(e); } } @Override - public void applySnapshot(byte[] snapshotBytes) { + public void applySnapshot(State snapshotState) { + } + + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); } }; return snapshotCohort;