X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=5076a8a38f891c894b9e01fdc61db5c0d5fa11a9;hp=7c439f7d5c929910c7dd04f4504deb7dc81c589f;hb=95d3c7975a423951dcbdecfbfa4cb6b7a23591cc;hpb=d3e310b940b60f6590f0e94a576aece95a055942 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 7c439f7d5c..5076a8a38f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -73,7 +73,6 @@ import scala.concurrent.duration.FiniteDuration; */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap<>(); - private final Map mapFollowerToSnapshot = new HashMap<>(); /** * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really @@ -94,7 +93,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); - mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot); snapshot = initializeFromLeader.snapshot; trackers.addAll(initializeFromLeader.trackers); } else { @@ -143,7 +141,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public void removeFollower(String followerId) { followerToLog.remove(followerId); - mapFollowerToSnapshot.remove(followerId); } public void updateMinReplicaCount() { @@ -177,6 +174,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } + @VisibleForTesting + boolean hasSnapshot() { + return snapshot.isPresent(); + } + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -437,29 +439,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); - LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); - - if (followerToSnapshot == null) { - LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", - logName(), followerId); - return; - } - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); if(followerLogInformation == null) { // This can happen during AddServer if it times out. LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", logName(), followerId); - mapFollowerToSnapshot.remove(followerId); + return; + } + + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState == null) { + LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", + logName(), followerId); return; } followerLogInformation.markFollowerActive(); - if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) { boolean wasLastChunk = false; if (reply.isSuccess()) { - if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + if(installSnapshotState.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshotReply received, " + @@ -472,17 +472,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerMatchIndex = snapshot.get().getLastIncludedIndex(); followerLogInformation.setMatchIndex(followerMatchIndex); followerLogInformation.setNextIndex(followerMatchIndex + 1); - mapFollowerToSnapshot.remove(followerId); + followerLogInformation.clearLeaderInstallSnapshotState(); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", logName(), followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); - if (mapFollowerToSnapshot.isEmpty()) { + if (!anyFollowersInstallingSnapshot()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory setSnapshot(null); } + wasLastChunk = true; if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = @@ -491,19 +492,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); } } else { - followerToSnapshot.markSendStatus(true); + installSnapshotState.markSendStatus(true); } } else { LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", logName(), reply.getChunkIndex()); - followerToSnapshot.markSendStatus(false); + installSnapshotState.markSendStatus(false); } if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { // Since the follower is now caught up try to purge the log. purgeInMemoryLog(); - } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if(followerActor != null) { sendSnapshotChunk(followerActor, followerId); @@ -513,16 +514,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", logName(), reply.getChunkIndex(), followerId, - followerToSnapshot.getChunkIndex()); + installSnapshotState.getChunkIndex()); if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning - followerToSnapshot.reset(); + installSnapshotState.reset(); } } } + private boolean anyFollowersInstallingSnapshot() { + for(FollowerLogInformation info: followerToLog.values()) { + if(info.getInstallSnapshotState() != null) { + return true; + } + + } + + return false; + } + private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); @@ -579,9 +591,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean sendAppendEntries = false; List entries = Collections.emptyList(); - if (mapFollowerToSnapshot.get(followerId) != null) { + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState != null) { // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + if (isFollowerActive && installSnapshotState.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk @@ -724,11 +737,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. - LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState(); - int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); Optional serverConfig = Optional.absent(); - if(followerToSnapshot.isLastChunk(nextChunkIndex)) { + if(installSnapshotState.isLastChunk(nextChunkIndex)) { serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); } @@ -738,8 +751,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, nextChunkIndex, - followerToSnapshot.getTotalChunks(), - Optional.of(followerToSnapshot.getLastChunkHashCode()), + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), serverConfig ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() @@ -747,8 +760,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), - followerToSnapshot.getTotalChunks()); + logName(), followerActor.path(), installSnapshotState.getChunkIndex(), + installSnapshotState.getTotalChunks()); } } } catch (IOException e) { @@ -761,13 +774,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * creates and return a ByteString chunk */ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { - LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); - if (followerToSnapshot == null) { - followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(), + LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState(); + if (installSnapshotState == null) { + installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(), logName()); - mapFollowerToSnapshot.put(followerId, followerToSnapshot); + followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState); } - byte[] nextChunk = followerToSnapshot.getNextChunk(); + byte[] nextChunk = installSnapshotState.getNextChunk(); LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length); @@ -857,16 +870,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.get(followerId); } - @VisibleForTesting - protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) { - mapFollowerToSnapshot.put(followerId, snapshot); - } - - @VisibleForTesting - public int followerSnapshotSize() { - return mapFollowerToSnapshot.size(); - } - @VisibleForTesting public int followerLogSize() { return followerToLog.size();