- Object message = fromSerializableMessage(originalMessage);
-
- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-
- return switchBehavior(new Follower(context));
- }
- }
-
- try {
- if (message instanceof SendHeartBeat) {
- sendHeartBeat();
- return this;
-
- } else if(message instanceof InitiateInstallSnapshot) {
- installSnapshotIfNeeded();
-
- } else if(message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
- sendInstallSnapshot();
-
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply(
- (InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- }
-
- return super.handleMessage(sender, message);
- }
-
- private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
- String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- followerLogInformation.markFollowerActive();
-
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
- if (reply.isSuccess()) {
- if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
- //this was the last chunk reply
- if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
- "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1
- );
- }
-
- followerLogInformation.setMatchIndex(
- context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation.setNextIndex(
- context.getReplicatedLog().getSnapshotIndex() + 1);
- mapFollowerToSnapshot.remove(followerId);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
- }
-
- if (mapFollowerToSnapshot.isEmpty()) {
- // once there are no pending followers receiving snapshots
- // we can remove snapshot from the memory
- setSnapshot(Optional.<ByteString>absent());
- }
-
- } else {
- followerToSnapshot.markSendStatus(true);
- }
- } else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
- followerToSnapshot.markSendStatus(false);
- }
-
- } else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
- }
- }
-
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
- }
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- if (followers.size() == 0) {
- context.setCommitIndex(logIndex);
- applyLogToStateMachine(logIndex);
- } else {
- sendAppendEntries();
- }
- }
-
- private void sendAppendEntries() {
- // Send an AppendEntries to all followers
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- List<ReplicatedLogEntry> entries = null;
-
- if (mapFollowerToSnapshot.get(followerId) != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList());
- }
-
- } else {
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
- // FIXME : Sending one entry at a time
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- } else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex ) {
- // if the followers next index is not present in the leaders log, and
- // if the follower is just not starting and if leader's index is more than followers index
- // then snapshot should be sent
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
- }
- actor().tell(new InitiateInstallSnapshot(), actor());
-
- // we would want to sent AE as the capture snapshot might take time
- entries = Collections.<ReplicatedLogEntry>emptyList();
-
- } else {
- //we send an AppendEntries, even if the follower is inactive
- // in-order to update the followers timestamp, in case it becomes active again
- entries = Collections.<ReplicatedLogEntry>emptyList();
- }
-
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
- }
- }
- }
- }
-
- private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries) {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
- }
-
- /**
- * An installSnapshot is scheduled at a interval that is a multiple of
- * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
- * snapshots at every heartbeat.
- *
- * Install Snapshot works as follows
- * 1. Leader sends a InitiateInstallSnapshot message to self
- * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 5. On complete, Follower sends back a InstallSnapshotReply.
- * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
- *
- */
- private void installSnapshotIfNeeded() {
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
-
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", followerId);
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot
- sendSnapshotChunk(followerActor, followerId);
-
- } else {
- initiateCaptureSnapshot();
- //we just need 1 follower who would need snapshot to be installed.
- // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
- // who needs an install and send to all who need
- break;
- }
-
- }
- }
- }
- }
-
- // on every install snapshot, we try to capture the snapshot.
- // Once a capture is going on, another one issued will get ignored by RaftActor.
- private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
- }
-
-
- private void sendInstallSnapshot() {
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
- }
- }
- }
- }
-
- /**
- * Sends a snapshot chunk to a given follower
- * InstallSnapshot should qualify as a heartbeat too.
- */
- private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
- try {
- if (snapshot.isPresent()) {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
- ).toSerializable(),
- actor()
- );
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());