import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collection;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
// This would be passed as the hash code of the last chunk when sending the first chunk
public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
- private final Map<String, FollowerLogInformation> followerToLog;
+ private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private Cancellable heartbeatSchedule = null;
setLeaderPayloadVersion(context.getPayloadVersion());
- final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId, -1, context);
- ftlBuilder.put(followerId, followerLogInformation);
+ followerToLog.put(followerId, followerLogInformation);
}
- followerToLog = ftlBuilder.build();
leaderId = context.getId();
return followerToLog.keySet();
}
+ public void addFollower(String followerId, FollowerState followerState) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
+ followerLogInformation.setFollowerState(followerState);
+ followerToLog.put(followerId, followerLogInformation);
+ }
+
@VisibleForTesting
void setSnapshot(@Nullable Snapshot snapshot) {
if(snapshot != null) {
mapFollowerToSnapshot.remove(followerId);
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ logName(), followerId, followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
if (mapFollowerToSnapshot.isEmpty()) {
// once there are no pending followers receiving snapshots
* then send the existing snapshot in chunks to the follower.
* @param followerId
*/
- private void initiateCaptureSnapshot(String followerId) {
+ public void initiateCaptureSnapshot(String followerId) {
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot.
LOG.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+ FollowerLogInformation followerLogInfo = e.getValue();
if (followerActor != null) {
long nextIndex = e.getValue().getNextIndex();
- if (canInstallSnapshot(nextIndex)) {
+ if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED ||
+ canInstallSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, e.getKey());
}
}