private CaptureSnapshot captureSnapshot = null;
- private volatile boolean hasSnapshotCaptureInitiated = false;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
-
-
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!hasSnapshotCaptureInitiated){
+ if(!context.isSnapshotCaptureInitiated()){
raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
}
captureSnapshot = null;
- hasSnapshotCaptureInitiated = false;
+ context.setSnapshotCaptureInitiated(false);
}
protected boolean hasFollowers(){
getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
// when a snaphsot is being taken, captureSnapshot != null
- if (hasSnapshotCaptureInitiated == false &&
+ if (!context.isSnapshotCaptureInitiated() &&
( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
dataSizeForCheck > dataThreshold)) {
getSelf().tell(new CaptureSnapshot(
lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
null);
- hasSnapshotCaptureInitiated = true;
+ context.setSnapshotCaptureInitiated(true);
}
if(callback != null){
callback.apply(replicatedLogEntry);
*
* @param replicatedLog
*/
- public void setReplicatedLog(ReplicatedLog replicatedLog);
+ void setReplicatedLog(ReplicatedLog replicatedLog);
/**
* @return A representation of the log
*
* @param name
*/
- public void removePeer(String name);
+ void removePeer(String name);
/**
* Given a peerId return the corresponding actor
/**
* @return ConfigParams
*/
- public ConfigParams getConfigParams();
+ ConfigParams getConfigParams();
+
+ void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
+
+ boolean isSnapshotCaptureInitiated();
+
}
import akka.actor.Props;
import akka.actor.UntypedActorContext;
import akka.event.LoggingAdapter;
-
import java.util.Map;
import static com.google.common.base.Preconditions.checkState;
private final ConfigParams configParams;
+ private boolean snapshotCaptureInitiated;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
return configParams;
}
+ @Override
+ public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+ this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return snapshotCaptureInitiated;
+ }
+
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
private Optional<ByteString> snapshot;
+ private long replicatedToAllIndex = -1;
+
public AbstractLeader(RaftActorContext context) {
super(context);
applyLogToStateMachine(context.getCommitIndex());
}
+ if (!context.isSnapshotCaptureInitiated()) {
+ purgeInMemoryLog();
+ }
+
return this;
}
+ private void purgeInMemoryLog() {
+ //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ // we would delete the in-mem log from that index on, in-order to minimize mem usage
+ // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ for (FollowerLogInformation info : followerToLog.values()) {
+ minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+ }
+
+ replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ }
+
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
final Iterator<ClientRequestTracker> it = trackerList.iterator();
new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
+ context.getCommitIndex(),
+ replicatedToAllIndex).toSerializable(),
actor()
);
}
return numMajority;
}
+
+ protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+
+ // we would want to keep the lastApplied as its used while capturing snapshots
+ long tempMin = Math.min(minReplicatedToAllIndex,
+ (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+ context.getReplicatedLog().snapshotCommit();
+ return tempMin;
+ }
+ return currentReplicatedIndex;
+ }
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
+ if (!context.isSnapshotCaptureInitiated()) {
+ fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+ }
+
return this;
}
// leader's commitIndex
private final long leaderCommit;
+ // index which has been replicated successfully to all followers, -1 if none
+ private final long replicatedToAllIndex;
+
public AppendEntries(long term, String leaderId, long prevLogIndex,
- long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+ long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
super(term);
this.leaderId = leaderId;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.entries = entries;
this.leaderCommit = leaderCommit;
+ this.replicatedToAllIndex = replicatedToAllIndex;
}
private void writeObject(ObjectOutputStream out) throws IOException {
return leaderCommit;
}
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
@Override
public String toString() {
final StringBuilder sb =
sb.append(", prevLogTerm=").append(prevLogTerm);
sb.append(", entries=").append(entries);
sb.append(", leaderCommit=").append(leaderCommit);
+ sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
sb.append('}');
return sb.toString();
}
from.getPrevLogIndex(),
from.getPrevLogTerm(),
logEntryList,
- from.getLeaderCommit());
+ from.getLeaderCommit(), -1);
return to;
}
}
+ @Test
+ public void testSnapshotPreCommit() {
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+ replicatedLogImpl.snapshotPreCommit(4, 3);
+ assertEquals(3, replicatedLogImpl.size());
+ assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+
+ replicatedLogImpl.snapshotPreCommit(6, 3);
+ assertEquals(1, replicatedLogImpl.size());
+ assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+
+ replicatedLogImpl.snapshotPreCommit(7, 3);
+ assertEquals(0, replicatedLogImpl.size());
+ assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+ //running it again on an empty list should not throw exception
+ replicatedLogImpl.snapshotPreCommit(7, 3);
+ assertEquals(0, replicatedLogImpl.size());
+ assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+
+ }
+
// create a snapshot for test
public Map<Long, String> takeSnapshot(final int numEntries) {
Map<Long, String> map = new HashMap<>(numEntries);
private ReplicatedLog replicatedLog;
private Map<String, String> peerAddresses = new HashMap<>();
private ConfigParams configParams;
+ private boolean snapshotCaptureInitiated;
public MockRaftActorContext(){
electionTerm = null;
return configParams;
}
+ @Override
+ public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+ this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return snapshotCaptureInitiated;
+ }
+
public void setConfigParams(ConfigParams configParams) {
this.configParams = configParams;
}
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class RaftActorTest extends AbstractActorTest {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
+ private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
private static final long serialVersionUID = 1L;
}
}
- public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+ public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+ DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
this.delegate = mock(RaftActor.class);
}
}
+ public void waitForInitializeBehaviorComplete() {
+ try {
+ assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
public List<Object> getState() {
return state;
}
recoveryComplete.countDown();
}
+ @Override
+ protected void initializeBehavior() {
+ super.initializeBehavior();
+ initializeBehaviorComplete.countDown();
+ }
+
@Override
protected void applyRecoverySnapshot(byte[] bytes) {
delegate.applyRecoverySnapshot(bytes);
// 4 messages as part of snapshot, which are applied to state
ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
}};
}
+ @Test
+ public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "leader1";
+
+ ActorRef followerActor1 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(4);
+ leaderActor.getRaftActorContext().setLastApplied(4);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
+ leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+ verify(leaderActor.delegate).createSnapshot();
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ //fake snapshot on index 5
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ //fake snapshot on index 6
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ // capture snapshot reply should remove the snapshotted entries only
+ assertEquals(3, leaderActor.getReplicatedLog().size());
+ assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
+
+ // add another non-replicated entry
+ leaderActor.getReplicatedLog().append(
+ new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+
+ //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
+ assertEquals(2, leaderActor.getReplicatedLog().size());
+ assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "follower1";
+
+ ActorRef leaderActor1 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("leader", leaderActor1.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor followerActor = mockActorRef.underlyingActor();
+ followerActor.getRaftActorContext().setCommitIndex(4);
+ followerActor.getRaftActorContext().setLastApplied(4);
+ followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ followerActor.waitForInitializeBehaviorComplete();
+
+ // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+ Follower follower = new Follower(followerActor.getRaftActorContext());
+ followerActor.setCurrentBehavior(follower);
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
+
+ // log as indices 0-5
+ assertEquals(6, followerActor.getReplicatedLog().size());
+
+ //snapshot on 4
+ followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
+ followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+ verify(followerActor.delegate).createSnapshot();
+
+ assertEquals(6, followerActor.getReplicatedLog().size());
+
+ //fake snapshot on index 6
+ List<ReplicatedLogEntry> entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("foo-6"))
+ );
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
+ assertEquals(7, followerActor.getReplicatedLog().size());
+
+ //fake snapshot on index 7
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("foo-7"))
+ );
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
+ assertEquals(8, followerActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ // capture snapshot reply should remove the snapshotted entries only
+ assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
+ assertEquals(7, followerActor.getReplicatedLog().lastIndex());
+
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
+ new MockRaftActorContext.MockPayload("foo-7"))
+ );
+ // send an additional entry 8 with leaderCommit = 7
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
+
+ // 7 and 8, as lastapplied is 7
+ assertEquals(2, followerActor.getReplicatedLog().size());
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
context.getTermInformation().update(1000, "test");
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+ new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
RaftActorBehavior behavior = createBehavior(context);
}};
}
+ @Test
+ public void testFakeSnapshots() {
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+ AbstractRaftActorBehavior behavior = new Leader(context);
+ context.getTermInformation().update(1, "leader");
+
+ //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ context.setLastApplied(0);
+ assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setLastApplied(0);
+ assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ assertEquals(2, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setLastApplied(1);
+ assertEquals(0, behavior.fakeSnapshot(0, -1));
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+ context.setLastApplied(2);
+ assertEquals(1, behavior.fakeSnapshot(3, 1));
+ assertEquals(3, context.getReplicatedLog().size());
+
+
+ }
+
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
ActorRef actorRef, RaftRPC rpc) {
}
protected AppendEntries createAppendEntriesWithNewerTerm() {
- return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
}
protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+
import static org.junit.Assert.assertEquals;
public class CandidateTest extends AbstractRaftActorBehaviorTest {
Candidate candidate = new Candidate(createActorContext(getTestActor()));
- candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0));
+ candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
// do not put code outside this method, will run afterwards
// The new commitIndex is 101
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 100, 1, entries, 101);
+ new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
RaftActorBehavior raftBehavior =
createBehavior(context).handleMessage(getRef(), appendEntries);
// AppendEntries is now sent with a bigger term
// this will set the receivers term to be the same as the sender's term
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
RaftActorBehavior behavior = createBehavior(context);
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+ new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
RaftActorBehavior behavior = createBehavior(context);
*/
package org.opendaylight.controller.cluster.raft.messages;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
/**
* Unit tests for AppendEntries.
*
ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
- AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L);
+ AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
@Test
public void testToAndFromSerializable() {
AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
- Collections.<ReplicatedLogEntry>emptyList(), 10L);
+ Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
assertSame("toSerializable", entries, entries.toSerializable());
assertSame("fromSerializable", entries,
@Test
public void testToAndFromLegacySerializable() {
ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
- AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L);
+ AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
+ assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
@Override public void onReceive(Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
if(LOG.isDebugEnabled()) {
- LOG.debug("Received message {}", messageType);
+// LOG.debug("Received message {}", messageType);
}
handleReceive(message);
if(LOG.isDebugEnabled()) {
- LOG.debug("Done handling message {}", messageType);
+// LOG.debug("Done handling message {}", messageType);
}
}
package org.opendaylight.controller.cluster.datastore;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
@Deprecated
public class CompositeModificationByteStringPayloadTest {
entries.add(new ReplicatedLogImplEntry(0, 1, payload));
- assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+ assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
}
}
});
AppendEntries appendEntries =
- new AppendEntries(1, "member-1", 0, 100, entries, 1);
+ new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
}
public static AppendEntries keyValueAppendEntries() {
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
}
}