*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- private final Map<String, LeaderInstallSnapshotState> mapFollowerToSnapshot = new HashMap<>();
/**
* Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
if(initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
- mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot);
snapshot = initializeFromLeader.snapshot;
trackers.addAll(initializeFromLeader.trackers);
} else {
public void removeFollower(String followerId) {
followerToLog.remove(followerId);
- mapFollowerToSnapshot.remove(followerId);
}
public void updateMinReplicaCount() {
}
}
+ @VisibleForTesting
+ boolean hasSnapshot() {
+ return snapshot.isPresent();
+ }
+
@Override
protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
String followerId = reply.getFollowerId();
- LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-
- if (followerToSnapshot == null) {
- LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
- logName(), followerId);
- return;
- }
-
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
if(followerLogInformation == null) {
// This can happen during AddServer if it times out.
LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
logName(), followerId);
- mapFollowerToSnapshot.remove(followerId);
+ return;
+ }
+
+ LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
return;
}
followerLogInformation.markFollowerActive();
- if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
boolean wasLastChunk = false;
if (reply.isSuccess()) {
- if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+ if(installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshotReply received, " +
long followerMatchIndex = snapshot.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
- mapFollowerToSnapshot.remove(followerId);
+ followerLogInformation.clearLeaderInstallSnapshotState();
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
logName(), followerId, followerLogInformation.getMatchIndex(),
followerLogInformation.getNextIndex());
- if (mapFollowerToSnapshot.isEmpty()) {
+ if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
// we can remove snapshot from the memory
setSnapshot(null);
}
+
wasLastChunk = true;
if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self");
}
} else {
- followerToSnapshot.markSendStatus(true);
+ installSnapshotState.markSendStatus(true);
}
} else {
LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
logName(), reply.getChunkIndex());
- followerToSnapshot.markSendStatus(false);
+ installSnapshotState.markSendStatus(false);
}
if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
- } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if(followerActor != null) {
sendSnapshotChunk(followerActor, followerId);
} else {
LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
logName(), reply.getChunkIndex(), followerId,
- followerToSnapshot.getChunkIndex());
+ installSnapshotState.getChunkIndex());
if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
// so that Installing the snapshot can resume from the beginning
- followerToSnapshot.reset();
+ installSnapshotState.reset();
}
}
}
+ private boolean anyFollowersInstallingSnapshot() {
+ for(FollowerLogInformation info: followerToLog.values()) {
+ if(info.getInstallSnapshotState() != null) {
+ return true;
+ }
+
+ }
+
+ return false;
+ }
+
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
boolean sendAppendEntries = false;
List<ReplicatedLogEntry> entries = Collections.emptyList();
- if (mapFollowerToSnapshot.get(followerId) != null) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+ if (installSnapshotState != null) {
// if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerId);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
- LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
- int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
- if(followerToSnapshot.isLastChunk(nextChunkIndex)) {
+ if(installSnapshotState.isLastChunk(nextChunkIndex)) {
serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
}
snapshot.get().getLastIncludedTerm(),
nextSnapshotChunk,
nextChunkIndex,
- followerToSnapshot.getTotalChunks(),
- Optional.of(followerToSnapshot.getLastChunkHashCode()),
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
serverConfig
).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
if(LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- logName(), followerActor.path(), followerToSnapshot.getChunkIndex(),
- followerToSnapshot.getTotalChunks());
+ logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
+ installSnapshotState.getTotalChunks());
}
}
} catch (IOException e) {
* creates and return a ByteString chunk
*/
private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot == null) {
- followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
+ LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
logName());
- mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+ followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState);
}
- byte[] nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = installSnapshotState.getNextChunk();
LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
return followerToLog.get(followerId);
}
- @VisibleForTesting
- protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) {
- mapFollowerToSnapshot.put(followerId, snapshot);
- }
-
- @VisibleForTesting
- public int followerSnapshotSize() {
- return mapFollowerToSnapshot.size();
- }
-
@VisibleForTesting
public int followerLogSize() {
return followerToLog.size();
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+ leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
//send first chunk and no InstallSnapshotReply received yet
fts.getNextChunk();
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+ leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
fts.incrementChunkIndex();
assertTrue(raftBehavior instanceof Leader);
- assertEquals(0, leader.followerSnapshotSize());
assertEquals(1, leader.followerLogSize());
FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
assertNotNull(fli);
+ assertNull(fli.getInstallSnapshotState());
assertEquals(commitIndex, fli.getMatchIndex());
assertEquals(commitIndex + 1, fli.getNextIndex());
+ assertFalse(leader.hasSnapshot());
}
@Test
}
@Test
- public void testFollowerToSnapshotLogic() {
- logStart("testFollowerToSnapshotLogic");
-
- MockRaftActorContext actorContext = createActorContext();
-
- actorContext.setConfigParams(new DefaultConfigParamsImpl() {
- @Override
- public int getSnapshotChunkSize() {
- return 50;
- }
- });
-
- leader = new Leader(actorContext);
+ public void testLeaderInstallSnapshotState() {
+ logStart("testLeaderInstallSnapshotState");
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
ByteString bs = toByteString(leadersSnapshot);
byte[] barray = bs.toByteArray();
- LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
- actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test");
assertEquals(bs.size(), barray.length);