X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=a8ecf71c47943c023cdbd64f0a63462b4084ef09;hb=dac16f0d464eff3325b3800a803e81b303964e4b;hp=4e4fa2c33e2d5fff349f2fc22daa9859b6313a47;hpb=e1eca73a5ae2ffae8dd78c6fe5281cd2f45d5ef3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 4e4fa2c33e..a8ecf71c47 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -14,6 +14,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -24,15 +25,20 @@ import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; @@ -43,8 +49,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -53,6 +57,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -60,6 +65,9 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; @@ -153,10 +161,9 @@ public class LeaderTest extends AbstractLeaderTest { private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - term, index, payload); + SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); - return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); } @Test @@ -513,13 +520,14 @@ public class LeaderTest extends AbstractLeaderTest { long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; long term = actorContext.getTermInformation().getCurrentTerm(); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - term, newLogIndex, new MockRaftActorContext.MockPayload("foo")); + ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry( + newLogIndex, term, new MockRaftActorContext.MockPayload("foo")); actorContext.getReplicatedLog().append(newEntry); final Identifier id = new MockIdentifier("state-id"); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new Replicate(leaderActor, id, newEntry, true)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -532,7 +540,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActor, ApplyState.class); assertEquals("ApplyState count", newLogIndex, applyStateList.size()); - for (int i = 0; i <= newLogIndex - 1; i++ ) { + for (int i = 0; i <= newLogIndex - 1; i++) { ApplyState applyState = applyStateList.get(i); assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex()); assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm()); @@ -577,11 +585,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); //send first chunk and no InstallSnapshotReply received yet @@ -638,8 +647,8 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); actorContext.getReplicatedLog().append(entry); @@ -649,7 +658,7 @@ public class LeaderTest extends AbstractLeaderTest { // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertTrue(raftBehavior instanceof Leader); @@ -686,7 +695,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.setSnapshot(null); // new entry - ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); actorContext.getReplicatedLog().append(entry); @@ -694,20 +703,19 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -732,6 +740,9 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().removeFrom(0); + AtomicReference> installSnapshotStream = new AtomicReference<>(); + actorContext.setCreateSnapshotProcedure(installSnapshotStream::set); + leader = new Leader(actorContext); actorContext.setCurrentBehavior(leader); @@ -742,12 +753,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.setSnapshot(null); for (int i = 0; i < 4; i++) { - actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1, + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, new MockRaftActorContext.MockPayload("X" + i))); } // new entry - ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); actorContext.getReplicatedLog().append(entry); @@ -762,17 +773,35 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it should not initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get()); + assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent()); + + MessageCollectorActor.clearMessages(followerActor); + // Sending Replicate message should not initiate another capture since the first is in progress. + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. + final byte[] bytes = new byte[]{1, 2, 3}; + installSnapshotStream.get().get().write(bytes); + actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(), + Runtime.getRuntime().totalMemory()); + MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. + MessageCollectorActor.clearMessages(followerActor); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); } @@ -810,11 +839,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); @@ -861,11 +891,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(-1); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); @@ -915,11 +946,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); while (!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); @@ -983,11 +1015,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1057,12 +1089,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1122,11 +1154,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1151,7 +1183,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderInstallSnapshotState() { + public void testLeaderInstallSnapshotState() throws IOException { logStart("testLeaderInstallSnapshotState"); Map leadersSnapshot = new HashMap<>(); @@ -1163,7 +1195,7 @@ public class LeaderTest extends AbstractLeaderTest { byte[] barray = bs.toByteArray(); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test"); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(barray)); assertEquals(bs.size(), barray.length); @@ -1187,6 +1219,7 @@ public class LeaderTest extends AbstractLeaderTest { } assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); + fts.close(); } @Override @@ -2072,6 +2105,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); @@ -2102,6 +2136,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2133,6 +2168,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete();