+ private void handleInstallSnapshotReply(InstallSnapshotReply message) {
+ InstallSnapshotReply reply = message;
+ String followerId = reply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ followerLogInformation
+ .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
+ followerLogInformation
+ .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+ }
+
+ private void replicate(Replicate replicate) {
+ long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+ context.getLogger().debug("Replicate message " + logIndex);
+
+ if (followers.size() == 0) {
+ context.setCommitIndex(
+ replicate.getReplicatedLogEntry().getIndex());
+
+ context.getActor()
+ .tell(new ApplyState(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ replicate.getReplicatedLogEntry()),
+ context.getActor()
+ );
+ } else {
+
+ // Create a tracker entry we will use this later to notify the
+ // client actor
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
+
+ sendAppendEntries();
+ }
+ }
+
+ private void sendAppendEntries() {
+ // Send an AppendEntries to all followers
+ for (String followerId : followers) {
+ ActorSelection followerActor =
+ context.getPeerActorSelection(followerId);
+
+ if (followerActor != null) {
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ if (context.getReplicatedLog().isPresent(nextIndex)) {
+ // TODO: Instead of sending all entries from nextIndex
+ // only send a fixed number of entries to each follower
+ // This is to avoid the situation where there are a lot of
+ // entries to install for a fresh follower or to a follower
+ // that has fallen too far behind with the log but yet is not
+ // eligible to receive a snapshot
+ entries =
+ context.getReplicatedLog().getFrom(nextIndex);
+ }
+
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(nextIndex),
+ prevLogTerm(nextIndex), entries,
+ context.getCommitIndex()).toSerializable(),
+ actor()
+ );
+ }
+ }
+ }
+
+ /**
+ * An installSnapshot is scheduled at a interval that is a multiple of
+ * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+ * snapshots at every heartbeat.
+ */
+ private void installSnapshotIfNeeded(){
+ for (String followerId : followers) {
+ ActorSelection followerActor =
+ context.getPeerActorSelection(followerId);
+
+ if(followerActor != null) {
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if (!context.getReplicatedLog().isPresent(nextIndex) && context
+ .getReplicatedLog().isInSnapshot(nextIndex)) {
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().getSnapshot()
+ ),
+ actor()
+ );
+ }
+ }
+ }
+ }
+
+ private RaftState sendHeartBeat() {
+ if (followers.size() > 0) {
+ sendAppendEntries();
+ }
+ return state();
+ }
+
+ private void stopHeartBeat() {
+ if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+ heartbeatSchedule.cancel();
+ }
+ }
+
+ private void stopInstallSnapshotSchedule() {
+ if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+ installSnapshotSchedule.cancel();
+ }
+ }
+