- private void handleInstallSnapshotReply(InstallSnapshotReply message) {
- InstallSnapshotReply reply = message;
- String followerId = reply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- followerLogInformation
- .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation
- .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
- }
-
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- context.getLogger().debug("Replicate message " + logIndex);
-
- if (followerToActor.size() == 0) {
- context.setCommitIndex(
- replicate.getReplicatedLogEntry().getIndex());
-
- context.getActor()
- .tell(new ApplyState(replicate.getClientActor(),
- replicate.getIdentifier(),
- replicate.getReplicatedLogEntry()),
- context.getActor()
- );
- } else {
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- sendAppendEntries();
- }
- }
-
- private void sendAppendEntries() {
- // Send an AppendEntries to all followers
- for (String followerId : followerToActor.keySet()) {
- ActorSelection followerActor =
- followerToActor.get(followerId);
-
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- List<ReplicatedLogEntry> entries = Collections.emptyList();
-
- if(context.getReplicatedLog().isPresent(nextIndex)){
- // TODO: Instead of sending all entries from nextIndex
- // only send a fixed number of entries to each follower
- // This is to avoid the situation where there are a lot of
- // entries to install for a fresh follower or to a follower
- // that has fallen too far behind with the log but yet is not
- // eligible to receive a snapshot
- entries =
- context.getReplicatedLog().getFrom(nextIndex);
- }
-
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(nextIndex), prevLogTerm(nextIndex),
- entries, context.getCommitIndex()
- ),
- actor()
- );
- }
- }
-
- /**
- * An installSnapshot is scheduled at a interval that is a multiple of
- * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
- * snapshots at every heartbeat.
- */
- private void installSnapshotIfNeeded(){
- for (String followerId : followerToActor.keySet()) {
- ActorSelection followerActor =
- followerToActor.get(followerId);
-
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().getSnapshot()
- ),
- actor()
- );
- }
- }
- }
-
- private RaftState sendHeartBeat() {
- if (followerToActor.size() > 0) {
- sendAppendEntries();
- }
- return state();
- }
-
- private void stopHeartBeat() {
- if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
- heartbeatSchedule.cancel();
- }
- }
-
- private void stopInstallSnapshotSchedule() {