Fixed adjusted index logic.
Added tests
Added Reinstate Node in TestDriver.
Changes related to recovery from snapshot
Review changes
Rebased with latest
Tasks to be done:
1. Snapshot capture criteria
2. Handling Snapshot failure
3. Configuration Interface
4. Install snapshot
Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
Reinstate from snapshot + review changes
Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
TestDriver changes
Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
Log interpolation change
Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
Rebased.
Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
}
} else if (message instanceof PrintState) {
- LOG.debug("State of the node:"+getId() + " has = "+state.size() + " entries");
+ LOG.debug("State of the node:{} has entries={}, {}",
+ getId(), state.size(), getReplicatedLogState());
} else if (message instanceof PrintRole) {
- LOG.debug(getId() + " = " + getRaftState());
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers());
+
} else {
super.onReceiveCommand(message);
}
@Override protected void applySnapshot(Object snapshot) {
state.clear();
state.putAll((HashMap) snapshot);
+ LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
}
@Override public void onReceiveRecover(Object message) {
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
/**
* This is a test driver for testing akka-raft implementation
* Its uses ExampleActors and threads to push content(key-vals) to these actors
* Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
- * a thread and starts push logs to the actor its assignged to.
+ * a thread and starts push logs to the actor its assigned to.
*/
public class TestDriver {
private static Map<String, String> allPeers = new HashMap<>();
private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
- private static LogGenerator logGenerator = new LogGenerator();;
+ private static LogGenerator logGenerator = new LogGenerator();
+ private int nameCounter = 0;
/**
* Create nodes, add clients and start logging.
* createNodes:{num}
* addNodes:{num}
* stopNode:{nodeName}
+ * reinstateNode:{nodeName}
* addClients:{num}
* addClientsToNode:{nodeName, num}
* startLogging
String[] arr = command.split(":");
td.stopNode(arr[1]);
+ } else if (command.startsWith("reinstateNode")) {
+ String[] arr = command.split(":");
+ td.reinstateNode(arr[1]);
+
} else if (command.startsWith("startLogging")) {
td.startAllLogging();
public void createNodes(int num) {
for (int i=0; i < num; i++) {
- int rand = getUnusedRandom(num);
- allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+ nameCounter = nameCounter + 1;
+ allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
}
for (String s : allPeers.keySet()) {
public void addNodes(int num) {
Map<String, String> newPeers = new HashMap<>();
for (int i=0; i < num; i++) {
- int rand = getUnusedRandom(num);
- newPeers.put("example-"+rand, "akka://default/user/example-"+rand);
- allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+ nameCounter = nameCounter + 1;
+ newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+ allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
}
Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
public void addClientsToNode(String actorName, int num) {
ActorRef actorRef = actorRefs.get(actorName);
for (int i=0; i < num; i++) {
- String clientName = "client-" + i + "-" + actorRef;
+ String clientName = "client-" + i + "-" + actorName;
clientActorRefs.put(clientName,
actorSystem.actorOf(ClientActor.props(actorRef), clientName));
System.out.println("Added client-node:" + clientName);
public void stopNode(String actorName) {
ActorRef actorRef = actorRefs.get(actorName);
- String clientName = "client-"+actorName;
- if(clientActorRefs.containsKey(clientName)) {
- actorSystem.stop(clientActorRefs.get(clientName));
- clientActorRefs.remove(clientName);
+
+ for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
+ if (entry.getKey().endsWith(actorName)) {
+ actorSystem.stop(entry.getValue());
+ }
}
+
actorSystem.stop(actorRef);
actorRefs.remove(actorName);
}
allPeers.remove(actorName);
+ }
+
+ public void reinstateNode(String actorName) {
+ String address = "akka://default/user/"+actorName;
+ allPeers.put(actorName, address);
+ ActorRef exampleActor = actorSystem.actorOf(ExampleActor.props(actorName, withoutPeer(actorName)), actorName);
+
+ for (ActorRef actor : actorRefs.values()) {
+ actor.tell(new AddRaftPeer(actorName, address), null);
+ }
+
+ actorRefs.put(actorName, exampleActor);
+
+ addClientsToNode(actorName, 1);
}
public void startAllLogging() {
return null;
}
- private int getUnusedRandom(int num) {
- int rand = -1;
- do {
- rand = (new Random()).nextInt(num * num);
- } while (allPeers.keySet().contains("example-"+rand));
-
- return rand;
- }
private static Map<String, String> withoutPeer(String peerId) {
Map<String, String> without = new ConcurrentHashMap<>(allPeers);
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract class handling the mapping of
+ * logical LogEntry Index and the physical list index.
+ */
+public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+
+ protected final List<ReplicatedLogEntry> journal;
+ protected final Object snapshot;
+ protected long snapshotIndex = -1;
+ protected long snapshotTerm = -1;
+
+ public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+ long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+ this.snapshot = state;
+ this.snapshotIndex = snapshotIndex;
+ this.snapshotTerm = snapshotTerm;
+ this.journal = new ArrayList<>(unAppliedEntries);
+ }
+
+
+ public AbstractReplicatedLogImpl() {
+ this.snapshot = null;
+ this.journal = new ArrayList<>();
+ }
+
+ protected int adjustedIndex(long logEntryIndex) {
+ if(snapshotIndex < 0){
+ return (int) logEntryIndex;
+ }
+ return (int) (logEntryIndex - (snapshotIndex + 1));
+ }
+
+ @Override
+ public ReplicatedLogEntry get(long logEntryIndex) {
+ int adjustedIndex = adjustedIndex(logEntryIndex);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ // physical index should be less than list size and >= 0
+ return null;
+ }
+
+ return journal.get(adjustedIndex);
+ }
+
+ @Override
+ public ReplicatedLogEntry last() {
+ if (journal.isEmpty()) {
+ return null;
+ }
+ // get the last entry directly from the physical index
+ return journal.get(journal.size() - 1);
+ }
+
+ @Override
+ public long lastIndex() {
+ if (journal.isEmpty()) {
+ // it can happen that after snapshot, all the entries of the
+ // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
+ return snapshotIndex;
+ }
+ return last().getIndex();
+ }
+
+ @Override
+ public long lastTerm() {
+ if (journal.isEmpty()) {
+ // it can happen that after snapshot, all the entries of the
+ // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
+ return snapshotTerm;
+ }
+ return last().getTerm();
+ }
+
+ @Override
+ public void removeFrom(long logEntryIndex) {
+ int adjustedIndex = adjustedIndex(logEntryIndex);
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ // physical index should be less than list size and >= 0
+ return;
+ }
+ journal.subList(adjustedIndex , journal.size()).clear();
+ }
+
+ @Override
+ public void append(ReplicatedLogEntry replicatedLogEntry) {
+ journal.add(replicatedLogEntry);
+ }
+
+ @Override
+ public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
+ int adjustedIndex = adjustedIndex(logEntryIndex);
+ int size = journal.size();
+ List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+ if (adjustedIndex >= 0 && adjustedIndex < size) {
+ // physical index should be less than list size and >= 0
+ entries.addAll(journal.subList(adjustedIndex, size));
+ }
+ return entries;
+ }
+
+ @Override
+ public long size() {
+ return journal.size();
+ }
+
+ @Override
+ public boolean isPresent(long logEntryIndex) {
+ if (logEntryIndex > lastIndex()) {
+ // if the request logical index is less than the last present in the list
+ return false;
+ }
+ int adjustedIndex = adjustedIndex(logEntryIndex);
+ return (adjustedIndex >= 0);
+ }
+
+ @Override
+ public boolean isInSnapshot(long logEntryIndex) {
+ return logEntryIndex <= snapshotIndex;
+ }
+
+ @Override
+ public Object getSnapshot() {
+ return snapshot;
+ }
+
+ @Override
+ public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ @Override
+ public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ @Override
+ public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
+
+ @Override
+ public abstract void removeFromAndPersist(long index);
+}
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
+ private static final int SNAPSHOT_ENTRY_COUNT = 100000;
+
/**
* The current state determines the current behavior of a RaftActor
* A Raft Actor always starts off in the Follower State
@Override public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
+ LOG.debug("SnapshotOffer called..");
SnapshotOffer offer = (SnapshotOffer) message;
Snapshot snapshot = (Snapshot) offer.snapshot();
// when we need to install it on a peer
replicatedLog = new ReplicatedLogImpl(snapshot);
+ context.setReplicatedLog(replicatedLog);
+
+ LOG.debug("Applied snapshot to replicatedLog. " +
+ "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
+ replicatedLog.size());
+
// Apply the snapshot to the actors state
applySnapshot(snapshot.getState());
context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
} else if (message instanceof RecoveryCompleted) {
LOG.debug(
- "Last index in log : " + replicatedLog.lastIndex());
+ "RecoveryCompleted - Switching actor to Follower - " +
+ "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size());
currentBehavior = switchBehavior(RaftState.Follower);
}
}
}
}
+ public java.util.Set<String> getPeers() {
+ return context.getPeerAddresses().keySet();
+ }
+
+ protected String getReplicatedLogState() {
+ return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
+ + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
+ + ", im-mem journal size=" + context.getReplicatedLog().size();
+ }
/**
}
private void trimPersistentData(long sequenceNumber) {
- // Trim snapshots
+ // Trim akka snapshots
// FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
// For now guessing that it is ANDed.
deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - 100000, 43200000));
+ sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000));
- // Trim journal
+ // Trim akka journal
deleteMessages(sequenceNumber);
}
- private class ReplicatedLogImpl implements ReplicatedLog {
- private final List<ReplicatedLogEntry> journal;
- private final Object snapshot;
- private long snapshotIndex = -1;
- private long snapshotTerm = -1;
+ private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
public ReplicatedLogImpl(Snapshot snapshot) {
- this.snapshot = snapshot.getState();
- this.snapshotIndex = snapshot.getLastAppliedIndex();
- this.snapshotTerm = snapshot.getLastAppliedTerm();
-
- this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
+ super(snapshot.getState(),
+ snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+ snapshot.getUnAppliedEntries());
}
public ReplicatedLogImpl() {
- this.snapshot = null;
- this.journal = new ArrayList<>();
- }
-
- @Override public ReplicatedLogEntry get(long index) {
- int adjustedIndex = adjustedIndex(index);
-
- if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
- return null;
- }
-
- return journal.get(adjustedIndex);
- }
-
- @Override public ReplicatedLogEntry last() {
- if (journal.size() == 0) {
- return null;
- }
- return get(journal.size() - 1);
- }
-
- @Override public long lastIndex() {
- if (journal.size() == 0) {
- return -1;
- }
-
- return last().getIndex();
- }
-
- @Override public long lastTerm() {
- if (journal.size() == 0) {
- return -1;
- }
-
- return last().getTerm();
- }
-
-
- @Override public void removeFrom(long index) {
- int adjustedIndex = adjustedIndex(index);
-
- if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
- return;
- }
-
- journal.subList(adjustedIndex , journal.size()).clear();
+ super();
}
+ @Override public void removeFromAndPersist(long logEntryIndex) {
+ int adjustedIndex = adjustedIndex(logEntryIndex);
- @Override public void removeFromAndPersist(long index) {
- int adjustedIndex = adjustedIndex(index);
-
- if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ if (adjustedIndex < 0) {
return;
}
//FIXME : Doing nothing for now
}
});
-
-
- }
-
- @Override public void append(
- final ReplicatedLogEntry replicatedLogEntry) {
- journal.add(replicatedLogEntry);
- }
-
- @Override public List<ReplicatedLogEntry> getFrom(long index) {
- int adjustedIndex = adjustedIndex(index);
-
- List<ReplicatedLogEntry> entries = new ArrayList<>(100);
- if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
- return entries;
- }
-
-
- for (int i = adjustedIndex;
- i < journal.size(); i++) {
- entries.add(journal.get(i));
- }
- return entries;
}
@Override public void appendAndPersist(
new Procedure<ReplicatedLogEntry>() {
public void apply(ReplicatedLogEntry evt) throws Exception {
// FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
- if (size() > 100000) {
- ReplicatedLogEntry lastAppliedEntry =
- get(context.getLastApplied());
+ if (journal.size() > SNAPSHOT_ENTRY_COUNT) {
+ LOG.info("Initiating Snapshot Capture..");
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
+
+ ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
if (lastAppliedEntry != null) {
lastAppliedIndex = lastAppliedEntry.getIndex();
lastAppliedTerm = lastAppliedEntry.getTerm();
}
- saveSnapshot(Snapshot.create(createSnapshot(),
+ LOG.debug("Snapshot Capture logSize: {}", journal.size());
+ LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
+ LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+ LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+ Snapshot sn = Snapshot.create(createSnapshot(),
getFrom(context.getLastApplied() + 1),
lastIndex(), lastTerm(), lastAppliedIndex,
- lastAppliedTerm));
+ lastAppliedTerm);
+ saveSnapshot(sn);
+
+ LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+ //be greedy and remove entries from in-mem journal which are in the snapshot
+ // and update snapshotIndex and snapshotTerm without waiting for the success,
+ // TODO: damage-recovery to be done on failure
+ journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
+ snapshotIndex = lastAppliedIndex;
+ snapshotTerm = lastAppliedTerm;
+
+ LOG.info("Removed in-memory snapshotted entries, " +
+ "adjusted snaphsotIndex:{}" +
+ "and term:{}", snapshotIndex, lastAppliedTerm);
}
// Send message for replication
if (clientActor != null) {
);
}
- @Override public long size() {
- return journal.size() + snapshotIndex + 1;
- }
-
- @Override public boolean isPresent(long index) {
- int adjustedIndex = adjustedIndex(index);
-
- if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
- return false;
- }
- return true;
- }
-
- @Override public boolean isInSnapshot(long index) {
- return index <= snapshotIndex;
- }
-
- @Override public Object getSnapshot() {
- return snapshot;
- }
-
- @Override public long getSnapshotIndex() {
- return snapshotIndex;
- }
-
- @Override public long getSnapshotTerm() {
- return snapshotTerm;
- }
-
- private int adjustedIndex(long index) {
- if(snapshotIndex < 0){
- return (int) index;
- }
- return (int) (index - snapshotIndex);
- }
}
-
-
-
private static class DeleteEntries implements Serializable {
private final int fromIndex;
public long getLastAppliedTerm() {
return lastAppliedTerm;
}
+
+ public String getLogMessage() {
+ StringBuilder sb = new StringBuilder();
+ return sb.append("Snapshot={")
+ .append("lastTerm:" + this.getLastTerm() + ", ")
+ .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ")
+ .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ")
+ .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}")
+ .toString();
+
+ }
}
private class ElectionTermImpl implements ElectionTerm {
*/
void setLastApplied(long lastApplied);
+ /**
+ *
+ * @param replicatedLog
+ */
+ public void setReplicatedLog(ReplicatedLog replicatedLog);
+
/**
* @return A representation of the log
*/
private long lastApplied;
- private final ReplicatedLog replicatedLog;
+ private ReplicatedLog replicatedLog;
private final Map<String, String> peerAddresses;
this.lastApplied = lastApplied;
}
+ @Override public void setReplicatedLog(ReplicatedLog replicatedLog) {
+ this.replicatedLog = replicatedLog;
+ }
+
@Override public ReplicatedLog getReplicatedLog() {
return replicatedLog;
}
int addEntriesFrom = 0;
if (context.getReplicatedLog().size() > 0) {
- // Find the entry up until which the one that is not in the
- // follower's log
- for (int i = 0;
- i < appendEntries.getEntries()
- .size(); i++, addEntriesFrom++) {
- ReplicatedLogEntry matchEntry =
- appendEntries.getEntries().get(i);
- ReplicatedLogEntry newEntry = context.getReplicatedLog()
- .get(matchEntry.getIndex());
+ // Find the entry up until which the one that is not in the follower's log
+ for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+ ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+ ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
if (newEntry == null) {
//newEntry not found in the log
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+/**
+*
+*/
+public class AbstractReplicatedLogImplTest {
+
+ private MockAbstractReplicatedLogImpl replicatedLogImpl;
+
+ @Before
+ public void setUp() {
+ replicatedLogImpl = new MockAbstractReplicatedLogImpl();
+ }
+
+ @After
+ public void tearDown() {
+ replicatedLogImpl.journal.clear();
+ replicatedLogImpl.setSnapshotIndex(-1);
+ replicatedLogImpl.setSnapshotTerm(-1);
+ replicatedLogImpl = null;
+ }
+
+ @Test
+ public void testIndexOperations() {
+ // create a set of initial entries in the in-memory log
+ replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D")));
+
+ // check if the values returned are correct, with snapshotIndex = -1
+ assertEquals("B", replicatedLogImpl.get(1).getData().toString());
+ assertEquals("D", replicatedLogImpl.last().getData().toString());
+ assertEquals(3, replicatedLogImpl.lastIndex());
+ assertEquals(2, replicatedLogImpl.lastTerm());
+ assertEquals(2, replicatedLogImpl.getFrom(2).size());
+ assertEquals(4, replicatedLogImpl.size());
+ assertTrue(replicatedLogImpl.isPresent(2));
+ assertFalse(replicatedLogImpl.isPresent(4));
+ assertFalse(replicatedLogImpl.isInSnapshot(2));
+
+ // now create a snapshot of 3 entries, with 1 unapplied entry left in the log
+ // It removes the entries which have made it to snapshot
+ // and updates the snapshot index and term
+ Map state = takeSnapshot(3);
+
+ // check the values after the snapshot.
+ // each index value passed in the test is the logical index (log entry index)
+ // which gets mapped to the list's physical index
+ assertEquals("D", replicatedLogImpl.get(3).getData().toString());
+ assertEquals("D", replicatedLogImpl.last().getData().toString());
+ assertNull(replicatedLogImpl.get(1));
+ assertEquals(3, replicatedLogImpl.lastIndex());
+ assertEquals(2, replicatedLogImpl.lastTerm());
+ assertEquals(0, replicatedLogImpl.getFrom(2).size());
+ assertEquals(1, replicatedLogImpl.size());
+ assertFalse(replicatedLogImpl.isPresent(2));
+ assertTrue(replicatedLogImpl.isPresent(3));
+ assertFalse(replicatedLogImpl.isPresent(4));
+ assertTrue(replicatedLogImpl.isInSnapshot(2));
+
+ // append few more entries
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+ // check their values as well
+ assertEquals(5, replicatedLogImpl.size());
+ assertEquals("D", replicatedLogImpl.get(3).getData().toString());
+ assertEquals("E", replicatedLogImpl.get(4).getData().toString());
+ assertEquals("H", replicatedLogImpl.last().getData().toString());
+ assertEquals(3, replicatedLogImpl.lastTerm());
+ assertEquals(7, replicatedLogImpl.lastIndex());
+ assertTrue(replicatedLogImpl.isPresent(7));
+ assertFalse(replicatedLogImpl.isInSnapshot(7));
+ assertEquals(1, replicatedLogImpl.getFrom(7).size());
+ assertEquals(2, replicatedLogImpl.getFrom(6).size());
+
+ // take a second snapshot with 5 entries with 0 unapplied entries left in the log
+ state = takeSnapshot(5);
+
+ assertEquals(0, replicatedLogImpl.size());
+ assertNull(replicatedLogImpl.last());
+ assertNull(replicatedLogImpl.get(7));
+ assertNull(replicatedLogImpl.get(1));
+ assertFalse(replicatedLogImpl.isPresent(7));
+ assertTrue(replicatedLogImpl.isInSnapshot(7));
+ assertEquals(0, replicatedLogImpl.getFrom(7).size());
+ assertEquals(0, replicatedLogImpl.getFrom(6).size());
+
+ }
+
+ // create a snapshot for test
+ public Map takeSnapshot(int numEntries) {
+ Map map = new HashMap(numEntries);
+ List<ReplicatedLogEntry> entries = replicatedLogImpl.getEntriesTill(numEntries);
+ for (ReplicatedLogEntry entry : entries) {
+ map.put(entry.getIndex(), entry.getData().toString());
+ }
+
+ int term = (int) replicatedLogImpl.lastTerm();
+ int lastIndex = (int) entries.get(entries.size() - 1).getIndex();
+ entries.clear();
+ replicatedLogImpl.setSnapshotTerm(term);
+ replicatedLogImpl.setSnapshotIndex(lastIndex);
+
+ return map;
+
+ }
+ class MockAbstractReplicatedLogImpl extends AbstractReplicatedLogImpl {
+ @Override
+ public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry) {
+ }
+
+ @Override
+ public void removeFromAndPersist(long index) {
+ }
+
+ public void setSnapshotIndex(long snapshotIndex) {
+ this.snapshotIndex = snapshotIndex;
+ }
+
+ public void setSnapshotTerm(long snapshotTerm) {
+ this.snapshotTerm = snapshotTerm;
+ }
+
+ public List<ReplicatedLogEntry> getEntriesTill(int index) {
+ return journal.subList(0, index);
+ }
+ }
+}