- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- context.getLogger().debug("Replicate message " + logIndex);
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- if (followers.size() == 0) {
- context.setCommitIndex(logIndex);
- applyLogToStateMachine(logIndex);
- } else {
- sendAppendEntries();
- }
- }
-
- private void sendAppendEntries() {
- // Send an AppendEntries to all followers
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
- List<ReplicatedLogEntry> entries = Collections.emptyList();
-
- if (mapFollowerToSnapshot.get(followerId) != null) {
- if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- }
-
- } else {
-
- if (context.getReplicatedLog().isPresent(followerNextIndex)) {
- // FIXME : Sending one entry at a time
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
-
- } else {
- // if the followers next index is not present in the leaders log, then snapshot should be sent
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
- // if the follower is just not starting and leader's index
- // is more than followers index
- context.getLogger().debug("SendInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
-
- actor().tell(new SendInstallSnapshot(), actor());
- } else {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
- }
- }
- }
- }