X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=3336cbd9bec1bbc9feaac98610dd8bea85a85a6f;hp=d85ac8ef67ded21d6140bb42512ba5fa6eb2165d;hb=1a6cfbeec045f1f12ddf82b39345f00b3eeb69f8;hpb=e6d2fd575fa138ec597714ad3056b5dc4761c984 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index d85ac8ef67..3336cbd9be 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,16 +14,19 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.protobuf.ByteString; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -67,14 +70,22 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4). */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { - protected final Map followerToLog = new HashMap<>(); - protected final Map mapFollowerToSnapshot = new HashMap<>(); - protected final Set followers; + // The index of the first chunk that is sent when installing a snapshot + public static final int FIRST_CHUNK_INDEX = 1; + + // The index that the follower should respond with if it needs the install snapshot to be reset + public static final int INVALID_CHUNK_INDEX = -1; + + // This would be passed as the hash code of the last chunk when sending the first chunk + public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; + + private final Map followerToLog; + private final Map mapFollowerToSnapshot = new HashMap<>(); private Cancellable heartbeatSchedule = null; - private List trackerList = new ArrayList<>(); + private final Collection trackerList = new LinkedList<>(); protected final int minReplicationCount; @@ -82,28 +93,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; + private long replicatedToAllIndex = -1; + public AbstractLeader(RaftActorContext context) { super(context); - followers = context.getPeerAddresses().keySet(); - - for (String followerId : followers) { + final Builder ftlBuilder = ImmutableMap.builder(); + for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, - new AtomicLong(context.getCommitIndex()), - new AtomicLong(-1), + context.getCommitIndex(), -1, context.getConfigParams().getElectionTimeOutInterval()); - followerToLog.put(followerId, followerLogInformation); + ftlBuilder.put(followerId, followerLogInformation); } + followerToLog = ftlBuilder.build(); leaderId = context.getId(); - if(LOG.isDebugEnabled()) { - LOG.debug("Election:Leader has following peers: {}", followers); - } + LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds()); - minReplicationCount = getMajorityVoteCount(followers.size()); + minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); // the isolated Leader peer count will be 1 less than the majority vote count. // this is because the vote count has the self vote counted in it @@ -122,6 +132,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); } + /** + * Return an immutable collection of follower identifiers. + * + * @return Collection of follower IDs + */ + protected final Collection getFollowerIds() { + return followerToLog.keySet(); + } + private Optional getSnapshot() { return snapshot; } @@ -136,7 +155,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntries.toString()); + LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); } return this; @@ -148,7 +167,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(! appendEntriesReply.isSuccess()) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntriesReply.toString()); + LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply); } } @@ -158,7 +177,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - LOG.error("Unknown follower {}", followerId); + LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId); return this; } @@ -188,7 +207,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int replicatedCount = 1; for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex().get() >= N) { + if (info.getMatchIndex() >= N) { replicatedCount++; } } @@ -209,19 +228,40 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } + if (!context.isSnapshotCaptureInitiated()) { + purgeInMemoryLog(); + } + return this; } - protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + private void purgeInMemoryLog() { + //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + // we would delete the in-mem log from that index on, in-order to minimize mem usage + // we would also share this info thru AE with the followers so that they can delete their log entries as well. + long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + for (FollowerLogInformation info : followerToLog.values()) { + minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); + } + + replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + } - ClientRequestTracker toRemove = findClientRequestTracker(logIndex); - if(toRemove != null) { - trackerList.remove(toRemove); + @Override + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + final Iterator it = trackerList.iterator(); + while (it.hasNext()) { + final ClientRequestTracker t = it.next(); + if (t.getIndex() == logIndex) { + it.remove(); + return t; + } } - return toRemove; + return null; } + @Override protected ClientRequestTracker findClientRequestTracker(long logIndex) { for (ClientRequestTracker tracker : trackerList) { if (tracker.getIndex() == logIndex) { @@ -254,6 +294,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(), + rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); return switchBehavior(new Follower(context)); @@ -290,19 +333,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot == null) { + LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + context.getId(), followerId); + return; + } + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); followerLogInformation.markFollowerActive(); - if (followerToSnapshot != null && - followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { - + if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { - LOG.debug("InstallSnapshotReply received, " + + LOG.debug("{}: InstallSnapshotReply received, " + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - reply.getChunkIndex(), followerId, + context.getId(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1 ); } @@ -314,8 +362,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.remove(followerId); if(LOG.isDebugEnabled()) { - LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + - followerToLog.get(followerId).getNextIndex().get()); + LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" + + context.getId(), followerToLog.get(followerId).getNextIndex()); } if (mapFollowerToSnapshot.isEmpty()) { @@ -328,19 +376,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(true); } } else { - LOG.info("InstallSnapshotReply received, " + - "sending snapshot chunk failed, Will retry, Chunk:{}", - reply.getChunkIndex() - ); + LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", + context.getId(), reply.getChunkIndex()); + followerToSnapshot.markSendStatus(false); } - } else { - LOG.error("ERROR!!" + - "FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - followerToSnapshot.getChunkIndex(), reply.getChunkIndex() - ); + LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + context.getId(), reply.getChunkIndex(), followerId, + followerToSnapshot.getChunkIndex()); + + if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + // Since the Follower did not find this index to be valid we should reset the follower snapshot + // so that Installing the snapshot can resume from the beginning + followerToSnapshot.reset(); + } } } @@ -348,7 +398,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long logIndex = replicate.getReplicatedLogEntry().getIndex(); if(LOG.isDebugEnabled()) { - LOG.debug("Replicate message {}", logIndex); + LOG.debug("{}: Replicate message {}", context.getId(), logIndex); } // Create a tracker entry we will use this later to notify the @@ -359,7 +409,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logIndex) ); - if (followers.size() == 0) { + if (followerToLog.isEmpty()) { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { @@ -369,31 +419,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers - for (String followerId : followers) { + + for (Entry e : followerToLog.entrySet()) { + final String followerId = e.getKey(); ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex().get(); + long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); - List entries = null; - if (mapFollowerToSnapshot.get(followerId) != null) { + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + if (followerToSnapshot != null) { // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + if (isFollowerActive && followerToSnapshot.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); } else { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList()); + Collections.emptyList(), followerId); } } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + final List entries; + + LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + context.getId(), leaderLastIndex, leaderSnapShotIndex); + + if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { + LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(), + followerNextIndex, followerId); - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { // FIXME : Sending one entry at a time entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); @@ -404,11 +462,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // then snapshot should be sent if(LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + "follower-nextIndex: %s, leader-snapshot-index: %s, " + + "leader-last-index: %s", context.getId(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } actor().tell(new InitiateInstallSnapshot(), actor()); @@ -421,22 +478,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { entries = Collections.emptyList(); } - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); - + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); } } } } private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, - List entries) { - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); + List entries, String followerId) { + AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex(), replicatedToAllIndex); + + if(!entries.isEmpty()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, + appendEntries); + } + + followerActor.tell(appendEntries.toSerializable(), actor()); } /** @@ -456,25 +516,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * */ private void installSnapshotIfNeeded() { - for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet()); + } - if(followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + for (Entry e : followerToLog.entrySet()) { + final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); - long nextIndex = followerLogInformation.getNextIndex().get(); + if (followerActor != null) { + long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{} follower needs a snapshot install", followerId); + context.getReplicatedLog().isInSnapshot(nextIndex)) { + LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey()); if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot - sendSnapshotChunk(followerActor, followerId); + sendSnapshotChunk(followerActor, e.getKey()); - } else { + } else if (!context.isSnapshotCaptureInitiated()) { initiateCaptureSnapshot(); //we just need 1 follower who would need snapshot to be installed. // when we have the snapshot captured, we would again check (in SendInstallSnapshot) @@ -490,7 +550,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // on every install snapshot, we try to capture the snapshot. // Once a capture is going on, another one issued will get ignored by RaftActor. private void initiateCaptureSnapshot() { - LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); + LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -507,20 +567,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), actor()); + context.setSnapshotCaptureInitiated(true); } private void sendInstallSnapshot() { - for (String followerId : followers) { - ActorSelection followerActor = context.getPeerActorSelection(followerId); + for (Entry e : followerToLog.entrySet()) { + ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); - if(followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long nextIndex = followerLogInformation.getNextIndex().get(); + if (followerActor != null) { + long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, followerId); + sendSnapshotChunk(followerActor, e.getKey()); } } } @@ -533,22 +593,30 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + + // Note: the previous call to getNextSnapshotChunk has the side-effect of adding + // followerId to the followerToSnapshot map. + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId,snapshot.get()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() + nextSnapshotChunk, + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), + Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() ); - LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", - followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", + context.getId(), followerActor.path(), + followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); } } catch (IOException e) { - LOG.error(e, "InstallSnapshot failed for Leader."); + LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); } } @@ -564,13 +632,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } ByteString nextChunk = followerToSnapshot.getNextChunk(); if (LOG.isDebugEnabled()) { - LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size()); } return nextChunk; } private void sendHeartBeat() { - if (followers.size() > 0) { + if (!followerToLog.isEmpty()) { sendAppendEntries(); } } @@ -582,7 +650,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void scheduleHeartBeat(FiniteDuration interval) { - if(followers.size() == 0){ + if (followerToLog.isEmpty()) { // Optimization - do not bother scheduling a heartbeat as there are // no followers return; @@ -628,26 +696,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * snapshot chunks */ protected class FollowerToSnapshot { - private ByteString snapshotBytes; + private final ByteString snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset private int replyReceivedForOffset; // if replyStatus is false, the previous chunk is attempted private boolean replyStatus = false; private int chunkIndex; - private int totalChunks; + private final int totalChunks; + private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; public FollowerToSnapshot(ByteString snapshotBytes) { this.snapshotBytes = snapshotBytes; - replyReceivedForOffset = -1; - chunkIndex = 1; int size = snapshotBytes.size(); totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot {} bytes, total chunks to send:{}", - size, totalChunks); + LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", + context.getId(), size, totalChunks); } + replyReceivedForOffset = -1; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; } public ByteString getSnapshotBytes() { @@ -692,6 +762,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if the chunk sent was successful replyReceivedForOffset = offset; replyStatus = true; + lastChunkHashCode = nextChunkHashCode; } else { // if the chunk sent was failure replyReceivedForOffset = offset; @@ -712,27 +783,64 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - LOG.debug("length={}, offset={},size={}", + LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(), snapshotLength, start, size); } - return getSnapshotBytes().substring(start, start + size); + ByteString substring = getSnapshotBytes().substring(start, start + size); + nextChunkHashCode = substring.hashCode(); + return substring; + } + /** + * reset should be called when the Follower needs to be sent the snapshot from the beginning + */ + public void reset(){ + offset = 0; + replyStatus = false; + replyReceivedForOffset = offset; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; + lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + } + + public int getLastChunkHashCode() { + return lastChunkHashCode; } } // called from example-actor for printing the follower-states public String printFollowerStates() { - StringBuilder sb = new StringBuilder(); - for(FollowerLogInformation followerLogInformation : followerToLog.values()) { - boolean isFollowerActive = followerLogInformation.isFollowerActive(); - sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},"); + final StringBuilder sb = new StringBuilder(); + sb.append('['); + for (FollowerLogInformation followerLogInformation : followerToLog.values()) { + sb.append('{'); + sb.append(followerLogInformation.getId()); + sb.append(" state:"); + sb.append(followerLogInformation.isFollowerActive()); + sb.append("},"); } - return "[" + sb.toString() + "]"; + sb.append(']'); + + return sb.toString(); + } + + @VisibleForTesting + public FollowerLogInformation getFollower(String followerId) { + return followerToLog.get(followerId); + } + + @VisibleForTesting + protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) { + mapFollowerToSnapshot.put(followerId, snapshot); + } + + @VisibleForTesting + public int followerSnapshotSize() { + return mapFollowerToSnapshot.size(); } @VisibleForTesting - void markFollowerActive(String followerId) { - followerToLog.get(followerId).markFollowerActive(); + public int followerLogSize() { + return followerToLog.size(); } }