X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=d83362b58081c0e4c4576a848bf10ca29d8fc7da;hp=234f9db664e4d43e833e4e354e3d3045094dd381;hb=b17a51ecb983331f0e521e40f9dd2474f268de13;hpb=bd943b7ee79b6324c561f8fbe2bea4a4293d5dd1 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 234f9db664..d83362b580 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -11,7 +11,10 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -19,6 +22,8 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -30,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,38 +70,38 @@ import java.util.concurrent.atomic.AtomicLong; public class Leader extends AbstractRaftActorBehavior { - private final Map followerToLog = - new HashMap(); + protected final Map followerToLog = new HashMap<>(); + protected final Map mapFollowerToSnapshot = new HashMap<>(); private final Set followers; private Cancellable heartbeatSchedule = null; - private Cancellable appendEntriesSchedule = null; private Cancellable installSnapshotSchedule = null; private List trackerList = new ArrayList<>(); private final int minReplicationCount; + private Optional snapshot; + public Leader(RaftActorContext context) { super(context); - if (lastIndex() >= 0) { - context.setCommitIndex(lastIndex()); - } - followers = context.getPeerAddresses().keySet(); for (String followerId : followers) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, - new AtomicLong(lastIndex()), - new AtomicLong(-1)); + new AtomicLong(context.getCommitIndex()), + new AtomicLong(-1), + context.getConfigParams().getElectionTimeOutInterval()); followerToLog.put(followerId, followerLogInformation); } - context.getLogger().debug("Election:Leader has following peers:"+ followers); + if(LOG.isDebugEnabled()) { + LOG.debug("Election:Leader has following peers: {}", followers); + } if (followers.size() > 0) { minReplicationCount = (followers.size() + 1) / 2 + 1; @@ -103,6 +109,7 @@ public class Leader extends AbstractRaftActorBehavior { minReplicationCount = 0; } + snapshot = Optional.absent(); // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs @@ -117,20 +124,32 @@ public class Leader extends AbstractRaftActorBehavior { } - @Override protected RaftState handleAppendEntries(ActorRef sender, + private Optional getSnapshot() { + return snapshot; + } + + @VisibleForTesting + void setSnapshot(Optional snapshot) { + this.snapshot = snapshot; + } + + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - context.getLogger().debug(appendEntries.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntries.toString()); + } - return state(); + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { if(! appendEntriesReply.isSuccess()) { - context.getLogger() - .debug(appendEntriesReply.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntriesReply.toString()); + } } // Update the FollowerLogInformation @@ -139,10 +158,12 @@ public class Leader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - context.getLogger().error("Unknown follower {}", followerId); - return state(); + LOG.error("Unknown follower {}", followerId); + return this; } + followerLogInformation.markFollowerActive(); + if (appendEntriesReply.isSuccess()) { followerLogInformation .setMatchIndex(appendEntriesReply.getLogLastIndex()); @@ -190,7 +211,17 @@ public class Leader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - return state(); + return this; + } + + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + + ClientRequestTracker toRemove = findClientRequestTracker(logIndex); + if(toRemove != null) { + trackerList.remove(toRemove); + } + + return toRemove; } protected ClientRequestTracker findClientRequestTracker(long logIndex) { @@ -203,16 +234,16 @@ public class Leader extends AbstractRaftActorBehavior { return null; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - return state(); + return this; } @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); Object message = fromSerializableMessage(originalMessage); @@ -224,17 +255,27 @@ public class Leader extends AbstractRaftActorBehavior { // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return RaftState.Follower; + + return switchBehavior(new Follower(context)); } } try { if (message instanceof SendHeartBeat) { - return sendHeartBeat(); - } else if(message instanceof SendInstallSnapshot) { + sendHeartBeat(); + return this; + + } else if(message instanceof InitiateInstallSnapshot) { installSnapshotIfNeeded(); + + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + sendInstallSnapshot(); + } else if (message instanceof Replicate) { replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ handleInstallSnapshotReply( (InstallSnapshotReply) message); @@ -246,22 +287,69 @@ public class Leader extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private void handleInstallSnapshotReply(InstallSnapshotReply message) { - InstallSnapshotReply reply = message; + private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + followerLogInformation.markFollowerActive(); + + if (followerToSnapshot != null && + followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + + if (reply.isSuccess()) { + if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + //this was the last chunk reply + if(LOG.isDebugEnabled()) { + LOG.debug("InstallSnapshotReply received, " + + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", + reply.getChunkIndex(), followerId, + context.getReplicatedLog().getSnapshotIndex() + 1 + ); + } + + followerLogInformation.setMatchIndex( + context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation.setNextIndex( + context.getReplicatedLog().getSnapshotIndex() + 1); + mapFollowerToSnapshot.remove(followerId); + + if(LOG.isDebugEnabled()) { + LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + + followerToLog.get(followerId).getNextIndex().get()); + } + + if (mapFollowerToSnapshot.isEmpty()) { + // once there are no pending followers receiving snapshots + // we can remove snapshot from the memory + setSnapshot(Optional.absent()); + } + + } else { + followerToSnapshot.markSendStatus(true); + } + } else { + LOG.info("InstallSnapshotReply received, " + + "sending snapshot chunk failed, Will retry, Chunk:{}", + reply.getChunkIndex() + ); + followerToSnapshot.markSendStatus(false); + } - followerLogInformation - .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation - .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } else { + LOG.error("ERROR!!" + + "FollowerId in InstallSnapshotReply not known to Leader" + + " or Chunk Index in InstallSnapshotReply not matching {} != {}", + followerToSnapshot.getChunkIndex(), reply.getChunkIndex() + ); + } } private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - context.getLogger().debug("Replicate message " + logIndex); + if(LOG.isDebugEnabled()) { + LOG.debug("Replicate message {}", logIndex); + } // Create a tracker entry we will use this later to notify the // client actor @@ -282,40 +370,92 @@ public class Leader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); + ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - long nextIndex = followerLogInformation.getNextIndex().get(); - - List entries = Collections.emptyList(); + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + long followerNextIndex = followerLogInformation.getNextIndex().get(); + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + List entries = null; + + if (mapFollowerToSnapshot.get(followerId) != null) { + // if install snapshot is in process , then sent next chunk if possible + if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } else { + // we send a heartbeat even if we have not received a reply for the last chunk + sendAppendEntriesToFollower(followerActor, followerNextIndex, + Collections.emptyList()); + } + + } else { + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + + if (isFollowerActive && + context.getReplicatedLog().isPresent(followerNextIndex)) { + // FIXME : Sending one entry at a time + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + + } else if (isFollowerActive && followerNextIndex >= 0 && + leaderLastIndex >= followerNextIndex ) { + // if the followers next index is not present in the leaders log, and + // if the follower is just not starting and if leader's index is more than followers index + // then snapshot should be sent + + if(LOG.isDebugEnabled()) { + LOG.debug("InitiateInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex + ); + } + actor().tell(new InitiateInstallSnapshot(), actor()); + + // we would want to sent AE as the capture snapshot might take time + entries = Collections.emptyList(); + + } else { + //we send an AppendEntries, even if the follower is inactive + // in-order to update the followers timestamp, in case it becomes active again + entries = Collections.emptyList(); + } + + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); - if (context.getReplicatedLog().isPresent(nextIndex)) { - // FIXME : Sending one entry at a time - entries = - context.getReplicatedLog().getFrom(nextIndex, 1); } - - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(nextIndex), - prevLogTerm(nextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); } } } + private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, + List entries) { + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } + /** * An installSnapshot is scheduled at a interval that is a multiple of * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing * snapshots at every heartbeat. + * + * Install Snapshot works as follows + * 1. Leader sends a InitiateInstallSnapshot message to self + * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor + * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log + * and makes a call to Leader's handleMessage , with SendInstallSnapshot message. + * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower + * 5. On complete, Follower sends back a InstallSnapshotReply. + * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower + * and replenishes the memory by deleting the snapshot in Replicated log. + * */ - private void installSnapshotIfNeeded(){ + private void installSnapshotIfNeeded() { for (String followerId : followers) { ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -326,26 +466,113 @@ public class Leader extends AbstractRaftActorBehavior { long nextIndex = followerLogInformation.getNextIndex().get(); - if (!context.getReplicatedLog().isPresent(nextIndex) && context - .getReplicatedLog().isInSnapshot(nextIndex)) { - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().getSnapshot() - ), - actor() - ); + if (!context.getReplicatedLog().isPresent(nextIndex) && + context.getReplicatedLog().isInSnapshot(nextIndex)) { + LOG.info("{} follower needs a snapshot install", followerId); + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot + sendSnapshotChunk(followerActor, followerId); + + } else { + initiateCaptureSnapshot(); + //we just need 1 follower who would need snapshot to be installed. + // when we have the snapshot captured, we would again check (in SendInstallSnapshot) + // who needs an install and send to all who need + break; + } + } } } } - private RaftState sendHeartBeat() { + // on every install snapshot, we try to capture the snapshot. + // Once a capture is going on, another one issued will get ignored by RaftActor. + private void initiateCaptureSnapshot() { + LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); + ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); + lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); + } + + boolean isInstallSnapshotInitiated = true; + actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), + lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), + actor()); + } + + + private void sendInstallSnapshot() { + for (String followerId : followers) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + + if(followerActor != null) { + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + long nextIndex = followerLogInformation.getNextIndex().get(); + + if (!context.getReplicatedLog().isPresent(nextIndex) && + context.getReplicatedLog().isInSnapshot(nextIndex)) { + sendSnapshotChunk(followerActor, followerId); + } + } + } + } + + /** + * Sends a snapshot chunk to a given follower + * InstallSnapshot should qualify as a heartbeat too. + */ + private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { + try { + if (snapshot.isPresent()) { + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + getNextSnapshotChunk(followerId,snapshot.get()), + mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks() + ).toSerializable(), + actor() + ); + LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", + followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks()); + } + } catch (IOException e) { + LOG.error(e, "InstallSnapshot failed for Leader."); + } + } + + /** + * Acccepts snaphot as ByteString, enters into map for future chunks + * creates and return a ByteString chunk + */ + private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + if (followerToSnapshot == null) { + followerToSnapshot = new FollowerToSnapshot(snapshotBytes); + mapFollowerToSnapshot.put(followerId, followerToSnapshot); + } + ByteString nextChunk = followerToSnapshot.getNextChunk(); + if (LOG.isDebugEnabled()) { + LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + } + return nextChunk; + } + + private void sendHeartBeat() { if (followers.size() > 0) { sendAppendEntries(); } - return state(); } private void stopHeartBeat() { @@ -374,14 +601,11 @@ public class Leader extends AbstractRaftActorBehavior { // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. - heartbeatSchedule = - context.getActorSystem().scheduler().scheduleOnce( - interval, - context.getActor(), new SendHeartBeat(), - context.getActorSystem().dispatcher(), context.getActor()); + heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( + interval, context.getActor(), new SendHeartBeat(), + context.getActorSystem().dispatcher(), context.getActor()); } - private void scheduleInstallSnapshotCheck(FiniteDuration interval) { if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are @@ -396,7 +620,7 @@ public class Leader extends AbstractRaftActorBehavior { installSnapshotSchedule = context.getActorSystem().scheduler().scheduleOnce( interval, - context.getActor(), new SendInstallSnapshot(), + context.getActor(), new InitiateInstallSnapshot(), context.getActorSystem().dispatcher(), context.getActor()); } @@ -410,4 +634,116 @@ public class Leader extends AbstractRaftActorBehavior { return context.getId(); } + /** + * Encapsulates the snapshot bytestring and handles the logic of sending + * snapshot chunks + */ + protected class FollowerToSnapshot { + private ByteString snapshotBytes; + private int offset = 0; + // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset + private int replyReceivedForOffset; + // if replyStatus is false, the previous chunk is attempted + private boolean replyStatus = false; + private int chunkIndex; + private int totalChunks; + + public FollowerToSnapshot(ByteString snapshotBytes) { + this.snapshotBytes = snapshotBytes; + replyReceivedForOffset = -1; + chunkIndex = 1; + int size = snapshotBytes.size(); + totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot {} bytes, total chunks to send:{}", + size, totalChunks); + } + } + + public ByteString getSnapshotBytes() { + return snapshotBytes; + } + + public int incrementOffset() { + if(replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + offset = offset + context.getConfigParams().getSnapshotChunkSize(); + } + return offset; + } + + public int incrementChunkIndex() { + if (replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + chunkIndex = chunkIndex + 1; + } + return chunkIndex; + } + + public int getChunkIndex() { + return chunkIndex; + } + + public int getTotalChunks() { + return totalChunks; + } + + public boolean canSendNextChunk() { + // we only send a false if a chunk is sent but we have not received a reply yet + return replyReceivedForOffset == offset; + } + + public boolean isLastChunk(int chunkIndex) { + return totalChunks == chunkIndex; + } + + public void markSendStatus(boolean success) { + if (success) { + // if the chunk sent was successful + replyReceivedForOffset = offset; + replyStatus = true; + } else { + // if the chunk sent was failure + replyReceivedForOffset = offset; + replyStatus = false; + } + } + + public ByteString getNextChunk() { + int snapshotLength = getSnapshotBytes().size(); + int start = incrementOffset(); + int size = context.getConfigParams().getSnapshotChunkSize(); + if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { + size = snapshotLength; + } else { + if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; + } + } + + if(LOG.isDebugEnabled()) { + LOG.debug("length={}, offset={},size={}", + snapshotLength, start, size); + } + return getSnapshotBytes().substring(start, start + size); + + } + } + + // called from example-actor for printing the follower-states + public String printFollowerStates() { + StringBuilder sb = new StringBuilder(); + for(FollowerLogInformation followerLogInformation : followerToLog.values()) { + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},"); + + } + return "[" + sb.toString() + "]"; + } + + @VisibleForTesting + void markFollowerActive(String followerId) { + followerToLog.get(followerId).markFollowerActive(); + } }