- Object message = fromSerializableMessage(originalMessage);
-
- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return RaftState.Follower;
- }
- }
-
- try {
- if (message instanceof SendHeartBeat) {
- return sendHeartBeat();
- } else if(message instanceof SendInstallSnapshot) {
- installSnapshotIfNeeded();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply(
- (InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- }
-
- return super.handleMessage(sender, message);
- }
-
- private void handleInstallSnapshotReply(InstallSnapshotReply message) {
- InstallSnapshotReply reply = message;
- String followerId = reply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- followerLogInformation
- .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation
- .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
- }
-
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- context.getLogger().debug("Replicate message " + logIndex);
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- if (followers.size() == 0) {
- context.setCommitIndex(logIndex);
- applyLogToStateMachine(logIndex);
- } else {
- sendAppendEntries();
- }
- }
-
- private void sendAppendEntries() {
- // Send an AppendEntries to all followers
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
-
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- List<ReplicatedLogEntry> entries = Collections.emptyList();
-
- if (context.getReplicatedLog().isPresent(nextIndex)) {
- // FIXME : Sending one entry at a time
- entries =
- context.getReplicatedLog().getFrom(nextIndex, 1);
- }
-
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(nextIndex),
- prevLogTerm(nextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
- }
- }
- }
-
- /**
- * An installSnapshot is scheduled at a interval that is a multiple of
- * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
- * snapshots at every heartbeat.
- */
- private void installSnapshotIfNeeded(){
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
-
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) && context
- .getReplicatedLog().isInSnapshot(nextIndex)) {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().getSnapshot()
- ),
- actor()
- );
- }