+ if (lastIndex() >= 0) {
+ context.setCommitIndex(lastIndex());
+ }
+
+ followers = context.getPeerAddresses().keySet();
+
+ for (String followerId : followers) {
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(followerId,
+ new AtomicLong(lastIndex()),
+ new AtomicLong(-1));
+
+ followerToLog.put(followerId, followerLogInformation);
+ }
+
+ context.getLogger().debug("Election:Leader has following peers:"+ followers);
+
+ if (followers.size() > 0) {
+ minReplicationCount = (followers.size() + 1) / 2 + 1;
+ } else {
+ minReplicationCount = 0;
+ }
+
+
+ // Immediately schedule a heartbeat
+ // Upon election: send initial empty AppendEntries RPCs
+ // (heartbeat) to each server; repeat during idle periods to
+ // prevent election timeouts (§5.2)
+ scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+
+ scheduleInstallSnapshotCheck(
+ new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
+ context.getConfigParams().getHeartBeatInterval().unit())
+ );
+
+ }
+
+ @Override protected RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
+
+ context.getLogger().info("Leader: Received {}", appendEntries.toString());
+
+ return state();
+ }
+
+ @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply) {
+
+ if(! appendEntriesReply.isSuccess()) {
+ context.getLogger()
+ .info("Leader: Received {}", appendEntriesReply.toString());
+ }
+
+ // Update the FollowerLogInformation
+ String followerId = appendEntriesReply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ if(followerLogInformation == null){
+ context.getLogger().error("Unknown follower {}", followerId);
+ return state();
+ }
+
+ if (appendEntriesReply.isSuccess()) {
+ followerLogInformation
+ .setMatchIndex(appendEntriesReply.getLogLastIndex());
+ followerLogInformation
+ .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+ } else {
+
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about
+
+ followerLogInformation.decrNextIndex();
+ }
+
+ // Now figure out if this reply warrants a change in the commitIndex
+ // If there exists an N such that N > commitIndex, a majority
+ // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
+ for (long N = context.getCommitIndex() + 1; ; N++) {
+ int replicatedCount = 1;
+
+ for (FollowerLogInformation info : followerToLog.values()) {
+ if (info.getMatchIndex().get() >= N) {
+ replicatedCount++;
+ }
+ }
+
+ if (replicatedCount >= minReplicationCount) {
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(N);
+ if (replicatedLogEntry != null
+ && replicatedLogEntry.getTerm()
+ == currentTerm()) {
+ context.setCommitIndex(N);
+ }
+ } else {
+ break;
+ }
+ }
+
+ // Apply the change to the state machine
+ if (context.getCommitIndex() > context.getLastApplied()) {
+ applyLogToStateMachine(context.getCommitIndex());
+ }
+
+ return state();
+ }
+
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ for (ClientRequestTracker tracker : trackerList) {
+ if (tracker.getIndex() == logIndex) {
+ return tracker;
+ }
+ }
+
+ return null;
+ }
+
+ @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply) {
+ return state();
+ }
+
+ @Override public RaftState state() {
+ return RaftState.Leader;
+ }
+
+ @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
+
+ Object message = fromSerializableMessage(originalMessage);