X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=9edba85865e87a0351ca14154f4665fe935f75d2;hp=fb8be8b891a315b420c6778f1691ba4712a069d7;hb=f14033146e051aca1b51c791373f6e867af340b0;hpb=c231099ee1cf768de3002e8a290befa172150b7a diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index fb8be8b891..9edba85865 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -19,10 +20,9 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; -import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -31,11 +31,13 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -64,10 +66,11 @@ import java.util.concurrent.atomic.AtomicLong; public class Leader extends AbstractRaftActorBehavior { - private final Map followerToLog = + protected final Map followerToLog = new HashMap(); + protected final Map mapFollowerToSnapshot = new HashMap<>(); - private final Map followerToActor = new HashMap<>(); + private final Set followers; private Cancellable heartbeatSchedule = null; private Cancellable appendEntriesSchedule = null; @@ -80,26 +83,23 @@ public class Leader extends AbstractRaftActorBehavior { public Leader(RaftActorContext context) { super(context); - if (lastIndex() >= 0) { - context.setCommitIndex(lastIndex()); - } + followers = context.getPeerAddresses().keySet(); - for (String followerId : context.getPeerAddresses().keySet()) { + for (String followerId : followers) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, - new AtomicLong(lastIndex()), + new AtomicLong(context.getCommitIndex()), new AtomicLong(-1)); - followerToActor.put(followerId, - context.actorSelection(context.getPeerAddress(followerId))); - followerToLog.put(followerId, followerLogInformation); } - context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet()); + if(LOG.isDebugEnabled()) { + LOG.debug("Election:Leader has following peers: {}", followers); + } - if (followerToActor.size() > 0) { - minReplicationCount = (followerToActor.size() + 1) / 2 + 1; + if (followers.size() > 0) { + minReplicationCount = (followers.size() + 1) / 2 + 1; } else { minReplicationCount = 0; } @@ -112,25 +112,41 @@ public class Leader extends AbstractRaftActorBehavior { scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); scheduleInstallSnapshotCheck( - new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, - HEART_BEAT_INTERVAL.unit()) + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000, + context.getConfigParams().getHeartBeatInterval().unit()) ); } - @Override protected RaftState handleAppendEntries(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - return state(); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntries.toString()); + } + + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { + if(! appendEntriesReply.isSuccess()) { + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntriesReply.toString()); + } + } + // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + + if(followerLogInformation == null){ + LOG.error("Unknown follower {}", followerId); + return this; + } + if (appendEntriesReply.isSuccess()) { followerLogInformation .setMatchIndex(appendEntriesReply.getLogLastIndex()); @@ -178,7 +194,17 @@ public class Leader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - return state(); + return this; + } + + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + + ClientRequestTracker toRemove = findClientRequestTracker(logIndex); + if(toRemove != null) { + trackerList.remove(toRemove); + } + + return toRemove; } protected ClientRequestTracker findClientRequestTracker(long logIndex) { @@ -191,18 +217,20 @@ public class Leader extends AbstractRaftActorBehavior { return null; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - return state(); + return this; } @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object message) { + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); + Object message = fromSerializableMessage(originalMessage); + if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: @@ -210,13 +238,15 @@ public class Leader extends AbstractRaftActorBehavior { // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return RaftState.Follower; + + return switchBehavior(new Follower(context)); } } try { if (message instanceof SendHeartBeat) { - return sendHeartBeat(); + sendHeartBeat(); + return this; } else if(message instanceof SendInstallSnapshot) { installSnapshotIfNeeded(); } else if (message instanceof Replicate) { @@ -226,120 +256,218 @@ public class Leader extends AbstractRaftActorBehavior { (InstallSnapshotReply) message); } } finally { - scheduleHeartBeat(HEART_BEAT_INTERVAL); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } return super.handleMessage(sender, message); } - private void handleInstallSnapshotReply(InstallSnapshotReply message) { - InstallSnapshotReply reply = message; + private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + FollowerToSnapshot followerToSnapshot = + mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot != null && + followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + + if (reply.isSuccess()) { + if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + //this was the last chunk reply + if(LOG.isDebugEnabled()) { + LOG.debug("InstallSnapshotReply received, " + + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", + reply.getChunkIndex(), followerId, + context.getReplicatedLog().getSnapshotIndex() + 1 + ); + } + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + followerLogInformation.setMatchIndex( + context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation.setNextIndex( + context.getReplicatedLog().getSnapshotIndex() + 1); + mapFollowerToSnapshot.remove(followerId); + + if(LOG.isDebugEnabled()) { + LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + + followerToLog.get(followerId).getNextIndex().get()); + } + + } else { + followerToSnapshot.markSendStatus(true); + } + } else { + LOG.info("InstallSnapshotReply received, " + + "sending snapshot chunk failed, Will retry, Chunk:{}", + reply.getChunkIndex() + ); + followerToSnapshot.markSendStatus(false); + } - followerLogInformation - .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation - .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } else { + LOG.error("ERROR!!" + + "FollowerId in InstallSnapshotReply not known to Leader" + + " or Chunk Index in InstallSnapshotReply not matching {} != {}", + followerToSnapshot.getChunkIndex(), reply.getChunkIndex() + ); + } } private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - context.getLogger().debug("Replicate message " + logIndex); + if(LOG.isDebugEnabled()) { + LOG.debug("Replicate message {}", logIndex); + } - if (followerToActor.size() == 0) { - context.setCommitIndex( - replicate.getReplicatedLogEntry().getIndex()); + // Create a tracker entry we will use this later to notify the + // client actor + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); - context.getActor() - .tell(new ApplyState(replicate.getClientActor(), - replicate.getIdentifier(), - replicate.getReplicatedLogEntry()), - context.getActor() - ); + if (followers.size() == 0) { + context.setCommitIndex(logIndex); + applyLogToStateMachine(logIndex); } else { - - // Create a tracker entry we will use this later to notify the - // client actor - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); - sendAppendEntries(); } } private void sendAppendEntries() { // Send an AppendEntries to all followers - for (String followerId : followerToActor.keySet()) { - ActorSelection followerActor = - followerToActor.get(followerId); + for (String followerId : followers) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + + if (followerActor != null) { + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + long followerNextIndex = followerLogInformation.getNextIndex().get(); + List entries = Collections.emptyList(); + + if (mapFollowerToSnapshot.get(followerId) != null) { + if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } + + } else { + + if (context.getReplicatedLog().isPresent(followerNextIndex)) { + // FIXME : Sending one entry at a time + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + + } else { + // if the followers next index is not present in the leaders log, then snapshot should be sent + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) { + // if the follower is just not starting and leader's index + // is more than followers index + if(LOG.isDebugEnabled()) { + LOG.debug("SendInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex + ); + } + + actor().tell(new SendInstallSnapshot(), actor()); + } else { + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } + } + } + } + } + } - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + /** + * An installSnapshot is scheduled at a interval that is a multiple of + * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing + * snapshots at every heartbeat. + */ + private void installSnapshotIfNeeded(){ + for (String followerId : followers) { + ActorSelection followerActor = + context.getPeerActorSelection(followerId); - long nextIndex = followerLogInformation.getNextIndex().get(); + if(followerActor != null) { + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); - List entries = Collections.emptyList(); + long nextIndex = followerLogInformation.getNextIndex().get(); - if(context.getReplicatedLog().isPresent(nextIndex)){ - // TODO: Instead of sending all entries from nextIndex - // only send a fixed number of entries to each follower - // This is to avoid the situation where there are a lot of - // entries to install for a fresh follower or to a follower - // that has fallen too far behind with the log but yet is not - // eligible to receive a snapshot - entries = - context.getReplicatedLog().getFrom(nextIndex); + if (!context.getReplicatedLog().isPresent(nextIndex) && + context.getReplicatedLog().isInSnapshot(nextIndex)) { + sendSnapshotChunk(followerActor, followerId); + } } + } + } + /** + * Sends a snapshot chunk to a given follower + * InstallSnapshot should qualify as a heartbeat too. + */ + private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { + try { followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(nextIndex), prevLogTerm(nextIndex), - entries, context.getCommitIndex() - ), + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + getNextSnapshotChunk(followerId, + context.getReplicatedLog().getSnapshot()), + mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks() + ).toSerializable(), actor() ); + LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", + followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks()); + } catch (IOException e) { + LOG.error(e, "InstallSnapshot failed for Leader."); } } /** - * An installSnapshot is scheduled at a interval that is a multiple of - * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing - * snapshots at every heartbeat. + * Acccepts snaphot as ByteString, enters into map for future chunks + * creates and return a ByteString chunk */ - private void installSnapshotIfNeeded(){ - for (String followerId : followerToActor.keySet()) { - ActorSelection followerActor = - followerToActor.get(followerId); - - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - long nextIndex = followerLogInformation.getNextIndex().get(); - - if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){ - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().getSnapshot() - ), - actor() - ); - } + private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + if (followerToSnapshot == null) { + followerToSnapshot = new FollowerToSnapshot(snapshotBytes); + mapFollowerToSnapshot.put(followerId, followerToSnapshot); } + ByteString nextChunk = followerToSnapshot.getNextChunk(); + if(LOG.isDebugEnabled()) { + LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + } + + return nextChunk; } - private RaftState sendHeartBeat() { - if (followerToActor.size() > 0) { + private void sendHeartBeat() { + if (followers.size() > 0) { sendAppendEntries(); } - return state(); } private void stopHeartBeat() { @@ -355,7 +483,7 @@ public class Leader extends AbstractRaftActorBehavior { } private void scheduleHeartBeat(FiniteDuration interval) { - if(followerToActor.keySet().size() == 0){ + if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are // no followers return; @@ -377,7 +505,7 @@ public class Leader extends AbstractRaftActorBehavior { private void scheduleInstallSnapshotCheck(FiniteDuration interval) { - if(followerToActor.keySet().size() == 0){ + if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are // no followers return; @@ -404,4 +532,101 @@ public class Leader extends AbstractRaftActorBehavior { return context.getId(); } + /** + * Encapsulates the snapshot bytestring and handles the logic of sending + * snapshot chunks + */ + protected class FollowerToSnapshot { + private ByteString snapshotBytes; + private int offset = 0; + // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset + private int replyReceivedForOffset; + // if replyStatus is false, the previous chunk is attempted + private boolean replyStatus = false; + private int chunkIndex; + private int totalChunks; + + public FollowerToSnapshot(ByteString snapshotBytes) { + this.snapshotBytes = snapshotBytes; + replyReceivedForOffset = -1; + chunkIndex = 1; + int size = snapshotBytes.size(); + totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot {} bytes, total chunks to send:{}", + size, totalChunks); + } + } + + public ByteString getSnapshotBytes() { + return snapshotBytes; + } + + public int incrementOffset() { + if(replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + offset = offset + context.getConfigParams().getSnapshotChunkSize(); + } + return offset; + } + + public int incrementChunkIndex() { + if (replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + chunkIndex = chunkIndex + 1; + } + return chunkIndex; + } + + public int getChunkIndex() { + return chunkIndex; + } + + public int getTotalChunks() { + return totalChunks; + } + + public boolean canSendNextChunk() { + // we only send a false if a chunk is sent but we have not received a reply yet + return replyReceivedForOffset == offset; + } + + public boolean isLastChunk(int chunkIndex) { + return totalChunks == chunkIndex; + } + + public void markSendStatus(boolean success) { + if (success) { + // if the chunk sent was successful + replyReceivedForOffset = offset; + replyStatus = true; + } else { + // if the chunk sent was failure + replyReceivedForOffset = offset; + replyStatus = false; + } + } + + public ByteString getNextChunk() { + int snapshotLength = getSnapshotBytes().size(); + int start = incrementOffset(); + int size = context.getConfigParams().getSnapshotChunkSize(); + if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { + size = snapshotLength; + } else { + if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; + } + } + + if(LOG.isDebugEnabled()) { + LOG.debug("length={}, offset={},size={}", + snapshotLength, start, size); + } + return getSnapshotBytes().substring(start, start + size); + + } + } + }