private final SyncStatusTracker initialSyncStatusTracker;
- private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(ReplicatedLogEntry logEntry) {
- context.getReplicatedLog().captureSnapshotIfReady(logEntry);
- }
- };
+ private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
+ logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
private SnapshotTracker snapshotTracker = null;
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
+ if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
+ LOG.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " +
+ "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
+ snapshotTracker = null;
+ }
+
if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
leaderId = installSnapshot.getLeaderId();
if(snapshotTracker == null){
- snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
}
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
package org.opendaylight.controller.cluster.raft.behaviors;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.slf4j.Logger;
public class SnapshotTracker {
private final Logger LOG;
private final int totalChunks;
+ private final String leaderId;
private ByteString collectedChunks = ByteString.EMPTY;
private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
private boolean sealed = false;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- SnapshotTracker(Logger LOG, int totalChunks){
+ SnapshotTracker(Logger LOG, int totalChunks, String leaderId) {
this.LOG = LOG;
this.totalChunks = totalChunks;
+ this.leaderId = Preconditions.checkNotNull(leaderId);
}
/**
}
}
- sealed = (chunkIndex == totalChunks);
+ sealed = chunkIndex == totalChunks;
lastChunkIndex = chunkIndex;
collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk));
this.lastChunkHashCode = Arrays.hashCode(chunk);
return collectedChunks;
}
+ String getLeaderId() {
+ return leaderId;
+ }
+
public static class InvalidChunkException extends Exception {
private static final long serialVersionUID = 1L;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
- int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
int lastIncludedIndex = 1;
int chunkIndex = 1;
InstallSnapshot lastInstallSnapshot = null;
ByteString bsSnapshot = createSnapshot();
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
- int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
int lastIncludedIndex = 1;
// Check that snapshot installation is not in progress
assertNotNull(follower.getSnapshotTracker());
// Send an append entry
- AppendEntries appendEntries = mock(AppendEntries.class);
- doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
+ Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
follower.handleMessage(leaderActor, appendEntries);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
- assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
- assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
- assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+ assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+ assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+ assertNotNull(follower.getSnapshotTracker());
+ }
+
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
- // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
- verify(appendEntries, never()).getPrevLogIndex();
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+ // Check that snapshot installation is not in progress
+ assertNull(follower.getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(follower.getSnapshotTracker());
+
+ // Send appendEntries with a new term and leader.
+ AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
+ Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
+ assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
+ assertEquals("getTerm", 2, reply.getTerm());
+
+ assertNull(follower.getSnapshotTracker());
}
@Test
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
- int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
int lastIncludedIndex = 1;
int chunkIndex = 1;
InstallSnapshot lastInstallSnapshot = null;
if (chunkSize > snapshotLength) {
size = snapshotLength;
} else {
- if ((start + chunkSize) > snapshotLength) {
+ if (start + chunkSize > snapshotLength) {
size = snapshotLength - start;
}
}
@Test
public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
- SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
// Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
- SnapshotTracker tracker2 = new SnapshotTracker(logger, 2);
+ SnapshotTracker tracker2 = new SnapshotTracker(logger, 2, "leader");
tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
}
// The first chunk's index must at least be FIRST_CHUNK_INDEX
- SnapshotTracker tracker3 = new SnapshotTracker(logger, 2);
+ SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader");
try {
tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
}
// Out of sequence chunk indexes won't work
- SnapshotTracker tracker4 = new SnapshotTracker(logger, 2);
+ SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader");
tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
// No exceptions will be thrown when invalid chunk is added with the right sequence
// If the lastChunkHashCode is missing
- SnapshotTracker tracker5 = new SnapshotTracker(logger, 2);
+ SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader");
tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
// Look I can add the same chunk again
// An exception will be thrown when an invalid chunk is addedd with the right sequence
// when the lastChunkHashCode is present
- SnapshotTracker tracker6 = new SnapshotTracker(logger, 2);
+ SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader");
tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
// Trying to get a snapshot before all chunks have been received will throw an exception
- SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
try {
}
- SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
+ SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader");
tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
@Test
public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
- SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
if (size > snapshotLength) {
size = snapshotLength;
} else {
- if ((start + size) > snapshotLength) {
+ if (start + size > snapshotLength) {
size = snapshotLength - start;
}
}
}
-}
\ No newline at end of file
+}