X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=0dd39001136a25416c241eb8fb69085266d3752c;hp=d83362b58081c0e4c4576a848bf10ca29d8fc7da;hb=8cf40f4741c70a760dadb4300946c1dc88f95611;hpb=df4aabedd8d9a63da661845c70042102f36a14b5 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index d83362b580..0dd3900113 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -9,42 +9,14 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; -import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; -import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; -import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; -import org.opendaylight.controller.cluster.raft.messages.RaftRPC; -import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - /** * The behavior of a RaftActor when it is in the Leader state *

@@ -67,546 +39,41 @@ import java.util.concurrent.atomic.AtomicLong; * of matchIndex[i] ≥ N, and log[N].term == currentTerm: * set commitIndex = N (§5.3, §5.4). */ -public class Leader extends AbstractRaftActorBehavior { - - - protected final Map followerToLog = new HashMap<>(); - protected final Map mapFollowerToSnapshot = new HashMap<>(); - - private final Set followers; - - private Cancellable heartbeatSchedule = null; +public class Leader extends AbstractLeader { private Cancellable installSnapshotSchedule = null; - - private List trackerList = new ArrayList<>(); - - private final int minReplicationCount; - - private Optional snapshot; + private Cancellable isolatedLeaderCheckSchedule = null; public Leader(RaftActorContext context) { super(context); - followers = context.getPeerAddresses().keySet(); - - for (String followerId : followers) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - new AtomicLong(context.getCommitIndex()), - new AtomicLong(-1), - context.getConfigParams().getElectionTimeOutInterval()); - - followerToLog.put(followerId, followerLogInformation); - } - - if(LOG.isDebugEnabled()) { - LOG.debug("Election:Leader has following peers: {}", followers); - } - - if (followers.size() > 0) { - minReplicationCount = (followers.size() + 1) / 2 + 1; - } else { - minReplicationCount = 0; - } - - snapshot = Optional.absent(); - - // Immediately schedule a heartbeat - // Upon election: send initial empty AppendEntries RPCs - // (heartbeat) to each server; repeat during idle periods to - // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); - - scheduleInstallSnapshotCheck( - new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000, - context.getConfigParams().getHeartBeatInterval().unit()) - ); - - } - - private Optional getSnapshot() { - return snapshot; - } - - @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; - } - - @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, - AppendEntries appendEntries) { - - if(LOG.isDebugEnabled()) { - LOG.debug(appendEntries.toString()); - } - - return this; - } - - @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply) { - - if(! appendEntriesReply.isSuccess()) { - if(LOG.isDebugEnabled()) { - LOG.debug(appendEntriesReply.toString()); - } - } - - // Update the FollowerLogInformation - String followerId = appendEntriesReply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - if(followerLogInformation == null){ - LOG.error("Unknown follower {}", followerId); - return this; - } - - followerLogInformation.markFollowerActive(); - - if (appendEntriesReply.isSuccess()) { - followerLogInformation - .setMatchIndex(appendEntriesReply.getLogLastIndex()); - followerLogInformation - .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); - } else { - - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about - - followerLogInformation.decrNextIndex(); - } - - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). - for (long N = context.getCommitIndex() + 1; ; N++) { - int replicatedCount = 1; - - for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex().get() >= N) { - replicatedCount++; - } - } - - if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = - context.getReplicatedLog().get(N); - if (replicatedLogEntry != null - && replicatedLogEntry.getTerm() - == currentTerm()) { - context.setCommitIndex(N); - } - } else { - break; - } - } - - // Apply the change to the state machine - if (context.getCommitIndex() > context.getLastApplied()) { - applyLogToStateMachine(context.getCommitIndex()); - } - - return this; - } - - protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - - ClientRequestTracker toRemove = findClientRequestTracker(logIndex); - if(toRemove != null) { - trackerList.remove(toRemove); - } - - return toRemove; - } - - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; - } - } - - return null; - } - - @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply) { - return this; - } + scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval()); - @Override public RaftState state() { - return RaftState.Leader; + scheduleIsolatedLeaderCheck( + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10, + context.getConfigParams().getHeartBeatInterval().unit())); } @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); - Object message = fromSerializableMessage(originalMessage); - - if (message instanceof RaftRPC) { - RaftRPC rpc = (RaftRPC) message; - // If RPC request or response contains term T > currentTerm: - // set currentTerm = T, convert to follower (§5.1) - // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - - return switchBehavior(new Follower(context)); - } - } - - try { - if (message instanceof SendHeartBeat) { - sendHeartBeat(); - return this; - - } else if(message instanceof InitiateInstallSnapshot) { - installSnapshotIfNeeded(); - - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); - sendInstallSnapshot(); - - } else if (message instanceof Replicate) { - replicate((Replicate) message); - - } else if (message instanceof InstallSnapshotReply){ - handleInstallSnapshotReply( - (InstallSnapshotReply) message); - } - } finally { - scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); - } - - return super.handleMessage(sender, message); - } - - private void handleInstallSnapshotReply(InstallSnapshotReply reply) { - String followerId = reply.getFollowerId(); - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - followerLogInformation.markFollowerActive(); - - if (followerToSnapshot != null && - followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { - - if (reply.isSuccess()) { - if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { - //this was the last chunk reply - if(LOG.isDebugEnabled()) { - LOG.debug("InstallSnapshotReply received, " + - "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - reply.getChunkIndex(), followerId, - context.getReplicatedLog().getSnapshotIndex() + 1 - ); - } - - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); - mapFollowerToSnapshot.remove(followerId); - - if(LOG.isDebugEnabled()) { - LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + - followerToLog.get(followerId).getNextIndex().get()); - } - - if (mapFollowerToSnapshot.isEmpty()) { - // once there are no pending followers receiving snapshots - // we can remove snapshot from the memory - setSnapshot(Optional.absent()); - } - - } else { - followerToSnapshot.markSendStatus(true); - } - } else { - LOG.info("InstallSnapshotReply received, " + - "sending snapshot chunk failed, Will retry, Chunk:{}", - reply.getChunkIndex() - ); - followerToSnapshot.markSendStatus(false); + if (originalMessage instanceof IsolatedLeaderCheck) { + if (isLeaderIsolated()) { + LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", + minIsolatedLeaderPeerCount, leaderId); + return switchBehavior(new IsolatedLeader(context)); } - - } else { - LOG.error("ERROR!!" + - "FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - followerToSnapshot.getChunkIndex(), reply.getChunkIndex() - ); - } - } - - private void replicate(Replicate replicate) { - long logIndex = replicate.getReplicatedLogEntry().getIndex(); - - if(LOG.isDebugEnabled()) { - LOG.debug("Replicate message {}", logIndex); } - // Create a tracker entry we will use this later to notify the - // client actor - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); - - if (followers.size() == 0) { - context.setCommitIndex(logIndex); - applyLogToStateMachine(logIndex); - } else { - sendAppendEntries(); - } + return super.handleMessage(sender, originalMessage); } - private void sendAppendEntries() { - // Send an AppendEntries to all followers - for (String followerId : followers) { - ActorSelection followerActor = context.getPeerActorSelection(followerId); - - if (followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex().get(); - boolean isFollowerActive = followerLogInformation.isFollowerActive(); - List entries = null; - - if (mapFollowerToSnapshot.get(followerId) != null) { - // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { - sendSnapshotChunk(followerActor, followerId); - } else { - // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList()); - } - - } else { - long leaderLastIndex = context.getReplicatedLog().lastIndex(); - long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { - // FIXME : Sending one entry at a time - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex ) { - // if the followers next index is not present in the leaders log, and - // if the follower is just not starting and if leader's index is more than followers index - // then snapshot should be sent - - if(LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); - } - actor().tell(new InitiateInstallSnapshot(), actor()); - - // we would want to sent AE as the capture snapshot might take time - entries = Collections.emptyList(); - - } else { - //we send an AppendEntries, even if the follower is inactive - // in-order to update the followers timestamp, in case it becomes active again - entries = Collections.emptyList(); - } - - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); - - } - } - } - } - - private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, - List entries) { - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); - } - - /** - * An installSnapshot is scheduled at a interval that is a multiple of - * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing - * snapshots at every heartbeat. - * - * Install Snapshot works as follows - * 1. Leader sends a InitiateInstallSnapshot message to self - * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor - * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log - * and makes a call to Leader's handleMessage , with SendInstallSnapshot message. - * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower - * 5. On complete, Follower sends back a InstallSnapshotReply. - * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower - * and replenishes the memory by deleting the snapshot in Replicated log. - * - */ - private void installSnapshotIfNeeded() { - for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); - - if(followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - long nextIndex = followerLogInformation.getNextIndex().get(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{} follower needs a snapshot install", followerId); - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot - sendSnapshotChunk(followerActor, followerId); - - } else { - initiateCaptureSnapshot(); - //we just need 1 follower who would need snapshot to be installed. - // when we have the snapshot captured, we would again check (in SendInstallSnapshot) - // who needs an install and send to all who need - break; - } - - } - } - } - } - - // on every install snapshot, we try to capture the snapshot. - // Once a capture is going on, another one issued will get ignored by RaftActor. - private void initiateCaptureSnapshot() { - LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - - boolean isInstallSnapshotInitiated = true; - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), - lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), - actor()); - } - - - private void sendInstallSnapshot() { - for (String followerId : followers) { - ActorSelection followerActor = context.getPeerActorSelection(followerId); - - if(followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long nextIndex = followerLogInformation.getNextIndex().get(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, followerId); - } - } - } - } - - /** - * Sends a snapshot chunk to a given follower - * InstallSnapshot should qualify as a heartbeat too. - */ - private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { - try { - if (snapshot.isPresent()) { - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId,snapshot.get()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() - ).toSerializable(), - actor() - ); - LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", - followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); - } - } catch (IOException e) { - LOG.error(e, "InstallSnapshot failed for Leader."); - } - } - - /** - * Acccepts snaphot as ByteString, enters into map for future chunks - * creates and return a ByteString chunk - */ - private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); - if (followerToSnapshot == null) { - followerToSnapshot = new FollowerToSnapshot(snapshotBytes); - mapFollowerToSnapshot.put(followerId, followerToSnapshot); - } - ByteString nextChunk = followerToSnapshot.getNextChunk(); - if (LOG.isDebugEnabled()) { - LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); - } - return nextChunk; - } - - private void sendHeartBeat() { - if (followers.size() > 0) { - sendAppendEntries(); - } - } - - private void stopHeartBeat() { - if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) { - heartbeatSchedule.cancel(); - } - } - - private void stopInstallSnapshotSchedule() { + protected void stopInstallSnapshotSchedule() { if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { installSnapshotSchedule.cancel(); } } - private void scheduleHeartBeat(FiniteDuration interval) { - if(followers.size() == 0){ - // Optimization - do not bother scheduling a heartbeat as there are - // no followers - return; - } - - stopHeartBeat(); - - // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat - // message is sent to itself. - // Scheduling the heartbeat only once here because heartbeats do not - // need to be sent if there are other messages being sent to the remote - // actor. - heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( - interval, context.getActor(), new SendHeartBeat(), - context.getActorSystem().dispatcher(), context.getActor()); - } - - private void scheduleInstallSnapshotCheck(FiniteDuration interval) { + protected void scheduleInstallSnapshotCheck(FiniteDuration interval) { if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are // no followers @@ -624,122 +91,22 @@ public class Leader extends AbstractRaftActorBehavior { context.getActorSystem().dispatcher(), context.getActor()); } - - - @Override public void close() throws Exception { - stopHeartBeat(); - } - - @Override public String getLeaderId() { - return context.getId(); - } - - /** - * Encapsulates the snapshot bytestring and handles the logic of sending - * snapshot chunks - */ - protected class FollowerToSnapshot { - private ByteString snapshotBytes; - private int offset = 0; - // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset - private int replyReceivedForOffset; - // if replyStatus is false, the previous chunk is attempted - private boolean replyStatus = false; - private int chunkIndex; - private int totalChunks; - - public FollowerToSnapshot(ByteString snapshotBytes) { - this.snapshotBytes = snapshotBytes; - replyReceivedForOffset = -1; - chunkIndex = 1; - int size = snapshotBytes.size(); - totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + - ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); - if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot {} bytes, total chunks to send:{}", - size, totalChunks); - } - } - - public ByteString getSnapshotBytes() { - return snapshotBytes; - } - - public int incrementOffset() { - if(replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - offset = offset + context.getConfigParams().getSnapshotChunkSize(); - } - return offset; - } - - public int incrementChunkIndex() { - if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - chunkIndex = chunkIndex + 1; - } - return chunkIndex; - } - - public int getChunkIndex() { - return chunkIndex; - } - - public int getTotalChunks() { - return totalChunks; - } - - public boolean canSendNextChunk() { - // we only send a false if a chunk is sent but we have not received a reply yet - return replyReceivedForOffset == offset; - } - - public boolean isLastChunk(int chunkIndex) { - return totalChunks == chunkIndex; - } - - public void markSendStatus(boolean success) { - if (success) { - // if the chunk sent was successful - replyReceivedForOffset = offset; - replyStatus = true; - } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; - replyStatus = false; - } - } - - public ByteString getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); - int start = incrementOffset(); - int size = context.getConfigParams().getSnapshotChunkSize(); - if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { - size = snapshotLength; - } else { - if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("length={}, offset={},size={}", - snapshotLength, start, size); - } - return getSnapshotBytes().substring(start, start + size); - + protected void stopIsolatedLeaderCheckSchedule() { + if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) { + isolatedLeaderCheckSchedule.cancel(); } } - // called from example-actor for printing the follower-states - public String printFollowerStates() { - StringBuilder sb = new StringBuilder(); - for(FollowerLogInformation followerLogInformation : followerToLog.values()) { - boolean isFollowerActive = followerLogInformation.isFollowerActive(); - sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},"); + protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) { + isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval, + context.getActor(), new IsolatedLeaderCheck(), + context.getActorSystem().dispatcher(), context.getActor()); + } - } - return "[" + sb.toString() + "]"; + @Override public void close() throws Exception { + stopInstallSnapshotSchedule(); + stopIsolatedLeaderCheckSchedule(); + super.close(); } @VisibleForTesting