X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=d83362b58081c0e4c4576a848bf10ca29d8fc7da;hp=90948ffef7d8a5e1341bb8aede6b03ccf8dae344;hb=915a86bcff78e373ae9487e19f5e24828ccc1e9b;hpb=15fa131be8b16703089a6d8508546120cf15d45d diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 90948ffef7..d83362b580 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; @@ -20,6 +22,8 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -66,39 +70,38 @@ import java.util.concurrent.atomic.AtomicLong; public class Leader extends AbstractRaftActorBehavior { - protected final Map followerToLog = - new HashMap(); + protected final Map followerToLog = new HashMap<>(); protected final Map mapFollowerToSnapshot = new HashMap<>(); private final Set followers; private Cancellable heartbeatSchedule = null; - private Cancellable appendEntriesSchedule = null; private Cancellable installSnapshotSchedule = null; private List trackerList = new ArrayList<>(); private final int minReplicationCount; + private Optional snapshot; + public Leader(RaftActorContext context) { super(context); - if (lastIndex() >= 0) { - context.setCommitIndex(lastIndex()); - } - followers = context.getPeerAddresses().keySet(); for (String followerId : followers) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, - new AtomicLong(lastIndex()), - new AtomicLong(-1)); + new AtomicLong(context.getCommitIndex()), + new AtomicLong(-1), + context.getConfigParams().getElectionTimeOutInterval()); followerToLog.put(followerId, followerLogInformation); } - context.getLogger().debug("Election:Leader has following peers:"+ followers); + if(LOG.isDebugEnabled()) { + LOG.debug("Election:Leader has following peers: {}", followers); + } if (followers.size() > 0) { minReplicationCount = (followers.size() + 1) / 2 + 1; @@ -106,6 +109,7 @@ public class Leader extends AbstractRaftActorBehavior { minReplicationCount = 0; } + snapshot = Optional.absent(); // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs @@ -120,20 +124,32 @@ public class Leader extends AbstractRaftActorBehavior { } - @Override protected RaftState handleAppendEntries(ActorRef sender, + private Optional getSnapshot() { + return snapshot; + } + + @VisibleForTesting + void setSnapshot(Optional snapshot) { + this.snapshot = snapshot; + } + + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - context.getLogger().debug(appendEntries.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntries.toString()); + } - return state(); + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { if(! appendEntriesReply.isSuccess()) { - context.getLogger() - .debug(appendEntriesReply.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntriesReply.toString()); + } } // Update the FollowerLogInformation @@ -142,10 +158,12 @@ public class Leader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - context.getLogger().error("Unknown follower {}", followerId); - return state(); + LOG.error("Unknown follower {}", followerId); + return this; } + followerLogInformation.markFollowerActive(); + if (appendEntriesReply.isSuccess()) { followerLogInformation .setMatchIndex(appendEntriesReply.getLogLastIndex()); @@ -193,7 +211,17 @@ public class Leader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - return state(); + return this; + } + + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + + ClientRequestTracker toRemove = findClientRequestTracker(logIndex); + if(toRemove != null) { + trackerList.remove(toRemove); + } + + return toRemove; } protected ClientRequestTracker findClientRequestTracker(long logIndex) { @@ -206,16 +234,16 @@ public class Leader extends AbstractRaftActorBehavior { return null; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - return state(); + return this; } @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); Object message = fromSerializableMessage(originalMessage); @@ -227,17 +255,27 @@ public class Leader extends AbstractRaftActorBehavior { // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return RaftState.Follower; + + return switchBehavior(new Follower(context)); } } try { if (message instanceof SendHeartBeat) { - return sendHeartBeat(); - } else if(message instanceof SendInstallSnapshot) { + sendHeartBeat(); + return this; + + } else if(message instanceof InitiateInstallSnapshot) { installSnapshotIfNeeded(); + + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + sendInstallSnapshot(); + } else if (message instanceof Replicate) { replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ handleInstallSnapshotReply( (InstallSnapshotReply) message); @@ -251,8 +289,9 @@ public class Leader extends AbstractRaftActorBehavior { private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); - FollowerToSnapshot followerToSnapshot = - mapFollowerToSnapshot.get(followerId); + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + followerLogInformation.markFollowerActive(); if (followerToSnapshot != null && followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { @@ -260,43 +299,57 @@ public class Leader extends AbstractRaftActorBehavior { if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply - context.getLogger().debug("InstallSnapshotReply received, " + - "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - reply.getChunkIndex(), followerId, - context.getReplicatedLog().getSnapshotIndex() + 1); + if(LOG.isDebugEnabled()) { + LOG.debug("InstallSnapshotReply received, " + + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", + reply.getChunkIndex(), followerId, + context.getReplicatedLog().getSnapshotIndex() + 1 + ); + } - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); followerLogInformation.setMatchIndex( context.getReplicatedLog().getSnapshotIndex()); followerLogInformation.setNextIndex( context.getReplicatedLog().getSnapshotIndex() + 1); mapFollowerToSnapshot.remove(followerId); - context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" + - followerToLog.get(followerId).getNextIndex().get()); + + if(LOG.isDebugEnabled()) { + LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + + followerToLog.get(followerId).getNextIndex().get()); + } + + if (mapFollowerToSnapshot.isEmpty()) { + // once there are no pending followers receiving snapshots + // we can remove snapshot from the memory + setSnapshot(Optional.absent()); + } } else { followerToSnapshot.markSendStatus(true); } } else { - context.getLogger().info("InstallSnapshotReply received, " + - "sending snapshot chunk failed, Will retry, Chunk:{}", - reply.getChunkIndex()); + LOG.info("InstallSnapshotReply received, " + + "sending snapshot chunk failed, Will retry, Chunk:{}", + reply.getChunkIndex() + ); followerToSnapshot.markSendStatus(false); } } else { - context.getLogger().error("ERROR!!" + - "FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); + LOG.error("ERROR!!" + + "FollowerId in InstallSnapshotReply not known to Leader" + + " or Chunk Index in InstallSnapshotReply not matching {} != {}", + followerToSnapshot.getChunkIndex(), reply.getChunkIndex() + ); } } private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - context.getLogger().debug("Replicate message " + logIndex); + if(LOG.isDebugEnabled()) { + LOG.debug("Replicate message {}", logIndex); + } // Create a tracker entry we will use this later to notify the // client actor @@ -322,61 +375,87 @@ public class Leader extends AbstractRaftActorBehavior { if (followerActor != null) { FollowerLogInformation followerLogInformation = followerToLog.get(followerId); long followerNextIndex = followerLogInformation.getNextIndex().get(); - List entries = Collections.emptyList(); + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + List entries = null; if (mapFollowerToSnapshot.get(followerId) != null) { - if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + // if install snapshot is in process , then sent next chunk if possible + if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); + } else { + // we send a heartbeat even if we have not received a reply for the last chunk + sendAppendEntriesToFollower(followerActor, followerNextIndex, + Collections.emptyList()); } } else { + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if (context.getReplicatedLog().isPresent(followerNextIndex)) { + if (isFollowerActive && + context.getReplicatedLog().isPresent(followerNextIndex)) { // FIXME : Sending one entry at a time entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); - - } else { - // if the followers next index is not present in the leaders log, then snapshot should be sent - long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - long leaderLastIndex = context.getReplicatedLog().lastIndex(); - if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) { - // if the follower is just not starting and leader's index - // is more than followers index - context.getLogger().debug("SendInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex); - - actor().tell(new SendInstallSnapshot(), actor()); - } else { - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() + } else if (isFollowerActive && followerNextIndex >= 0 && + leaderLastIndex >= followerNextIndex ) { + // if the followers next index is not present in the leaders log, and + // if the follower is just not starting and if leader's index is more than followers index + // then snapshot should be sent + + if(LOG.isDebugEnabled()) { + LOG.debug("InitiateInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex ); } + actor().tell(new InitiateInstallSnapshot(), actor()); + + // we would want to sent AE as the capture snapshot might take time + entries = Collections.emptyList(); + + } else { + //we send an AppendEntries, even if the follower is inactive + // in-order to update the followers timestamp, in case it becomes active again + entries = Collections.emptyList(); } + + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); + } } } } + private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, + List entries) { + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } + /** * An installSnapshot is scheduled at a interval that is a multiple of * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing * snapshots at every heartbeat. + * + * Install Snapshot works as follows + * 1. Leader sends a InitiateInstallSnapshot message to self + * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor + * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log + * and makes a call to Leader's handleMessage , with SendInstallSnapshot message. + * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower + * 5. On complete, Follower sends back a InstallSnapshotReply. + * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower + * and replenishes the memory by deleting the snapshot in Replicated log. + * */ - private void installSnapshotIfNeeded(){ + private void installSnapshotIfNeeded() { for (String followerId : followers) { ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -387,6 +466,58 @@ public class Leader extends AbstractRaftActorBehavior { long nextIndex = followerLogInformation.getNextIndex().get(); + if (!context.getReplicatedLog().isPresent(nextIndex) && + context.getReplicatedLog().isInSnapshot(nextIndex)) { + LOG.info("{} follower needs a snapshot install", followerId); + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot + sendSnapshotChunk(followerActor, followerId); + + } else { + initiateCaptureSnapshot(); + //we just need 1 follower who would need snapshot to be installed. + // when we have the snapshot captured, we would again check (in SendInstallSnapshot) + // who needs an install and send to all who need + break; + } + + } + } + } + } + + // on every install snapshot, we try to capture the snapshot. + // Once a capture is going on, another one issued will get ignored by RaftActor. + private void initiateCaptureSnapshot() { + LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); + ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); + lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); + } + + boolean isInstallSnapshotInitiated = true; + actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), + lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), + actor()); + } + + + private void sendInstallSnapshot() { + for (String followerId : followers) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + + if(followerActor != null) { + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + long nextIndex = followerLogInformation.getNextIndex().get(); + if (!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, followerId); @@ -401,22 +532,23 @@ public class Leader extends AbstractRaftActorBehavior { */ private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId, - context.getReplicatedLog().getSnapshot()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() - ).toSerializable(), - actor() - ); - context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}", - followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + if (snapshot.isPresent()) { + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + getNextSnapshotChunk(followerId,snapshot.get()), + mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks() + ).toSerializable(), + actor() + ); + LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", + followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks()); + } } catch (IOException e) { - context.getLogger().error("InstallSnapshot failed for Leader.", e); + LOG.error(e, "InstallSnapshot failed for Leader."); } } @@ -431,16 +563,16 @@ public class Leader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.put(followerId, followerToSnapshot); } ByteString nextChunk = followerToSnapshot.getNextChunk(); - context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); - + if (LOG.isDebugEnabled()) { + LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + } return nextChunk; } - private RaftState sendHeartBeat() { + private void sendHeartBeat() { if (followers.size() > 0) { sendAppendEntries(); } - return state(); } private void stopHeartBeat() { @@ -469,14 +601,11 @@ public class Leader extends AbstractRaftActorBehavior { // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. - heartbeatSchedule = - context.getActorSystem().scheduler().scheduleOnce( - interval, - context.getActor(), new SendHeartBeat(), - context.getActorSystem().dispatcher(), context.getActor()); + heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( + interval, context.getActor(), new SendHeartBeat(), + context.getActorSystem().dispatcher(), context.getActor()); } - private void scheduleInstallSnapshotCheck(FiniteDuration interval) { if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are @@ -491,7 +620,7 @@ public class Leader extends AbstractRaftActorBehavior { installSnapshotSchedule = context.getActorSystem().scheduler().scheduleOnce( interval, - context.getActor(), new SendInstallSnapshot(), + context.getActor(), new InitiateInstallSnapshot(), context.getActorSystem().dispatcher(), context.getActor()); } @@ -526,8 +655,10 @@ public class Leader extends AbstractRaftActorBehavior { int size = snapshotBytes.size(); totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); - context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}", - size, totalChunks); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot {} bytes, total chunks to send:{}", + size, totalChunks); + } } public ByteString getSnapshotBytes() { @@ -591,11 +722,28 @@ public class Leader extends AbstractRaftActorBehavior { } } - context.getLogger().debug("length={}, offset={},size={}", - snapshotLength, start, size); + if(LOG.isDebugEnabled()) { + LOG.debug("length={}, offset={},size={}", + snapshotLength, start, size); + } return getSnapshotBytes().substring(start, start + size); } } + // called from example-actor for printing the follower-states + public String printFollowerStates() { + StringBuilder sb = new StringBuilder(); + for(FollowerLogInformation followerLogInformation : followerToLog.values()) { + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},"); + + } + return "[" + sb.toString() + "]"; + } + + @VisibleForTesting + void markFollowerActive(String followerId) { + followerToLog.get(followerId).markFollowerActive(); + } }