import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
leaderActor, ApplyState.class);
assertEquals("ApplyState count", newLogIndex, applyStateList.size());
- for (int i = 0; i <= newLogIndex - 1; i++ ) {
+ for (int i = 0; i <= newLogIndex - 1; i++) {
ApplyState applyState = applyStateList.get(i);
assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
//send first chunk and no InstallSnapshotReply received yet
actorContext.getReplicatedLog().removeFrom(0);
+ AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+ actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
+
leader = new Leader(actorContext);
actorContext.setCurrentBehavior(leader);
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
-
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
- // if an initiate is started again when first is in progress, it should not initiate Capture
+ assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+ assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Sending Replicate message should not initiate another capture since the first is in progress.
leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+ // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+ final byte[] bytes = new byte[]{1, 2, 3};
+ installSnapshotStream.get().get().write(bytes);
+ actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+ Runtime.getRuntime().totalMemory());
+ MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+ MessageCollectorActor.clearMessages(followerActor);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
}
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while (!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
}
@Test
- public void testLeaderInstallSnapshotState() {
+ public void testLeaderInstallSnapshotState() throws IOException {
logStart("testLeaderInstallSnapshotState");
Map<String, String> leadersSnapshot = new HashMap<>();
byte[] barray = bs.toByteArray();
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(barray));
assertEquals(bs.size(), barray.length);
}
assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+ fts.close();
}
@Override
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();