In order to minimize the memory usage of the in-memory journal, we can remove the entries from the Leader's journal
once it has been successfully replicated to ALL its followers.
This does not intefere with snapshots, as we capture snapshots on demand.
The followers follow the leader in cleaning the in-memory journal, there by ensuring that all the journals have more or less same entries.
This is done by the leader passing its replicatedToAllIndex as part of the AppendEntries.
Change-Id: I579a1f90d3c4e5d6be4ce699072688788b07bd48
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
private CaptureSnapshot captureSnapshot = null;
- private volatile boolean hasSnapshotCaptureInitiated = false;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
-
-
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!hasSnapshotCaptureInitiated){
+ if(!context.isSnapshotCaptureInitiated()){
raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
}
captureSnapshot = null;
- hasSnapshotCaptureInitiated = false;
+ context.setSnapshotCaptureInitiated(false);
}
protected boolean hasFollowers(){
getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
// when a snaphsot is being taken, captureSnapshot != null
- if (hasSnapshotCaptureInitiated == false &&
+ if (!context.isSnapshotCaptureInitiated() &&
( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
dataSizeForCheck > dataThreshold)) {
getSelf().tell(new CaptureSnapshot(
lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
null);
- hasSnapshotCaptureInitiated = true;
+ context.setSnapshotCaptureInitiated(true);
}
if(callback != null){
callback.apply(replicatedLogEntry);
*
* @param replicatedLog
*/
- public void setReplicatedLog(ReplicatedLog replicatedLog);
+ void setReplicatedLog(ReplicatedLog replicatedLog);
/**
* @return A representation of the log
*
* @param name
*/
- public void removePeer(String name);
+ void removePeer(String name);
/**
* Given a peerId return the corresponding actor
/**
* @return ConfigParams
*/
- public ConfigParams getConfigParams();
+ ConfigParams getConfigParams();
+
+ void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
+
+ boolean isSnapshotCaptureInitiated();
+
}
import akka.actor.Props;
import akka.actor.UntypedActorContext;
import akka.event.LoggingAdapter;
-
import java.util.Map;
import static com.google.common.base.Preconditions.checkState;
private final ConfigParams configParams;
+ private boolean snapshotCaptureInitiated;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
return configParams;
}
+ @Override
+ public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+ this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return snapshotCaptureInitiated;
+ }
+
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
private Optional<ByteString> snapshot;
+ private long replicatedToAllIndex = -1;
+
public AbstractLeader(RaftActorContext context) {
super(context);
applyLogToStateMachine(context.getCommitIndex());
}
+ if (!context.isSnapshotCaptureInitiated()) {
+ purgeInMemoryLog();
+ }
+
return this;
}
+ private void purgeInMemoryLog() {
+ //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ // we would delete the in-mem log from that index on, in-order to minimize mem usage
+ // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ for (FollowerLogInformation info : followerToLog.values()) {
+ minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+ }
+
+ replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ }
+
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
final Iterator<ClientRequestTracker> it = trackerList.iterator();
new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
+ context.getCommitIndex(),
+ replicatedToAllIndex).toSerializable(),
actor()
);
}
return numMajority;
}
+
+ protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+
+ // we would want to keep the lastApplied as its used while capturing snapshots
+ long tempMin = Math.min(minReplicatedToAllIndex,
+ (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+ context.getReplicatedLog().snapshotCommit();
+ return tempMin;
+ }
+ return currentReplicatedIndex;
+ }
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
+ if (!context.isSnapshotCaptureInitiated()) {
+ fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+ }
+
return this;
}
// leader's commitIndex
private final long leaderCommit;
+ // index which has been replicated successfully to all followers, -1 if none
+ private final long replicatedToAllIndex;
+
public AppendEntries(long term, String leaderId, long prevLogIndex,
- long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+ long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
super(term);
this.leaderId = leaderId;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.entries = entries;
this.leaderCommit = leaderCommit;
+ this.replicatedToAllIndex = replicatedToAllIndex;
}
private void writeObject(ObjectOutputStream out) throws IOException {
return leaderCommit;
}
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
@Override
public String toString() {
final StringBuilder sb =
sb.append(", prevLogTerm=").append(prevLogTerm);
sb.append(", entries=").append(entries);
sb.append(", leaderCommit=").append(leaderCommit);
+ sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
sb.append('}');
return sb.toString();
}
from.getPrevLogIndex(),
from.getPrevLogTerm(),
logEntryList,
- from.getLeaderCommit());
+ from.getLeaderCommit(), -1);
return to;
}
}
+ @Test
+ public void testSnapshotPreCommit() {
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+ replicatedLogImpl.snapshotPreCommit(4, 3);
+ assertEquals(3, replicatedLogImpl.size());
+ assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+
+ replicatedLogImpl.snapshotPreCommit(6, 3);
+ assertEquals(1, replicatedLogImpl.size());
+ assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+
+ replicatedLogImpl.snapshotPreCommit(7, 3);
+ assertEquals(0, replicatedLogImpl.size());
+ assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+ //running it again on an empty list should not throw exception
+ replicatedLogImpl.snapshotPreCommit(7, 3);
+ assertEquals(0, replicatedLogImpl.size());
+ assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+
+ }
+
// create a snapshot for test
public Map<Long, String> takeSnapshot(final int numEntries) {
Map<Long, String> map = new HashMap<>(numEntries);
private ReplicatedLog replicatedLog;
private Map<String, String> peerAddresses = new HashMap<>();
private ConfigParams configParams;
+ private boolean snapshotCaptureInitiated;
public MockRaftActorContext(){
electionTerm = null;
return configParams;
}
+ @Override
+ public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+ this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return snapshotCaptureInitiated;
+ }
+
public void setConfigParams(ConfigParams configParams) {
this.configParams = configParams;
}
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class RaftActorTest extends AbstractActorTest {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
+ private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
private static final long serialVersionUID = 1L;
}
}
- public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+ public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+ DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
this.delegate = mock(RaftActor.class);
}
}
+ public void waitForInitializeBehaviorComplete() {
+ try {
+ assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
public List<Object> getState() {
return state;
}
recoveryComplete.countDown();
}
+ @Override
+ protected void initializeBehavior() {
+ super.initializeBehavior();
+ initializeBehaviorComplete.countDown();
+ }
+
@Override
protected void applyRecoverySnapshot(byte[] bytes) {
delegate.applyRecoverySnapshot(bytes);
// 4 messages as part of snapshot, which are applied to state
ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
}};
}
+ @Test
+ public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "leader1";
+
+ ActorRef followerActor1 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(4);
+ leaderActor.getRaftActorContext().setLastApplied(4);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
+ leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+ verify(leaderActor.delegate).createSnapshot();
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ //fake snapshot on index 5
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ //fake snapshot on index 6
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ // capture snapshot reply should remove the snapshotted entries only
+ assertEquals(3, leaderActor.getReplicatedLog().size());
+ assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
+
+ // add another non-replicated entry
+ leaderActor.getReplicatedLog().append(
+ new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+
+ //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
+ assertEquals(2, leaderActor.getReplicatedLog().size());
+ assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "follower1";
+
+ ActorRef leaderActor1 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("leader", leaderActor1.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor followerActor = mockActorRef.underlyingActor();
+ followerActor.getRaftActorContext().setCommitIndex(4);
+ followerActor.getRaftActorContext().setLastApplied(4);
+ followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ followerActor.waitForInitializeBehaviorComplete();
+
+ // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+ Follower follower = new Follower(followerActor.getRaftActorContext());
+ followerActor.setCurrentBehavior(follower);
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
+
+ // log as indices 0-5
+ assertEquals(6, followerActor.getReplicatedLog().size());
+
+ //snapshot on 4
+ followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
+ followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+ verify(followerActor.delegate).createSnapshot();
+
+ assertEquals(6, followerActor.getReplicatedLog().size());
+
+ //fake snapshot on index 6
+ List<ReplicatedLogEntry> entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("foo-6"))
+ );
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
+ assertEquals(7, followerActor.getReplicatedLog().size());
+
+ //fake snapshot on index 7
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("foo-7"))
+ );
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
+ assertEquals(8, followerActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ // capture snapshot reply should remove the snapshotted entries only
+ assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
+ assertEquals(7, followerActor.getReplicatedLog().lastIndex());
+
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
+ new MockRaftActorContext.MockPayload("foo-7"))
+ );
+ // send an additional entry 8 with leaderCommit = 7
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
+
+ // 7 and 8, as lastapplied is 7
+ assertEquals(2, followerActor.getReplicatedLog().size());
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
context.getTermInformation().update(1000, "test");
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+ new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
RaftActorBehavior behavior = createBehavior(context);
}};
}
+ @Test
+ public void testFakeSnapshots() {
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+ AbstractRaftActorBehavior behavior = new Leader(context);
+ context.getTermInformation().update(1, "leader");
+
+ //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ context.setLastApplied(0);
+ assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setLastApplied(0);
+ assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ assertEquals(2, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setLastApplied(1);
+ assertEquals(0, behavior.fakeSnapshot(0, -1));
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+ context.setLastApplied(2);
+ assertEquals(1, behavior.fakeSnapshot(3, 1));
+ assertEquals(3, context.getReplicatedLog().size());
+
+
+ }
+
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
ActorRef actorRef, RaftRPC rpc) {
}
protected AppendEntries createAppendEntriesWithNewerTerm() {
- return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
}
protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+
import static org.junit.Assert.assertEquals;
public class CandidateTest extends AbstractRaftActorBehaviorTest {
Candidate candidate = new Candidate(createActorContext(getTestActor()));
- candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0));
+ candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
// do not put code outside this method, will run afterwards
// The new commitIndex is 101
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 100, 1, entries, 101);
+ new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
RaftActorBehavior raftBehavior =
createBehavior(context).handleMessage(getRef(), appendEntries);
// AppendEntries is now sent with a bigger term
// this will set the receivers term to be the same as the sender's term
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
RaftActorBehavior behavior = createBehavior(context);
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+ new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
RaftActorBehavior behavior = createBehavior(context);
*/
package org.opendaylight.controller.cluster.raft.messages;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
/**
* Unit tests for AppendEntries.
*
ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
- AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L);
+ AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
@Test
public void testToAndFromSerializable() {
AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
- Collections.<ReplicatedLogEntry>emptyList(), 10L);
+ Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
assertSame("toSerializable", entries, entries.toSerializable());
assertSame("fromSerializable", entries,
@Test
public void testToAndFromLegacySerializable() {
ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
- AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L);
+ AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
+ assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
@Override public void onReceive(Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
if(LOG.isDebugEnabled()) {
- LOG.debug("Received message {}", messageType);
+// LOG.debug("Received message {}", messageType);
}
handleReceive(message);
if(LOG.isDebugEnabled()) {
- LOG.debug("Done handling message {}", messageType);
+// LOG.debug("Done handling message {}", messageType);
}
}
package org.opendaylight.controller.cluster.datastore;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
@Deprecated
public class CompositeModificationByteStringPayloadTest {
entries.add(new ReplicatedLogImplEntry(0, 1, payload));
- assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+ assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
}
}
});
AppendEntries appendEntries =
- new AppendEntries(1, "member-1", 0, 100, entries, 1);
+ new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
}
public static AppendEntries keyValueAppendEntries() {
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
}
}