import org.opendaylight.controller.cluster.example.messages.PrintState;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
} else if (message instanceof PrintRole) {
if(LOG.isDebugEnabled()) {
- LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ String followers = "";
+ if (getRaftState() == RaftState.Leader) {
+ followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+ LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+ } else {
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ }
+
+
}
} else {
public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
@Override
public long getSnapshotBatchCount() {
- return 50;
+ return 25;
}
@Override
*/
package org.opendaylight.controller.cluster.raft;
-import com.google.protobuf.ByteString;
-
import java.util.ArrayList;
import java.util.List;
// We define this as ArrayList so we can use ensureCapacity.
protected ArrayList<ReplicatedLogEntry> journal;
- protected ByteString snapshot;
+
protected long snapshotIndex = -1;
protected long snapshotTerm = -1;
// to be used for rollback during save snapshot failure
protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
- protected ByteString previousSnapshot;
protected long previousSnapshotIndex = -1;
protected long previousSnapshotTerm = -1;
- public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
+ public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
- this.snapshot = state;
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
this.journal = new ArrayList<>(unAppliedEntries);
}
-
public AbstractReplicatedLogImpl() {
- this.snapshot = null;
this.journal = new ArrayList<>();
}
return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
}
- @Override
- public ByteString getSnapshot() {
- return snapshot;
- }
-
@Override
public long getSnapshotIndex() {
return snapshotIndex;
this.snapshotTerm = snapshotTerm;
}
- @Override
- public void setSnapshot(ByteString snapshot) {
- this.snapshot = snapshot;
- }
-
@Override
public void clear(int startIndex, int endIndex) {
journal.subList(startIndex, endIndex).clear();
}
@Override
- public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+ public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
snapshottedJournal = new ArrayList<>(journal.size());
snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
previousSnapshotTerm = snapshotTerm;
setSnapshotTerm(snapshotCapturedTerm);
-
- previousSnapshot = getSnapshot();
- setSnapshot(snapshot);
}
@Override
snapshottedJournal = null;
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
- previousSnapshot = null;
}
@Override
snapshotTerm = previousSnapshotTerm;
previousSnapshotTerm = -1;
-
- snapshot = previousSnapshot;
- previousSnapshot = null;
-
}
}
*/
public AtomicLong getMatchIndex();
+ /**
+ * Checks if the follower is active by comparing the last updated with the duration
+ * @return boolean
+ */
+ public boolean isFollowerActive();
+
+ /**
+ * restarts the timeout clock of the follower
+ */
+ public void markFollowerActive();
+
}
package org.opendaylight.controller.cluster.raft;
+import com.google.common.base.Stopwatch;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class FollowerLogInformationImpl implements FollowerLogInformation{
private final AtomicLong matchIndex;
+ private final Stopwatch stopwatch;
+
+ private final long followerTimeoutMillis;
+
public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
- AtomicLong matchIndex) {
+ AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
this.id = id;
this.nextIndex = nextIndex;
this.matchIndex = matchIndex;
+ this.stopwatch = new Stopwatch();
+ this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
}
public long incrNextIndex(){
return matchIndex;
}
+ @Override
+ public boolean isFollowerActive() {
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+ }
+
+ @Override
+ public void markFollowerActive() {
+ if (stopwatch.isRunning()) {
+ stopwatch.reset();
+ }
+ stopwatch.start();
+ }
}
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
}
public java.util.Set<String> getPeers() {
+
return context.getPeerAddresses().keySet();
}
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
- context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+ context.getReplicatedLog().snapshotPreCommit(
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
"and term:{}", captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
+ if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
+ // this would be call straight to the leader and won't initiate in serialization
+ currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+ }
+
captureSnapshot = null;
hasSnapshotCaptureInitiated = false;
}
-
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
public ReplicatedLogImpl(Snapshot snapshot) {
- super(ByteString.copyFrom(snapshot.getState()),
- snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+ super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
snapshot.getUnAppliedEntries());
}
}
}
+ @VisibleForTesting
+ void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
+ currentBehavior = behavior;
+ }
+
+ protected RaftActorBehavior getCurrentBehavior() {
+ return currentBehavior;
+ }
}
package org.opendaylight.controller.cluster.raft;
-import com.google.protobuf.ByteString;
-
import java.util.List;
/**
*/
boolean isInSnapshot(long index);
- /**
- * Get the snapshot
- *
- * @return an object representing the snapshot if it exists. null otherwise
- */
- ByteString getSnapshot();
-
/**
* Get the index of the snapshot
*
*/
public void setSnapshotTerm(long snapshotTerm);
- /**
- * sets the snapshot in bytes
- * @param snapshot
- */
- public void setSnapshot(ByteString snapshot);
-
/**
* Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
* @param startIndex
/**
* Handles all the bookkeeping in order to perform a rollback in the
* event of SaveSnapshotFailure
- * @param snapshot
* @param snapshotCapturedIndex
* @param snapshotCapturedTerm
*/
- public void snapshotPreCommit(ByteString snapshot,
- long snapshotCapturedIndex, long snapshotCapturedTerm);
+ public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
/**
* Sets the Replicated log to state after snapshot success.
private long lastAppliedTerm;
private long lastIndex;
private long lastTerm;
+ private boolean installSnapshotInitiated;
public CaptureSnapshot(long lastIndex, long lastTerm,
long lastAppliedIndex, long lastAppliedTerm) {
+ this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false);
+ }
+
+ public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
+ long lastAppliedTerm, boolean installSnapshotInitiated) {
this.lastIndex = lastIndex;
this.lastTerm = lastTerm;
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
+ this.installSnapshotInitiated = installSnapshotInitiated;
}
public long getLastAppliedIndex() {
public long getLastTerm() {
return lastTerm;
}
+
+ public boolean isInstallSnapshotInitiated() {
+ return installSnapshotInitiated;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Internal message by Leader to initiate an install snapshot
+ */
+public class InitiateInstallSnapshot {
+}
+
package org.opendaylight.controller.cluster.raft.base.messages;
-import java.io.Serializable;
+import com.google.protobuf.ByteString;
-public class SendInstallSnapshot implements Serializable {
+public class SendInstallSnapshot {
+ private ByteString snapshot;
+
+ public SendInstallSnapshot(ByteString snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ public ByteString getSnapshot() {
+ return snapshot;
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
public class Leader extends AbstractRaftActorBehavior {
- protected final Map<String, FollowerLogInformation> followerToLog =
- new HashMap();
+ protected final Map<String, FollowerLogInformation> followerToLog = new HashMap();
protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private final Set<String> followers;
private final int minReplicationCount;
+ private Optional<ByteString> snapshot;
+
public Leader(RaftActorContext context) {
super(context);
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
new AtomicLong(context.getCommitIndex()),
- new AtomicLong(-1));
+ new AtomicLong(-1),
+ context.getConfigParams().getElectionTimeOutInterval());
followerToLog.put(followerId, followerLogInformation);
}
minReplicationCount = 0;
}
+ snapshot = Optional.absent();
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
}
+ private Optional<ByteString> getSnapshot() {
+ return snapshot;
+ }
+
+ @VisibleForTesting
+ void setSnapshot(Optional<ByteString> snapshot) {
+ this.snapshot = snapshot;
+ }
+
@Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
return this;
}
+ followerLogInformation.markFollowerActive();
+
if (appendEntriesReply.isSuccess()) {
followerLogInformation
.setMatchIndex(appendEntriesReply.getLogLastIndex());
if (message instanceof SendHeartBeat) {
sendHeartBeat();
return this;
- } else if(message instanceof SendInstallSnapshot) {
+
+ } else if(message instanceof InitiateInstallSnapshot) {
installSnapshotIfNeeded();
+
+ } else if(message instanceof SendInstallSnapshot) {
+ // received from RaftActor
+ setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ sendInstallSnapshot();
+
} else if (message instanceof Replicate) {
replicate((Replicate) message);
+
} else if (message instanceof InstallSnapshotReply){
handleInstallSnapshotReply(
(InstallSnapshotReply) message);
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot =
- mapFollowerToSnapshot.get(followerId);
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ followerLogInformation.markFollowerActive();
if (followerToSnapshot != null &&
followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
);
}
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
followerLogInformation.setMatchIndex(
context.getReplicatedLog().getSnapshotIndex());
followerLogInformation.setNextIndex(
followerToLog.get(followerId).getNextIndex().get());
}
+ if (mapFollowerToSnapshot.isEmpty()) {
+ // once there are no pending followers receiving snapshots
+ // we can remove snapshot from the memory
+ setSnapshot(Optional.<ByteString>absent());
+ }
+
} else {
followerToSnapshot.markSendStatus(true);
}
if (followerActor != null) {
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
long followerNextIndex = followerLogInformation.getNextIndex().get();
- List<ReplicatedLogEntry> entries = Collections.emptyList();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ List<ReplicatedLogEntry> entries = null;
if (mapFollowerToSnapshot.get(followerId) != null) {
- if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerId);
+ } else {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntriesToFollower(followerActor, followerNextIndex,
+ Collections.<ReplicatedLogEntry>emptyList());
}
} else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+ if (isFollowerActive &&
+ context.getReplicatedLog().isPresent(followerNextIndex)) {
// FIXME : Sending one entry at a time
entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
-
- } else {
- // if the followers next index is not present in the leaders log, then snapshot should be sent
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
- // if the follower is just not starting and leader's index
- // is more than followers index
- if(LOG.isDebugEnabled()) {
- LOG.debug("SendInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
- }
-
- actor().tell(new SendInstallSnapshot(), actor());
- } else {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
+ } else if (isFollowerActive && followerNextIndex >= 0 &&
+ leaderLastIndex >= followerNextIndex ) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InitiateInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
);
}
+ actor().tell(new InitiateInstallSnapshot(), actor());
+
+ // we would want to sent AE as the capture snapshot might take time
+ entries = Collections.<ReplicatedLogEntry>emptyList();
+
+ } else {
+ //we send an AppendEntries, even if the follower is inactive
+ // in-order to update the followers timestamp, in case it becomes active again
+ entries = Collections.<ReplicatedLogEntry>emptyList();
}
+
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+
}
}
}
}
+ private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
+ List<ReplicatedLogEntry> entries) {
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex()).toSerializable(),
+ actor()
+ );
+ }
+
/**
* An installSnapshot is scheduled at a interval that is a multiple of
* a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
* snapshots at every heartbeat.
+ *
+ * Install Snapshot works as follows
+ * 1. Leader sends a InitiateInstallSnapshot message to self
+ * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
+ * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+ * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
+ * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+ * 5. On complete, Follower sends back a InstallSnapshotReply.
+ * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+ * and replenishes the memory by deleting the snapshot in Replicated log.
+ *
*/
- private void installSnapshotIfNeeded(){
+ private void installSnapshotIfNeeded() {
for (String followerId : followers) {
ActorSelection followerActor =
context.getPeerActorSelection(followerId);
long nextIndex = followerLogInformation.getNextIndex().get();
+ if (!context.getReplicatedLog().isPresent(nextIndex) &&
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ LOG.info("{} follower needs a snapshot install", followerId);
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot
+ sendSnapshotChunk(followerActor, followerId);
+
+ } else {
+ initiateCaptureSnapshot();
+ //we just need 1 follower who would need snapshot to be installed.
+ // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
+ // who needs an install and send to all who need
+ break;
+ }
+
+ }
+ }
+ }
+ }
+
+ // on every install snapshot, we try to capture the snapshot.
+ // Once a capture is going on, another one issued will get ignored by RaftActor.
+ private void initiateCaptureSnapshot() {
+ LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+ lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ boolean isInstallSnapshotInitiated = true;
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+ lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+ actor());
+ }
+
+
+ private void sendInstallSnapshot() {
+ for (String followerId : followers) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+ if(followerActor != null) {
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, followerId);
*/
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,
- context.getReplicatedLog().getSnapshot()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
- ).toSerializable(),
- actor()
- );
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ if (snapshot.isPresent()) {
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ getNextSnapshotChunk(followerId,snapshot.get()),
+ mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ ).toSerializable(),
+ actor()
+ );
+ LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ }
} catch (IOException e) {
LOG.error(e, "InstallSnapshot failed for Leader.");
}
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
}
-
return nextChunk;
}
// Scheduling the heartbeat only once here because heartbeats do not
// need to be sent if there are other messages being sent to the remote
// actor.
- heartbeatSchedule =
- context.getActorSystem().scheduler().scheduleOnce(
- interval,
- context.getActor(), new SendHeartBeat(),
- context.getActorSystem().dispatcher(), context.getActor());
+ heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
+ interval, context.getActor(), new SendHeartBeat(),
+ context.getActorSystem().dispatcher(), context.getActor());
}
-
private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
if(followers.size() == 0){
// Optimization - do not bother scheduling a heartbeat as there are
installSnapshotSchedule =
context.getActorSystem().scheduler().scheduleOnce(
interval,
- context.getActor(), new SendInstallSnapshot(),
+ context.getActor(), new InitiateInstallSnapshot(),
context.getActorSystem().dispatcher(), context.getActor());
}
}
}
+ // called from example-actor for printing the follower-states
+ public String printFollowerStates() {
+ StringBuilder sb = new StringBuilder();
+ for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+
+ }
+ return "[" + sb.toString() + "]";
+ }
+
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ followerToLog.get(followerId).markFollowerActive();
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FollowerLogInformationImplTest {
+
+ @Test
+ public void testIsFollowerActive() {
+
+ FiniteDuration timeoutDuration =
+ new FiniteDuration(500, TimeUnit.MILLISECONDS);
+
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(
+ "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+
+
+
+ assertFalse("Follower should be termed inactive before stopwatch starts",
+ followerLogInformation.isFollowerActive());
+
+ followerLogInformation.markFollowerActive();
+ if (sleepWithElaspsedTimeReturned(200) > 200) {
+ return;
+ }
+ assertTrue("Follower should be active", followerLogInformation.isFollowerActive());
+
+ if (sleepWithElaspsedTimeReturned(400) > 400) {
+ return;
+ }
+ assertFalse("Follower should be inactive after time lapsed",
+ followerLogInformation.isFollowerActive());
+
+ followerLogInformation.markFollowerActive();
+ assertTrue("Follower should be active from inactive",
+ followerLogInformation.isFollowerActive());
+ }
+
+ // we cannot rely comfortably that the sleep will indeed sleep for the desired time
+ // hence getting the actual elapsed time and do a match.
+ // if the sleep has spilled over, then return the test gracefully
+ private long sleepWithElaspsedTimeReturned(long millis) {
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.start();
+ Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS);
+ stopwatch.stop();
+ return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ }
+}
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
-
mockActorRef.tell(PoisonPill.getInstance(), getRef());
}
DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
- TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
- Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId,Collections.EMPTY_MAP,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
+ RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+
+ mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
+
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
verify(dataPersistenceProvider).saveSnapshot(anyObject());
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
+ RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+ mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
+
mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
verify(mockRaftActor.delegate).createSnapshot();
verify(dataPersistenceProvider).deleteMessages(100);
- assertNotNull("Snapshot should not be null", mockRaftActor.getReplicatedLog().getSnapshot());
-
assertEquals(2, mockRaftActor.getReplicatedLog().size());
assertNotNull(mockRaftActor.getReplicatedLog().get(3));
}
};
-
-
}
@Test
oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
- oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,2,mock(Payload.class)));
+ oldReplicatedLog.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+ mock(Payload.class)));
ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
Snapshot snapshot = mock(Snapshot.class);
verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
- assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog());
+ assertTrue("The replicatedLog should have changed",
+ oldReplicatedLog != mockRaftActor.getReplicatedLog());
- assertEquals("lastApplied should be same as in the snapshot", (Long) 3L, mockRaftActor.getLastApplied());
+ assertEquals("lastApplied should be same as in the snapshot",
+ (Long) 3L, mockRaftActor.getLastApplied());
assertEquals(0, mockRaftActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
+ RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+
+ mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
+
mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
mockRaftActor.getReplicatedLog().getSnapshotIndex());
- assertNull("Snapshot should be null", mockRaftActor.getReplicatedLog().getSnapshot());
-
mockActorRef.tell(PoisonPill.getInstance(), getRef());
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import scala.concurrent.duration.FiniteDuration;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
ActorRef followerActor = getTestActor();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap();
}.get(); // this extracts the received message
assertEquals("match", out);
-
}
-
-
};
}};
}
}
@Test
- public void testSendInstallSnapshot() {
- new LeaderTestKit(getSystem()) {{
+ public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
- new Within(duration("1 seconds")) {
- protected void run() {
- ActorRef followerActor = getTestActor();
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
- Map<String, String> peerAddresses = new HashMap();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(leaderActor);
+ actorContext.setPeerAddresses(peerAddresses);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext(getRef());
- actorContext.setPeerAddresses(peerAddresses);
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
+ //set follower timeout to 2 mins, helps during debugging
+ actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ MockLeader leader = new MockLeader(actorContext);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshot(
- toByteString(leadersSnapshot));
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ //update follower timestamp
+ leader.markFollowerActive(followerActor.path().toString());
- MockLeader leader = new MockLeader(actorContext);
- // set the follower info in leader
- leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+ leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ //send first chunk and no InstallSnapshotReply received yet
+ leader.getFollowerToSnapshot().getNextChunk();
+ leader.getFollowerToSnapshot().incrementChunkIndex();
- // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
- RaftActorBehavior raftBehavior = leader.handleMessage(
- senderActor, new Replicate(null, "state-id", entry));
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- assertTrue(raftBehavior instanceof Leader);
+ AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
+ followerActor, AppendEntries.SERIALIZABLE_CLASS);
- // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
- Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
- @Override
- protected Boolean match(Object o) throws Exception {
- if (o instanceof SendInstallSnapshot) {
- return true;
- }
- return false;
- }
- }.get();
+ assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
+ "received", aeproto);
- boolean sendInstallSnapshotReceived = false;
- for (Boolean b: matches) {
- sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
- }
+ AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+
+ assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+
+ //InstallSnapshotReply received
+ leader.getFollowerToSnapshot().markSendStatus(true);
+
+ leader.handleMessage(senderActor, new SendHeartBeat());
+
+ InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
+ MessageCollectorActor.getFirstMatching(followerActor,
+ InstallSnapshot.SERIALIZABLE_CLASS);
+
+ assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
+ isproto);
+
+ InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
- assertTrue(sendInstallSnapshotReceived);
+ assertEquals(snapshotIndex, is.getLastIncludedIndex());
+ }};
+ }
+
+ @Test
+ public void testSendAppendEntriesSnapshotScenario() {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(getRef());
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ Leader leader = new Leader(actorContext);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ //update follower timestamp
+ leader.markFollowerActive(followerActor.path().toString());
+
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ senderActor, new Replicate(null, "state-id", entry));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
+ Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+ @Override
+ protected Boolean match(Object o) throws Exception {
+ if (o instanceof InitiateInstallSnapshot) {
+ return true;
+ }
+ return false;
}
- };
+ }.get();
+
+ boolean initiateInitiateInstallSnapshot = false;
+ for (Boolean b: matches) {
+ initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
+ }
+
+ assertTrue(initiateInitiateInstallSnapshot);
}};
}
@Test
- public void testInstallSnapshot() {
- new LeaderTestKit(getSystem()) {{
+ public void testInitiateInstallSnapshot() throws Exception {
+ new JavaTestKit(getSystem()) {{
- new Within(duration("1 seconds")) {
- protected void run() {
- ActorRef followerActor = getTestActor();
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
- Map<String, String> peerAddresses = new HashMap();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ ActorRef followerActor = getTestActor();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
- actorContext.setPeerAddresses(peerAddresses);
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(leaderActor);
+ actorContext.setPeerAddresses(peerAddresses);
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setLastApplied(3);
+ actorContext.setCommitIndex(followersLastIndex);
- MockLeader leader = new MockLeader(actorContext);
- // set the follower info in leader
- leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+ Leader leader = new Leader(actorContext);
+ // set the snapshot as absent and check if capture-snapshot is invoked.
+ leader.setSnapshot(Optional.<ByteString>absent());
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
+ actorContext.getReplicatedLog().append(entry);
- assertTrue(raftBehavior instanceof Leader);
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ leaderActor, new InitiateInstallSnapshot());
- // check if installsnapshot gets called with the correct values.
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
- InstallSnapshot is = (InstallSnapshot)
- SerializationUtils.fromSerializable(in);
- if (is.getData() == null) {
- return "InstallSnapshot data is null";
- }
- if (is.getLastIncludedIndex() != snapshotIndex) {
- return is.getLastIncludedIndex() + "!=" + snapshotIndex;
- }
- if (is.getLastIncludedTerm() != snapshotTerm) {
- return is.getLastIncludedTerm() + "!=" + snapshotTerm;
- }
- if (is.getTerm() == currentTerm) {
- return is.getTerm() + "!=" + currentTerm;
- }
+ CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+ getFirstMatching(leaderActor, CaptureSnapshot.class);
- return "match";
+ assertNotNull(cs);
- } else {
- return "message mismatch:" + in.getClass();
- }
+ assertTrue(cs.isInstallSnapshotInitiated());
+ assertEquals(3, cs.getLastAppliedIndex());
+ assertEquals(1, cs.getLastAppliedTerm());
+ assertEquals(4, cs.getLastIndex());
+ assertEquals(2, cs.getLastTerm());
+ }};
+ }
+
+ @Test
+ public void testInstallSnapshot() {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ actorContext.setPeerAddresses(peerAddresses);
+
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ actorContext.setCommitIndex(followersLastIndex);
+
+ Leader leader = new Leader(actorContext);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+ new SendInstallSnapshot(toByteString(leadersSnapshot)));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // check if installsnapshot gets called with the correct values.
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+ InstallSnapshot is = (InstallSnapshot)
+ SerializationUtils.fromSerializable(in);
+ if (is.getData() == null) {
+ return "InstallSnapshot data is null";
+ }
+ if (is.getLastIncludedIndex() != snapshotIndex) {
+ return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+ }
+ if (is.getLastIncludedTerm() != snapshotTerm) {
+ return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+ }
+ if (is.getTerm() == currentTerm) {
+ return is.getTerm() + "!=" + currentTerm;
}
- }.get(); // this extracts the received message
- assertEquals("match", out);
- }
- };
+ return "match";
+
+ } else {
+ return "message mismatch:" + in.getClass();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
}};
}
@Test
public void testHandleInstallSnapshotReplyLastChunk() {
- new LeaderTestKit(getSystem()) {{
- new Within(duration("1 seconds")) {
- protected void run() {
- ActorRef followerActor = getTestActor();
+ new JavaTestKit(getSystem()) {{
- Map<String, String> peerAddresses = new HashMap();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ ActorRef followerActor = getTestActor();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
- actorContext.setPeerAddresses(peerAddresses);
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
-
- MockLeader leader = new MockLeader(actorContext);
- // set the follower info in leader
- leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
-
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshot(
- toByteString(leadersSnapshot));
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
- ByteString bs = toByteString(leadersSnapshot);
- leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
- while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
- leader.getFollowerToSnapshot().getNextChunk();
- leader.getFollowerToSnapshot().incrementChunkIndex();
- }
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
- RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
- new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
- leader.getFollowerToSnapshot().getChunkIndex(), true));
+ MockLeader leader = new MockLeader(actorContext);
- assertTrue(raftBehavior instanceof Leader);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- assertEquals(leader.mapFollowerToSnapshot.size(), 0);
- assertEquals(leader.followerToLog.size(), 1);
- assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
- FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
- }
- };
+ // set the snapshot variables in replicatedlog
+
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+ leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+ while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+ leader.getFollowerToSnapshot().getNextChunk();
+ leader.getFollowerToSnapshot().incrementChunkIndex();
+ }
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+ new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+ leader.getFollowerToSnapshot().getChunkIndex(), true));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+ assertEquals(leader.followerToLog.size(), 1);
+ assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+ FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+ assertEquals(snapshotIndex, fli.getMatchIndex().get());
+ assertEquals(snapshotIndex, fli.getMatchIndex().get());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
}};
}
followerActorContext.setCommitIndex(1);
Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive(followerActor.path().toString());
leader.handleMessage(leaderActor, new SendHeartBeat());
followerActorContext.setCommitIndex(2);
Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive(followerActor.path().toString());
leader.handleMessage(leaderActor, new SendHeartBeat());
}
- private static class LeaderTestKit extends JavaTestKit {
-
- private LeaderTestKit(ActorSystem actorSystem) {
- super(actorSystem);
- }
-
- protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
- // Wait for a specific log message to show up
- final boolean result =
- new JavaTestKit.EventFilter<Boolean>(logLevel
- ) {
- @Override
- protected Boolean run() {
- return true;
- }
- }.from(subject.path().toString())
- .message(logMessage)
- .occurrences(1).exec();
-
- Assert.assertEquals(true, result);
-
- }
- }
-
class MockLeader extends Leader {
FollowerToSnapshot fts;
super(context);
}
- public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId,
- new AtomicLong(nextIndex),
- new AtomicLong(matchIndex));
- followerToLog.put(followerId, followerLogInformation);
- }
-
public FollowerToSnapshot getFollowerToSnapshot() {
return fts;
}
}
}
+
+ private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+
+ private long electionTimeOutIntervalMillis;
+ private int snapshotChunkSize;
+
+ public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+ super();
+ this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
+ this.snapshotChunkSize = snapshotChunkSize;
+ }
+
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int getSnapshotChunkSize() {
+ return snapshotChunkSize;
+ }
+ }
}
actor {
# enable to test serialization only.
- serialize-messages = on
+ serialize-messages = off
serializers {
java = "akka.serialization.JavaSerializer"