private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
- private Optional<SnapshotHolder> snapshot;
+ private Optional<SnapshotHolder> snapshot = Optional.absent();;
private int minReplicationCount;
protected AbstractLeader(RaftActorContext context, RaftState state,
if(initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
+ mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot);
+ snapshot = initializeFromLeader.snapshot;
+ trackers.addAll(initializeFromLeader.trackers);
} else {
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
updateMinReplicaCount();
- snapshot = Optional.absent();
-
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
- if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
+ if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
} else if(LOG.isTraceEnabled()) {
LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
protected void beforeSendHeartbeat(){}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
- Object message = fromSerializableMessage(originalMessage);
-
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return (nextIndex == -1 ||
+ return nextIndex == -1 ||
(!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex)));
+ && context.getReplicatedLog().isInSnapshot(nextIndex));
}
}
}
}
- return (minPresent != 0);
+ return minPresent != 0;
}
/**
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+ totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
+ (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
logName(), size, totalChunks);