- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return RaftState.Follower;
- }
- }
-
- try {
- if (message instanceof SendHeartBeat) {
- return sendHeartBeat();
- } else if(message instanceof SendInstallSnapshot) {
- installSnapshotIfNeeded();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply(
- (InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(HEART_BEAT_INTERVAL);
- }
-
- return super.handleMessage(sender, message);
- }
-
- private void handleInstallSnapshotReply(InstallSnapshotReply message) {
- InstallSnapshotReply reply = message;
- String followerId = reply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- followerLogInformation
- .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation
- .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
- }
-
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- context.getLogger().debug("Replicate message " + logIndex);
-
- if (followerToActor.size() == 0) {
- context.setCommitIndex(
- replicate.getReplicatedLogEntry().getIndex());
-
- context.getActor()
- .tell(new ApplyState(replicate.getClientActor(),
- replicate.getIdentifier(),
- replicate.getReplicatedLogEntry()),
- context.getActor()
- );
- } else {
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- sendAppendEntries();
- }
- }
-
- private void sendAppendEntries() {
- // Send an AppendEntries to all followers
- for (String followerId : followerToActor.keySet()) {
- ActorSelection followerActor =
- followerToActor.get(followerId);
-
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- List<ReplicatedLogEntry> entries = Collections.emptyList();
-
- if(context.getReplicatedLog().isPresent(nextIndex)){
- // TODO: Instead of sending all entries from nextIndex
- // only send a fixed number of entries to each follower
- // This is to avoid the situation where there are a lot of
- // entries to install for a fresh follower or to a follower
- // that has fallen too far behind with the log but yet is not
- // eligible to receive a snapshot
- entries =
- context.getReplicatedLog().getFrom(nextIndex);