- try {
- if (message instanceof SendHeartBeat) {
- return sendHeartBeat();
- } else if(message instanceof SendInstallSnapshot) {
- installSnapshotIfNeeded();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply){
- // FIXME : Should I be checking the term here too?
- handleInstallSnapshotReply(
- (InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(HEART_BEAT_INTERVAL);
- }
-
- return super.handleMessage(sender, message);
- }
-
- private void handleInstallSnapshotReply(InstallSnapshotReply message) {
- InstallSnapshotReply reply = message;
- String followerId = reply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- followerLogInformation
- .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation
- .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
- }
-
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- context.getLogger().debug("Replicate message " + logIndex);
-
- if (followerToActor.size() == 0) {
- context.setCommitIndex(
- replicate.getReplicatedLogEntry().getIndex());
-
- context.getActor()
- .tell(new ApplyState(replicate.getClientActor(),
- replicate.getIdentifier(),
- replicate.getReplicatedLogEntry()),
- context.getActor()
- );
- } else {
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- sendAppendEntries();
- }
- }
-
- private void sendAppendEntries() {
- // Send an AppendEntries to all followers
- for (String followerId : followerToActor.keySet()) {
- ActorSelection followerActor =
- followerToActor.get(followerId);
-
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- List<ReplicatedLogEntry> entries = Collections.emptyList();
-
- if(context.getReplicatedLog().isPresent(nextIndex)){
- entries =
- context.getReplicatedLog().getFrom(nextIndex);
- }
-
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(nextIndex), prevLogTerm(nextIndex),
- entries, context.getCommitIndex()
- ),
- actor()
- );
- }
- }
-
- private void installSnapshotIfNeeded(){
- for (String followerId : followerToActor.keySet()) {
- ActorSelection followerActor =
- followerToActor.get(followerId);
-
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().getSnapshot()
- ),
- actor()
- );