From: Kamal Rameshan Date: Thu, 31 Jul 2014 06:00:36 +0000 (-0700) Subject: Snapshot changes X-Git-Tag: release/helium~367^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b78e7cc266d3540634ccc5b996b67b68365edbd8 Snapshot changes Fixed adjusted index logic. Added tests Added Reinstate Node in TestDriver. Changes related to recovery from snapshot Review changes Rebased with latest Tasks to be done: 1. Snapshot capture criteria 2. Handling Snapshot failure 3. Configuration Interface 4. Install snapshot Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3 Signed-off-by: Kamal Rameshan Reinstate from snapshot + review changes Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3 Signed-off-by: Kamal Rameshan TestDriver changes Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3 Signed-off-by: Kamal Rameshan Log interpolation change Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3 Signed-off-by: Kamal Rameshan Rebased. Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3 Signed-off-by: Kamal Rameshan --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 641ec0582c..90bf121fd4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -56,10 +56,12 @@ public class ExampleActor extends RaftActor { } } else if (message instanceof PrintState) { - LOG.debug("State of the node:"+getId() + " has = "+state.size() + " entries"); + LOG.debug("State of the node:{} has entries={}, {}", + getId(), state.size(), getReplicatedLogState()); } else if (message instanceof PrintRole) { - LOG.debug(getId() + " = " + getRaftState()); + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers()); + } else { super.onReceiveCommand(message); } @@ -83,6 +85,7 @@ public class ExampleActor extends RaftActor { @Override protected void applySnapshot(Object snapshot) { state.clear(); state.putAll((HashMap) snapshot); + LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size()); } @Override public void onReceiveRecover(Object message) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java index c2d0b3a6b7..18c14e7124 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -11,14 +11,13 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; -import java.util.Random; import java.util.concurrent.ConcurrentHashMap; /** * This is a test driver for testing akka-raft implementation * Its uses ExampleActors and threads to push content(key-vals) to these actors * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns - * a thread and starts push logs to the actor its assignged to. + * a thread and starts push logs to the actor its assigned to. */ public class TestDriver { @@ -26,7 +25,8 @@ public class TestDriver { private static Map allPeers = new HashMap<>(); private static Map clientActorRefs = new HashMap(); private static Map actorRefs = new HashMap(); - private static LogGenerator logGenerator = new LogGenerator();; + private static LogGenerator logGenerator = new LogGenerator(); + private int nameCounter = 0; /** * Create nodes, add clients and start logging. @@ -35,6 +35,7 @@ public class TestDriver { * createNodes:{num} * addNodes:{num} * stopNode:{nodeName} + * reinstateNode:{nodeName} * addClients:{num} * addClientsToNode:{nodeName, num} * startLogging @@ -83,6 +84,10 @@ public class TestDriver { String[] arr = command.split(":"); td.stopNode(arr[1]); + } else if (command.startsWith("reinstateNode")) { + String[] arr = command.split(":"); + td.reinstateNode(arr[1]); + } else if (command.startsWith("startLogging")) { td.startAllLogging(); @@ -108,8 +113,8 @@ public class TestDriver { public void createNodes(int num) { for (int i=0; i < num; i++) { - int rand = getUnusedRandom(num); - allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + nameCounter = nameCounter + 1; + allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); } for (String s : allPeers.keySet()) { @@ -125,9 +130,9 @@ public class TestDriver { public void addNodes(int num) { Map newPeers = new HashMap<>(); for (int i=0; i < num; i++) { - int rand = getUnusedRandom(num); - newPeers.put("example-"+rand, "akka://default/user/example-"+rand); - allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + nameCounter = nameCounter + 1; + newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); + allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); } Map newActorRefs = new HashMap(num); @@ -165,7 +170,7 @@ public class TestDriver { public void addClientsToNode(String actorName, int num) { ActorRef actorRef = actorRefs.get(actorName); for (int i=0; i < num; i++) { - String clientName = "client-" + i + "-" + actorRef; + String clientName = "client-" + i + "-" + actorName; clientActorRefs.put(clientName, actorSystem.actorOf(ClientActor.props(actorRef), clientName)); System.out.println("Added client-node:" + clientName); @@ -174,11 +179,13 @@ public class TestDriver { public void stopNode(String actorName) { ActorRef actorRef = actorRefs.get(actorName); - String clientName = "client-"+actorName; - if(clientActorRefs.containsKey(clientName)) { - actorSystem.stop(clientActorRefs.get(clientName)); - clientActorRefs.remove(clientName); + + for (Map.Entry entry : clientActorRefs.entrySet()) { + if (entry.getKey().endsWith(actorName)) { + actorSystem.stop(entry.getValue()); + } } + actorSystem.stop(actorRef); actorRefs.remove(actorName); @@ -187,7 +194,21 @@ public class TestDriver { } allPeers.remove(actorName); + } + + public void reinstateNode(String actorName) { + String address = "akka://default/user/"+actorName; + allPeers.put(actorName, address); + ActorRef exampleActor = actorSystem.actorOf(ExampleActor.props(actorName, withoutPeer(actorName)), actorName); + + for (ActorRef actor : actorRefs.values()) { + actor.tell(new AddRaftPeer(actorName, address), null); + } + + actorRefs.put(actorName, exampleActor); + + addClientsToNode(actorName, 1); } public void startAllLogging() { @@ -232,14 +253,6 @@ public class TestDriver { return null; } - private int getUnusedRandom(int num) { - int rand = -1; - do { - rand = (new Random()).nextInt(num * num); - } while (allPeers.keySet().contains("example-"+rand)); - - return rand; - } private static Map withoutPeer(String peerId) { Map without = new ConcurrentHashMap<>(allPeers); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java new file mode 100644 index 0000000000..24bfa3de21 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract class handling the mapping of + * logical LogEntry Index and the physical list index. + */ +public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { + + protected final List journal; + protected final Object snapshot; + protected long snapshotIndex = -1; + protected long snapshotTerm = -1; + + public AbstractReplicatedLogImpl(Object state, long snapshotIndex, + long snapshotTerm, List unAppliedEntries) { + this.snapshot = state; + this.snapshotIndex = snapshotIndex; + this.snapshotTerm = snapshotTerm; + this.journal = new ArrayList<>(unAppliedEntries); + } + + + public AbstractReplicatedLogImpl() { + this.snapshot = null; + this.journal = new ArrayList<>(); + } + + protected int adjustedIndex(long logEntryIndex) { + if(snapshotIndex < 0){ + return (int) logEntryIndex; + } + return (int) (logEntryIndex - (snapshotIndex + 1)); + } + + @Override + public ReplicatedLogEntry get(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + // physical index should be less than list size and >= 0 + return null; + } + + return journal.get(adjustedIndex); + } + + @Override + public ReplicatedLogEntry last() { + if (journal.isEmpty()) { + return null; + } + // get the last entry directly from the physical index + return journal.get(journal.size() - 1); + } + + @Override + public long lastIndex() { + if (journal.isEmpty()) { + // it can happen that after snapshot, all the entries of the + // journal are trimmed till lastApplied, so lastIndex = snapshotIndex + return snapshotIndex; + } + return last().getIndex(); + } + + @Override + public long lastTerm() { + if (journal.isEmpty()) { + // it can happen that after snapshot, all the entries of the + // journal are trimmed till lastApplied, so lastTerm = snapshotTerm + return snapshotTerm; + } + return last().getTerm(); + } + + @Override + public void removeFrom(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + // physical index should be less than list size and >= 0 + return; + } + journal.subList(adjustedIndex , journal.size()).clear(); + } + + @Override + public void append(ReplicatedLogEntry replicatedLogEntry) { + journal.add(replicatedLogEntry); + } + + @Override + public List getFrom(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); + int size = journal.size(); + List entries = new ArrayList<>(100); + if (adjustedIndex >= 0 && adjustedIndex < size) { + // physical index should be less than list size and >= 0 + entries.addAll(journal.subList(adjustedIndex, size)); + } + return entries; + } + + @Override + public long size() { + return journal.size(); + } + + @Override + public boolean isPresent(long logEntryIndex) { + if (logEntryIndex > lastIndex()) { + // if the request logical index is less than the last present in the list + return false; + } + int adjustedIndex = adjustedIndex(logEntryIndex); + return (adjustedIndex >= 0); + } + + @Override + public boolean isInSnapshot(long logEntryIndex) { + return logEntryIndex <= snapshotIndex; + } + + @Override + public Object getSnapshot() { + return snapshot; + } + + @Override + public long getSnapshotIndex() { + return snapshotIndex; + } + + @Override + public long getSnapshotTerm() { + return snapshotTerm; + } + + @Override + public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry); + + @Override + public abstract void removeFromAndPersist(long index); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index b8e9653bc5..36c86f542d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -81,6 +80,8 @@ public abstract class RaftActor extends UntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); + private static final int SNAPSHOT_ENTRY_COUNT = 100000; + /** * The current state determines the current behavior of a RaftActor * A Raft Actor always starts off in the Follower State @@ -108,6 +109,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { + LOG.debug("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -116,6 +118,13 @@ public abstract class RaftActor extends UntypedPersistentActor { // when we need to install it on a peer replicatedLog = new ReplicatedLogImpl(snapshot); + context.setReplicatedLog(replicatedLog); + + LOG.debug("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", + replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, + replicatedLog.size()); + // Apply the snapshot to the actors state applySnapshot(snapshot.getState()); @@ -127,7 +136,11 @@ public abstract class RaftActor extends UntypedPersistentActor { context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); } else if (message instanceof RecoveryCompleted) { LOG.debug( - "Last index in log : " + replicatedLog.lastIndex()); + "RecoveryCompleted - Switching actor to Follower - " + + "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "journal-size={}", + replicatedLog.lastIndex(), replicatedLog.snapshotIndex, + replicatedLog.snapshotTerm, replicatedLog.size()); currentBehavior = switchBehavior(RaftState.Follower); } } @@ -191,6 +204,15 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + public java.util.Set getPeers() { + return context.getPeerAddresses().keySet(); + } + + protected String getReplicatedLogState() { + return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + context.getReplicatedLog().size(); + } /** @@ -343,85 +365,33 @@ public abstract class RaftActor extends UntypedPersistentActor { } private void trimPersistentData(long sequenceNumber) { - // Trim snapshots + // Trim akka snapshots // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied // For now guessing that it is ANDed. deleteSnapshots(new SnapshotSelectionCriteria( - sequenceNumber - 100000, 43200000)); + sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000)); - // Trim journal + // Trim akka journal deleteMessages(sequenceNumber); } - private class ReplicatedLogImpl implements ReplicatedLog { - private final List journal; - private final Object snapshot; - private long snapshotIndex = -1; - private long snapshotTerm = -1; + private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - this.snapshot = snapshot.getState(); - this.snapshotIndex = snapshot.getLastAppliedIndex(); - this.snapshotTerm = snapshot.getLastAppliedTerm(); - - this.journal = new ArrayList<>(snapshot.getUnAppliedEntries()); + super(snapshot.getState(), + snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), + snapshot.getUnAppliedEntries()); } public ReplicatedLogImpl() { - this.snapshot = null; - this.journal = new ArrayList<>(); - } - - @Override public ReplicatedLogEntry get(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return null; - } - - return journal.get(adjustedIndex); - } - - @Override public ReplicatedLogEntry last() { - if (journal.size() == 0) { - return null; - } - return get(journal.size() - 1); - } - - @Override public long lastIndex() { - if (journal.size() == 0) { - return -1; - } - - return last().getIndex(); - } - - @Override public long lastTerm() { - if (journal.size() == 0) { - return -1; - } - - return last().getTerm(); - } - - - @Override public void removeFrom(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return; - } - - journal.subList(adjustedIndex , journal.size()).clear(); + super(); } + @Override public void removeFromAndPersist(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); - @Override public void removeFromAndPersist(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + if (adjustedIndex < 0) { return; } @@ -435,29 +405,6 @@ public abstract class RaftActor extends UntypedPersistentActor { //FIXME : Doing nothing for now } }); - - - } - - @Override public void append( - final ReplicatedLogEntry replicatedLogEntry) { - journal.add(replicatedLogEntry); - } - - @Override public List getFrom(long index) { - int adjustedIndex = adjustedIndex(index); - - List entries = new ArrayList<>(100); - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return entries; - } - - - for (int i = adjustedIndex; - i < journal.size(); i++) { - entries.add(journal.get(i)); - } - return entries; } @Override public void appendAndPersist( @@ -482,20 +429,42 @@ public abstract class RaftActor extends UntypedPersistentActor { new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (size() > 100000) { - ReplicatedLogEntry lastAppliedEntry = - get(context.getLastApplied()); + if (journal.size() > SNAPSHOT_ENTRY_COUNT) { + LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; + + ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied()); if (lastAppliedEntry != null) { lastAppliedIndex = lastAppliedEntry.getIndex(); lastAppliedTerm = lastAppliedEntry.getTerm(); } - saveSnapshot(Snapshot.create(createSnapshot(), + LOG.debug("Snapshot Capture logSize: {}", journal.size()); + LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied()); + LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); + LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + Snapshot sn = Snapshot.create(createSnapshot(), getFrom(context.getLastApplied() + 1), lastIndex(), lastTerm(), lastAppliedIndex, - lastAppliedTerm)); + lastAppliedTerm); + saveSnapshot(sn); + + LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + + //be greedy and remove entries from in-mem journal which are in the snapshot + // and update snapshotIndex and snapshotTerm without waiting for the success, + // TODO: damage-recovery to be done on failure + journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear(); + snapshotIndex = lastAppliedIndex; + snapshotTerm = lastAppliedTerm; + + LOG.info("Removed in-memory snapshotted entries, " + + "adjusted snaphsotIndex:{}" + + "and term:{}", snapshotIndex, lastAppliedTerm); } // Send message for replication if (clientActor != null) { @@ -509,46 +478,8 @@ public abstract class RaftActor extends UntypedPersistentActor { ); } - @Override public long size() { - return journal.size() + snapshotIndex + 1; - } - - @Override public boolean isPresent(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return false; - } - return true; - } - - @Override public boolean isInSnapshot(long index) { - return index <= snapshotIndex; - } - - @Override public Object getSnapshot() { - return snapshot; - } - - @Override public long getSnapshotIndex() { - return snapshotIndex; - } - - @Override public long getSnapshotTerm() { - return snapshotTerm; - } - - private int adjustedIndex(long index) { - if(snapshotIndex < 0){ - return (int) index; - } - return (int) (index - snapshotIndex); - } } - - - private static class DeleteEntries implements Serializable { private final int fromIndex; @@ -609,6 +540,17 @@ public abstract class RaftActor extends UntypedPersistentActor { public long getLastAppliedTerm() { return lastAppliedTerm; } + + public String getLogMessage() { + StringBuilder sb = new StringBuilder(); + return sb.append("Snapshot={") + .append("lastTerm:" + this.getLastTerm() + ", ") + .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") + .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") + .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") + .toString(); + + } } private class ElectionTermImpl implements ElectionTerm { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index ae9431a43a..ec80055459 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -85,6 +85,12 @@ public interface RaftActorContext { */ void setLastApplied(long lastApplied); + /** + * + * @param replicatedLog + */ + public void setReplicatedLog(ReplicatedLog replicatedLog); + /** * @return A representation of the log */ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 833c8a9e8a..0fbc55d1a4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -33,7 +33,7 @@ public class RaftActorContextImpl implements RaftActorContext{ private long lastApplied; - private final ReplicatedLog replicatedLog; + private ReplicatedLog replicatedLog; private final Map peerAddresses; @@ -90,6 +90,10 @@ public class RaftActorContextImpl implements RaftActorContext{ this.lastApplied = lastApplied; } + @Override public void setReplicatedLog(ReplicatedLog replicatedLog) { + this.replicatedLog = replicatedLog; + } + @Override public ReplicatedLog getReplicatedLog() { return replicatedLog; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index db62dfc2ac..c8cd41dfa1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -126,15 +126,10 @@ public class Follower extends AbstractRaftActorBehavior { int addEntriesFrom = 0; if (context.getReplicatedLog().size() > 0) { - // Find the entry up until which the one that is not in the - // follower's log - for (int i = 0; - i < appendEntries.getEntries() - .size(); i++, addEntriesFrom++) { - ReplicatedLogEntry matchEntry = - appendEntries.getEntries().get(i); - ReplicatedLogEntry newEntry = context.getReplicatedLog() - .get(matchEntry.getIndex()); + // Find the entry up until which the one that is not in the follower's log + for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { + ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); + ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex()); if (newEntry == null) { //newEntry not found in the log diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java new file mode 100644 index 0000000000..ae8e525233 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry; +/** +* +*/ +public class AbstractReplicatedLogImplTest { + + private MockAbstractReplicatedLogImpl replicatedLogImpl; + + @Before + public void setUp() { + replicatedLogImpl = new MockAbstractReplicatedLogImpl(); + } + + @After + public void tearDown() { + replicatedLogImpl.journal.clear(); + replicatedLogImpl.setSnapshotIndex(-1); + replicatedLogImpl.setSnapshotTerm(-1); + replicatedLogImpl = null; + } + + @Test + public void testIndexOperations() { + // create a set of initial entries in the in-memory log + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D"))); + + // check if the values returned are correct, with snapshotIndex = -1 + assertEquals("B", replicatedLogImpl.get(1).getData().toString()); + assertEquals("D", replicatedLogImpl.last().getData().toString()); + assertEquals(3, replicatedLogImpl.lastIndex()); + assertEquals(2, replicatedLogImpl.lastTerm()); + assertEquals(2, replicatedLogImpl.getFrom(2).size()); + assertEquals(4, replicatedLogImpl.size()); + assertTrue(replicatedLogImpl.isPresent(2)); + assertFalse(replicatedLogImpl.isPresent(4)); + assertFalse(replicatedLogImpl.isInSnapshot(2)); + + // now create a snapshot of 3 entries, with 1 unapplied entry left in the log + // It removes the entries which have made it to snapshot + // and updates the snapshot index and term + Map state = takeSnapshot(3); + + // check the values after the snapshot. + // each index value passed in the test is the logical index (log entry index) + // which gets mapped to the list's physical index + assertEquals("D", replicatedLogImpl.get(3).getData().toString()); + assertEquals("D", replicatedLogImpl.last().getData().toString()); + assertNull(replicatedLogImpl.get(1)); + assertEquals(3, replicatedLogImpl.lastIndex()); + assertEquals(2, replicatedLogImpl.lastTerm()); + assertEquals(0, replicatedLogImpl.getFrom(2).size()); + assertEquals(1, replicatedLogImpl.size()); + assertFalse(replicatedLogImpl.isPresent(2)); + assertTrue(replicatedLogImpl.isPresent(3)); + assertFalse(replicatedLogImpl.isPresent(4)); + assertTrue(replicatedLogImpl.isInSnapshot(2)); + + // append few more entries + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H"))); + + // check their values as well + assertEquals(5, replicatedLogImpl.size()); + assertEquals("D", replicatedLogImpl.get(3).getData().toString()); + assertEquals("E", replicatedLogImpl.get(4).getData().toString()); + assertEquals("H", replicatedLogImpl.last().getData().toString()); + assertEquals(3, replicatedLogImpl.lastTerm()); + assertEquals(7, replicatedLogImpl.lastIndex()); + assertTrue(replicatedLogImpl.isPresent(7)); + assertFalse(replicatedLogImpl.isInSnapshot(7)); + assertEquals(1, replicatedLogImpl.getFrom(7).size()); + assertEquals(2, replicatedLogImpl.getFrom(6).size()); + + // take a second snapshot with 5 entries with 0 unapplied entries left in the log + state = takeSnapshot(5); + + assertEquals(0, replicatedLogImpl.size()); + assertNull(replicatedLogImpl.last()); + assertNull(replicatedLogImpl.get(7)); + assertNull(replicatedLogImpl.get(1)); + assertFalse(replicatedLogImpl.isPresent(7)); + assertTrue(replicatedLogImpl.isInSnapshot(7)); + assertEquals(0, replicatedLogImpl.getFrom(7).size()); + assertEquals(0, replicatedLogImpl.getFrom(6).size()); + + } + + // create a snapshot for test + public Map takeSnapshot(int numEntries) { + Map map = new HashMap(numEntries); + List entries = replicatedLogImpl.getEntriesTill(numEntries); + for (ReplicatedLogEntry entry : entries) { + map.put(entry.getIndex(), entry.getData().toString()); + } + + int term = (int) replicatedLogImpl.lastTerm(); + int lastIndex = (int) entries.get(entries.size() - 1).getIndex(); + entries.clear(); + replicatedLogImpl.setSnapshotTerm(term); + replicatedLogImpl.setSnapshotIndex(lastIndex); + + return map; + + } + class MockAbstractReplicatedLogImpl extends AbstractReplicatedLogImpl { + @Override + public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry) { + } + + @Override + public void removeFromAndPersist(long index) { + } + + public void setSnapshotIndex(long snapshotIndex) { + this.snapshotIndex = snapshotIndex; + } + + public void setSnapshotTerm(long snapshotTerm) { + this.snapshotTerm = snapshotTerm; + } + + public List getEntriesTill(int index) { + return journal.subList(0, index); + } + } +}