private Optional<SnapshotHolder> snapshot;
- public AbstractLeader(RaftActorContext context) {
- super(context, RaftState.Leader);
-
- setLeaderPayloadVersion(context.getPayloadVersion());
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ super(context, state);
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
followerToLog.put(peerInfo.getId(), followerLogInformation);
}
- leaderId = context.getId();
-
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- return this;
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
-
} else if (message instanceof Replicate) {
replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
+ } else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+ } else {
+ return super.handleMessage(sender, message);
}
-
- return super.handleMessage(sender, message);
+ return this;
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
}
@Override
- public void close() throws Exception {
+ public void close() {
stopHeartBeat();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() {
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {