import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
public class Leader extends AbstractRaftActorBehavior {
- protected final Map<String, FollowerLogInformation> followerToLog =
- new HashMap();
+ protected final Map<String, FollowerLogInformation> followerToLog = new HashMap();
protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private final Set<String> followers;
private Cancellable heartbeatSchedule = null;
- private Cancellable appendEntriesSchedule = null;
private Cancellable installSnapshotSchedule = null;
private List<ClientRequestTracker> trackerList = new ArrayList<>();
private final int minReplicationCount;
- private final LoggingAdapter LOG;
+ private Optional<ByteString> snapshot;
public Leader(RaftActorContext context) {
super(context);
- LOG = context.getLogger();
-
- if (lastIndex() >= 0) {
- context.setCommitIndex(lastIndex());
- }
-
followers = context.getPeerAddresses().keySet();
for (String followerId : followers) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
- new AtomicLong(lastIndex()),
- new AtomicLong(-1));
+ new AtomicLong(context.getCommitIndex()),
+ new AtomicLong(-1),
+ context.getConfigParams().getElectionTimeOutInterval());
followerToLog.put(followerId, followerLogInformation);
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers:" + followers);
+ LOG.debug("Election:Leader has following peers: {}", followers);
}
if (followers.size() > 0) {
minReplicationCount = 0;
}
+ snapshot = Optional.absent();
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ private Optional<ByteString> getSnapshot() {
+ return snapshot;
+ }
+
+ @VisibleForTesting
+ void setSnapshot(Optional<ByteString> snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
LOG.debug(appendEntries.toString());
}
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
if(! appendEntriesReply.isSuccess()) {
if(followerLogInformation == null){
LOG.error("Unknown follower {}", followerId);
- return state();
+ return this;
}
+ followerLogInformation.markFollowerActive();
+
if (appendEntriesReply.isSuccess()) {
followerLogInformation
.setMatchIndex(appendEntriesReply.getLogLastIndex());
applyLogToStateMachine(context.getCommitIndex());
}
- return state();
+ return this;
}
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
return null;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Leader;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
Object message = fromSerializableMessage(originalMessage);
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return RaftState.Follower;
+
+ return switchBehavior(new Follower(context));
}
}
try {
if (message instanceof SendHeartBeat) {
- return sendHeartBeat();
- } else if(message instanceof SendInstallSnapshot) {
+ sendHeartBeat();
+ return this;
+
+ } else if(message instanceof InitiateInstallSnapshot) {
installSnapshotIfNeeded();
+
+ } else if(message instanceof SendInstallSnapshot) {
+ // received from RaftActor
+ setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ sendInstallSnapshot();
+
} else if (message instanceof Replicate) {
replicate((Replicate) message);
+
} else if (message instanceof InstallSnapshotReply){
handleInstallSnapshotReply(
(InstallSnapshotReply) message);
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot =
- mapFollowerToSnapshot.get(followerId);
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ followerLogInformation.markFollowerActive();
if (followerToSnapshot != null &&
followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
);
}
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
followerLogInformation.setMatchIndex(
context.getReplicatedLog().getSnapshotIndex());
followerLogInformation.setNextIndex(
followerToLog.get(followerId).getNextIndex().get());
}
+ if (mapFollowerToSnapshot.isEmpty()) {
+ // once there are no pending followers receiving snapshots
+ // we can remove snapshot from the memory
+ setSnapshot(Optional.<ByteString>absent());
+ }
+
} else {
followerToSnapshot.markSendStatus(true);
}
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message " + logIndex);
+ LOG.debug("Replicate message {}", logIndex);
}
// Create a tracker entry we will use this later to notify the
if (followerActor != null) {
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
long followerNextIndex = followerLogInformation.getNextIndex().get();
- List<ReplicatedLogEntry> entries = Collections.emptyList();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ List<ReplicatedLogEntry> entries = null;
if (mapFollowerToSnapshot.get(followerId) != null) {
- if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerId);
+ } else {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntriesToFollower(followerActor, followerNextIndex,
+ Collections.<ReplicatedLogEntry>emptyList());
}
} else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+ if (isFollowerActive &&
+ context.getReplicatedLog().isPresent(followerNextIndex)) {
// FIXME : Sending one entry at a time
entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
-
- } else {
- // if the followers next index is not present in the leaders log, then snapshot should be sent
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
- // if the follower is just not starting and leader's index
- // is more than followers index
- if(LOG.isDebugEnabled()) {
- LOG.debug("SendInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
- }
-
- actor().tell(new SendInstallSnapshot(), actor());
- } else {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
+ } else if (isFollowerActive && followerNextIndex >= 0 &&
+ leaderLastIndex >= followerNextIndex ) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InitiateInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
);
}
+ actor().tell(new InitiateInstallSnapshot(), actor());
+
+ // we would want to sent AE as the capture snapshot might take time
+ entries = Collections.<ReplicatedLogEntry>emptyList();
+
+ } else {
+ //we send an AppendEntries, even if the follower is inactive
+ // in-order to update the followers timestamp, in case it becomes active again
+ entries = Collections.<ReplicatedLogEntry>emptyList();
}
+
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+
}
}
}
}
+ private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
+ List<ReplicatedLogEntry> entries) {
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex()).toSerializable(),
+ actor()
+ );
+ }
+
/**
* An installSnapshot is scheduled at a interval that is a multiple of
* a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
* snapshots at every heartbeat.
+ *
+ * Install Snapshot works as follows
+ * 1. Leader sends a InitiateInstallSnapshot message to self
+ * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
+ * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+ * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
+ * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+ * 5. On complete, Follower sends back a InstallSnapshotReply.
+ * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+ * and replenishes the memory by deleting the snapshot in Replicated log.
+ *
*/
- private void installSnapshotIfNeeded(){
+ private void installSnapshotIfNeeded() {
for (String followerId : followers) {
ActorSelection followerActor =
context.getPeerActorSelection(followerId);
long nextIndex = followerLogInformation.getNextIndex().get();
+ if (!context.getReplicatedLog().isPresent(nextIndex) &&
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ LOG.info("{} follower needs a snapshot install", followerId);
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot
+ sendSnapshotChunk(followerActor, followerId);
+
+ } else {
+ initiateCaptureSnapshot();
+ //we just need 1 follower who would need snapshot to be installed.
+ // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
+ // who needs an install and send to all who need
+ break;
+ }
+
+ }
+ }
+ }
+ }
+
+ // on every install snapshot, we try to capture the snapshot.
+ // Once a capture is going on, another one issued will get ignored by RaftActor.
+ private void initiateCaptureSnapshot() {
+ LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+ lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ boolean isInstallSnapshotInitiated = true;
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+ lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+ actor());
+ }
+
+
+ private void sendInstallSnapshot() {
+ for (String followerId : followers) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+ if(followerActor != null) {
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, followerId);
*/
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,
- context.getReplicatedLog().getSnapshot()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
- ).toSerializable(),
- actor()
- );
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ if (snapshot.isPresent()) {
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ getNextSnapshotChunk(followerId,snapshot.get()),
+ mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ ).toSerializable(),
+ actor()
+ );
+ LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ }
} catch (IOException e) {
- LOG.error("InstallSnapshot failed for Leader.", e);
+ LOG.error(e, "InstallSnapshot failed for Leader.");
}
}
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
}
-
return nextChunk;
}
- private RaftState sendHeartBeat() {
+ private void sendHeartBeat() {
if (followers.size() > 0) {
sendAppendEntries();
}
- return state();
}
private void stopHeartBeat() {
// Scheduling the heartbeat only once here because heartbeats do not
// need to be sent if there are other messages being sent to the remote
// actor.
- heartbeatSchedule =
- context.getActorSystem().scheduler().scheduleOnce(
- interval,
- context.getActor(), new SendHeartBeat(),
- context.getActorSystem().dispatcher(), context.getActor());
+ heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
+ interval, context.getActor(), new SendHeartBeat(),
+ context.getActorSystem().dispatcher(), context.getActor());
}
-
private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
if(followers.size() == 0){
// Optimization - do not bother scheduling a heartbeat as there are
installSnapshotSchedule =
context.getActorSystem().scheduler().scheduleOnce(
interval,
- context.getActor(), new SendInstallSnapshot(),
+ context.getActor(), new InitiateInstallSnapshot(),
context.getActorSystem().dispatcher(), context.getActor());
}
}
}
+ // called from example-actor for printing the follower-states
+ public String printFollowerStates() {
+ StringBuilder sb = new StringBuilder();
+ for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+
+ }
+ return "[" + sb.toString() + "]";
+ }
+
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ followerToLog.get(followerId).markFollowerActive();
+ }
}