import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
+
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
@Override public String persistenceId() {
return getId();
}
+
+ @Override
+ protected void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ protected void appendRecoveredLogEntry(Payload data) {
+ }
+
+ @Override
+ protected void applyCurrentLogRecoveryBatch() {
+ }
+
+ @Override
+ protected void onRecoveryComplete() {
+ }
+
+ @Override
+ protected void applyRecoverySnapshot(ByteString snapshot) {
+ }
}
*/
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
- protected List<ReplicatedLogEntry> journal;
+ // We define this as ArrayList so we can use ensureCapacity.
+ protected ArrayList<ReplicatedLogEntry> journal;
protected ByteString snapshot;
protected long snapshotIndex = -1;
protected long snapshotTerm = -1;
// to be used for rollback during save snapshot failure
- protected List<ReplicatedLogEntry> snapshottedJournal;
+ protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
protected ByteString previousSnapshot;
protected long previousSnapshotIndex = -1;
protected long previousSnapshotTerm = -1;
journal.add(replicatedLogEntry);
}
+ @Override
+ public void increaseJournalLogCapacity(int amount) {
+ journal.ensureCapacity(journal.size() + amount);
+ }
+
@Override
public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
return getFrom(logEntryIndex, journal.size());
@Override
public void snapshotCommit() {
- snapshottedJournal.clear();
snapshottedJournal = null;
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
@Override
public void snapshotRollback() {
snapshottedJournal.addAll(journal);
- journal.clear();
journal = snapshottedJournal;
snapshottedJournal = null;
*
* @return long
*/
- public long getSnapshotBatchCount();
+ long getSnapshotBatchCount();
/**
* The interval at which a heart beat message will be sent to the remote
*
* @return FiniteDuration
*/
- public FiniteDuration getHeartBeatInterval();
+ FiniteDuration getHeartBeatInterval();
/**
* The interval in which a new election would get triggered if no leader is found
*
* @return FiniteDuration
*/
- public FiniteDuration getElectionTimeOutInterval();
+ FiniteDuration getElectionTimeOutInterval();
/**
* The maximum election time variance. The election is scheduled using both
*
* @return int
*/
- public int getElectionTimeVariance();
+ int getElectionTimeVariance();
/**
* The size (in bytes) of the snapshot chunk sent from Leader
*/
- public int getSnapshotChunkSize();
+ int getSnapshotChunkSize();
+
+ /**
+ * The number of journal log entries to batch on recovery before applying.
+ */
+ int getJournalRecoveryLogBatchSize();
}
private static final int SNAPSHOT_BATCH_COUNT = 20000;
+ private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
+
/**
* The maximum election time variance
*/
private static final int ELECTION_TIME_MAX_VARIANCE = 100;
- private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+ private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
/**
new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
+ private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
+ private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+
+ public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+ this.heartBeatInterval = heartBeatInterval;
+ }
+
+ public void setSnapshotBatchCount(long snapshotBatchCount) {
+ this.snapshotBatchCount = snapshotBatchCount;
+ }
+
+ public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+ this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
+ }
+
@Override
public long getSnapshotBatchCount() {
- return SNAPSHOT_BATCH_COUNT;
+ return snapshotBatchCount;
}
@Override
public FiniteDuration getHeartBeatInterval() {
- return HEART_BEAT_INTERVAL;
+ return heartBeatInterval;
}
-
@Override
public FiniteDuration getElectionTimeOutInterval() {
// returns 2 times the heart beat interval
public int getSnapshotChunkSize() {
return SNAPSHOT_CHUNK_SIZE;
}
+
+ @Override
+ public int getJournalRecoveryLogBatchSize() {
+ return journalRecoveryLogBatchSize;
+ }
}
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
import java.io.Serializable;
import java.util.Map;
* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
*/
- protected RaftActorContext context;
+ private final RaftActorContext context;
/**
* The in-memory journal
private volatile boolean hasSnapshotCaptureInitiated = false;
+ private Stopwatch recoveryTimer;
+
+ private int currentRecoveryBatchCount;
+
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
LOG);
}
- @Override public void onReceiveRecover(Object message) {
+ private void initRecoveryTimer() {
+ if(recoveryTimer == null) {
+ recoveryTimer = new Stopwatch();
+ recoveryTimer.start();
+ }
+ }
+
+ @Override
+ public void preStart() throws Exception {
+ LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
+ context.getConfigParams().getJournalRecoveryLogBatchSize());
+ super.preStart();
+ }
+
+ @Override
+ public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
- LOG.info("SnapshotOffer called..");
- SnapshotOffer offer = (SnapshotOffer) message;
- Snapshot snapshot = (Snapshot) offer.snapshot();
+ onRecoveredSnapshot((SnapshotOffer)message);
+ } else if (message instanceof ReplicatedLogEntry) {
+ onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
+ } else if (message instanceof ApplyLogEntries) {
+ onRecoveredApplyLogEntries((ApplyLogEntries)message);
+ } else if (message instanceof DeleteEntries) {
+ replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+ } else if (message instanceof RecoveryCompleted) {
+ onRecoveryCompletedMessage();
+ }
+ }
- // Create a replicated log with the snapshot information
- // The replicated log can be used later on to retrieve this snapshot
- // when we need to install it on a peer
- replicatedLog = new ReplicatedLogImpl(snapshot);
+ private void onRecoveredSnapshot(SnapshotOffer offer) {
+ LOG.debug("SnapshotOffer called..");
- context.setReplicatedLog(replicatedLog);
- context.setLastApplied(snapshot.getLastAppliedIndex());
- context.setCommitIndex(snapshot.getLastAppliedIndex());
+ initRecoveryTimer();
- LOG.info("Applied snapshot to replicatedLog. " +
- "snapshotIndex={}, snapshotTerm={}, journal-size={}",
- replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
- replicatedLog.size()
- );
+ Snapshot snapshot = (Snapshot) offer.snapshot();
- // Apply the snapshot to the actors state
- applySnapshot(ByteString.copyFrom(snapshot.getState()));
+ // Create a replicated log with the snapshot information
+ // The replicated log can be used later on to retrieve this snapshot
+ // when we need to install it on a peer
+ replicatedLog = new ReplicatedLogImpl(snapshot);
- } else if (message instanceof ReplicatedLogEntry) {
- ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
- }
- replicatedLog.append(logEntry);
+ context.setReplicatedLog(replicatedLog);
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
- } else if (message instanceof ApplyLogEntries) {
- ApplyLogEntries ale = (ApplyLogEntries) message;
+ Stopwatch timer = new Stopwatch();
+ timer.start();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+ // Apply the snapshot to the actors state
+ applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+ timer.stop();
+ LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+ replicatedLog.size(), persistenceId(), timer.toString(),
+ replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+ }
+
+ private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ }
+
+ replicatedLog.append(logEntry);
+ }
+
+ private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
context.getLastApplied() + 1, ale.getToIndex());
- }
+ }
- for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
- applyState(null, "recovery", replicatedLog.get(i).getData());
- }
- context.setLastApplied(ale.getToIndex());
- context.setCommitIndex(ale.getToIndex());
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ batchRecoveredLogEntry(replicatedLog.get(i));
+ }
- } else if (message instanceof DeleteEntries) {
- replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
+ }
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
+ private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+ initRecoveryTimer();
- } else if (message instanceof RecoveryCompleted) {
- LOG.info(
- "RecoveryCompleted - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
- currentBehavior = switchBehavior(RaftState.Follower);
- onStateChanged();
+ int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+ if(currentRecoveryBatchCount == 0) {
+ startLogRecoveryBatch(batchSize);
+ }
+
+ appendRecoveredLogEntry(logEntry.getData());
+
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
}
}
+ private void endCurrentLogRecoveryBatch() {
+ applyCurrentLogRecoveryBatch();
+ currentRecoveryBatchCount = 0;
+ }
+
+ private void onRecoveryCompletedMessage() {
+ if(currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+
+ onRecoveryComplete();
+
+ String recoveryTime = "";
+ if(recoveryTimer != null) {
+ recoveryTimer.stop();
+ recoveryTime = " in " + recoveryTimer.toString();
+ recoveryTimer = null;
+ }
+
+ LOG.info(
+ "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size());
+
+ currentBehavior = switchBehavior(RaftState.Follower);
+ onStateChanged();
+ }
+
@Override public void onReceiveCommand(Object message) {
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
return context.getLastApplied();
}
+ protected RaftActorContext getRaftActorContext() {
+ return context;
+ }
+
/**
* setPeerAddress sets the address of a known peer at a later time.
* <p>
protected abstract void applyState(ActorRef clientActor, String identifier,
Object data);
+ /**
+ * This method is called during recovery at the start of a batch of state entries. Derived
+ * classes should perform any initialization needed to start a batch.
+ */
+ protected abstract void startLogRecoveryBatch(int maxBatchSize);
+
+ /**
+ * This method is called during recovery to append state data to the current batch. This method
+ * is called 1 or more times after {@link #startRecoveryStateBatch}.
+ *
+ * @param data the state data
+ */
+ protected abstract void appendRecoveredLogEntry(Payload data);
+
+ /**
+ * This method is called during recovery to reconstruct the state of the actor.
+ *
+ * @param snapshot A snapshot of the state of the actor
+ */
+ protected abstract void applyRecoverySnapshot(ByteString snapshot);
+
+ /**
+ * This method is called during recovery at the end of a batch to apply the current batched
+ * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+ */
+ protected abstract void applyCurrentLogRecoveryBatch();
+
+ /**
+ * This method is called when recovery is complete.
+ */
+ protected abstract void onRecoveryComplete();
+
/**
* This method will be called by the RaftActor when a snapshot needs to be
* created. The derived actor should respond with its current state.
protected abstract void createSnapshot();
/**
- * This method will be called by the RaftActor during recovery to
- * reconstruct the state of the actor.
- * <p/>
- * This method may also be called at any other point during normal
+ * This method can be called at any other point during normal
* operations when the derived actor is out of sync with it's peers
* and the only way to bring it in sync is by applying a snapshot
*
// of a single command.
persist(replicatedLogEntry,
new Procedure<ReplicatedLogEntry>() {
+ @Override
public void apply(ReplicatedLogEntry evt) throws Exception {
// when a snaphsot is being taken, captureSnapshot != null
if (hasSnapshotCaptureInitiated == false &&
private long currentTerm = 0;
private String votedFor = null;
+ @Override
public long getCurrentTerm() {
return currentTerm;
}
+ @Override
public String getVotedFor() {
return votedFor;
}
this.LOG = logger;
}
+ @Override
public ActorRef actorOf(Props props){
return context.actorOf(props);
}
+ @Override
public ActorSelection actorSelection(String path){
return context.actorSelection(path);
}
+ @Override
public String getId() {
return id;
}
+ @Override
public ActorRef getActor() {
return actor;
}
+ @Override
public ElectionTerm getTermInformation() {
return termInformation;
}
+ @Override
public long getCommitIndex() {
return commitIndex;
}
this.commitIndex = commitIndex;
}
+ @Override
public long getLastApplied() {
return lastApplied;
}
*/
void append(ReplicatedLogEntry replicatedLogEntry);
+ /**
+ * Optimization method to increase the capacity of the journal log prior to appending entries.
+ *
+ * @param amount the amount to increase by
+ */
+ void increaseJournalLogCapacity(int amount);
+
/**
*
* @param replicatedLogEntry
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.Terminated;
import akka.event.Logging;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
-
+import scala.concurrent.duration.FiniteDuration;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
public class RaftActorTest extends AbstractActorTest {
public static class MockRaftActor extends RaftActor {
- private boolean applySnapshotCalled = false;
- private List<Object> state;
+ public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+ private final Map<String, String> peerAddresses;
+ private final String id;
+ private final Optional<ConfigParams> config;
- public MockRaftActor(String id,
- Map<String, String> peerAddresses) {
- super(id, peerAddresses);
- state = new ArrayList<>();
+ private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+ Optional<ConfigParams> config) {
+ this.peerAddresses = peerAddresses;
+ this.id = id;
+ this.config = config;
+ }
+
+ @Override
+ public MockRaftActor create() throws Exception {
+ return new MockRaftActor(id, peerAddresses, config);
+ }
}
- public RaftActorContext getRaftActorContext() {
- return context;
+ private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ private final List<Object> state;
+
+ public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+ super(id, peerAddresses, config);
+ state = new ArrayList<>();
}
- public boolean isApplySnapshotCalled() {
- return applySnapshotCalled;
+ public void waitForRecoveryComplete() {
+ try {
+ assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
public List<Object> getState() {
return state;
}
- public static Props props(final String id, final Map<String, String> peerAddresses){
- return Props.create(new Creator<MockRaftActor>(){
-
- @Override public MockRaftActor create() throws Exception {
- return new MockRaftActor(id, peerAddresses);
- }
- });
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
}
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+ }
+
+ @Override
+ protected void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ protected void appendRecoveredLogEntry(Payload data) {
state.add(data);
}
- @Override protected void createSnapshot() {
- throw new UnsupportedOperationException("createSnapshot");
+ @Override
+ protected void applyCurrentLogRecoveryBatch() {
}
- @Override protected void applySnapshot(ByteString snapshot) {
- applySnapshotCalled = true;
+ @Override
+ protected void onRecoveryComplete() {
+ recoveryComplete.countDown();
+ }
+
+ @Override
+ protected void applyRecoverySnapshot(ByteString snapshot) {
try {
Object data = toObject(snapshot);
+ System.out.println("!!!!!applyRecoverySnapshot: "+data);
if (data instanceof List) {
state.addAll((List) data);
}
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
+ @Override protected void createSnapshot() {
+ throw new UnsupportedOperationException("createSnapshot");
+ }
+
+ @Override protected void applySnapshot(ByteString snapshot) {
+ }
+
@Override protected void onStateChanged() {
}
public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
super(actorSystem);
- raftActor = this.getSystem()
- .actorOf(MockRaftActor.props(actorName,
- Collections.EMPTY_MAP), actorName);
+ raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
}
return
new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
) {
+ @Override
protected Boolean run() {
return true;
}
}
public void findLeader(final String expectedLeader){
+ raftActor.tell(new FindLeader(), getRef());
-
- new Within(duration("1 seconds")) {
- protected void run() {
-
- raftActor.tell(new FindLeader(), getRef());
-
- String s = new ExpectMsg<String>(duration("1 seconds"),
- "findLeader") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- if (in instanceof FindLeaderReply) {
- return ((FindLeaderReply) in).getLeaderActor();
- } else {
- throw noMatch();
- }
- }
- }.get();// this extracts the received message
-
- assertEquals(expectedLeader, s);
-
- }
-
-
- };
+ FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
}
public ActorRef getRaftActor() {
return raftActor;
}
-
}
}
@Test
- public void testRaftActorRecovery() {
+ public void testRaftActorRecovery() throws Exception {
new JavaTestKit(getSystem()) {{
- new Within(duration("1 seconds")) {
- protected void run() {
-
- String persistenceId = "follower10";
-
- ActorRef followerActor = getSystem().actorOf(
- MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
-
- List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
- ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
- snapshotUnappliedEntries.add(entry1);
-
- int lastAppliedDuringSnapshotCapture = 3;
- int lastIndexDuringSnapshotCapture = 4;
-
- ByteString snapshotBytes = null;
- try {
- // 4 messages as part of snapshot, which are applied to state
- snapshotBytes = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
- } catch (Exception e) {
- e.printStackTrace();
- }
- Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
- snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
- lastAppliedDuringSnapshotCapture, 1);
- MockSnapshotStore.setMockSnapshot(snapshot);
- MockSnapshotStore.setPersistenceId(persistenceId);
-
- // add more entries after snapshot is taken
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
- ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
- ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
- entries.add(entry2);
- entries.add(entry3);
- entries.add(entry4);
-
- int lastAppliedToState = 5;
- int lastIndex = 7;
-
- MockAkkaJournal.addToJournal(5, entry2);
- // 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
- MockAkkaJournal.addToJournal(7, entry3);
- MockAkkaJournal.addToJournal(8, entry4);
-
- // kill the actor
- followerActor.tell(PoisonPill.getInstance(), null);
-
- try {
- // give some time for actor to die
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- //reinstate the actor
- TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
- MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
-
- try {
- //give some time for snapshot offer to get called.
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- RaftActorContext context = ref.underlyingActor().getRaftActorContext();
- assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
- assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
- assertEquals(lastAppliedToState, context.getLastApplied());
- assertEquals(lastAppliedToState, context.getCommitIndex());
- assertTrue(ref.underlyingActor().isApplySnapshotCalled());
- assertEquals(6, ref.underlyingActor().getState().size());
- }
- };
+ String persistenceId = "follower10";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ // Set the heartbeat interval high to essentially disable election otherwise the test
+ // may fail if the actor is switched to Leader and the commitIndex is set to the last
+ // log entry.
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+ watch(followerActor);
+
+ List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+ new MockRaftActorContext.MockPayload("E"));
+ snapshotUnappliedEntries.add(entry1);
+
+ int lastAppliedDuringSnapshotCapture = 3;
+ int lastIndexDuringSnapshotCapture = 4;
+
+ // 4 messages as part of snapshot, which are applied to state
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+ snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+ lastAppliedDuringSnapshotCapture, 1);
+ MockSnapshotStore.setMockSnapshot(snapshot);
+ MockSnapshotStore.setPersistenceId(persistenceId);
+
+ // add more entries after snapshot is taken
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("F"));
+ ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("G"));
+ ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("H"));
+ entries.add(entry2);
+ entries.add(entry3);
+ entries.add(entry4);
+
+ int lastAppliedToState = 5;
+ int lastIndex = 7;
+
+ MockAkkaJournal.addToJournal(5, entry2);
+ // 2 entries are applied to state besides the 4 entries in snapshot
+ MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+ MockAkkaJournal.addToJournal(7, entry3);
+ MockAkkaJournal.addToJournal(8, entry4);
+
+ // kill the actor
+ followerActor.tell(PoisonPill.getInstance(), null);
+ expectMsgClass(duration("5 seconds"), Terminated.class);
+
+ unwatch(followerActor);
+
+ //reinstate the actor
+ TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
+ Optional.<ConfigParams>of(config)));
+
+ ref.underlyingActor().waitForRecoveryComplete();
+
+ RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+ assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+ context.getReplicatedLog().size());
+ assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+ assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
}};
-
}
private ByteString fromObject(Object snapshot) throws Exception {
package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
import org.opendaylight.yangtools.yang.data.api.Node;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private static class DeSerializer implements NormalizedNodeDeSerializationContext {
private static Map<NormalizedNodeType, DeSerializationFunction>
- deSerializationFunctions = new HashMap<>();
+ deSerializationFunctions = new EnumMap<>(NormalizedNodeType.class);
static {
deSerializationFunctions.put(CONTAINER_NODE_TYPE,
private NormalizedNode deSerialize(NormalizedNodeMessages.Node node){
Preconditions.checkNotNull(node, "node should not be null");
- DeSerializationFunction deSerializationFunction =
- Preconditions.checkNotNull(deSerializationFunctions.get(NormalizedNodeType.values()[node.getIntType()]), "Unknown type " + node);
+
+ DeSerializationFunction deSerializationFunction = deSerializationFunctions.get(
+ NormalizedNodeType.values()[node.getIntType()]);
return deSerializationFunction.apply(this, node);
}
NormalizedNode apply(DeSerializer deserializer, NormalizedNodeMessages.Node node);
}
}
-
-
-
-
}
package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.cluster.datastore.node.utils.NodeIdentifierFactory;
import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import static org.opendaylight.controller.cluster.datastore.node.utils.serialization.PathArgumentType.getSerializablePathArgumentType;
public class PathArgumentSerializer {
+ private static final String REVISION_ARG = "?revision=";
private static final Map<Class, PathArgumentAttributesGetter> pathArgumentAttributesGetters = new HashMap<>();
public static NormalizedNodeMessages.PathArgument serialize(NormalizedNodeSerializationContext context, YangInstanceIdentifier.PathArgument pathArgument){
// If this serializer is used qName cannot be null (see encodeQName)
// adding null check only in case someone tried to deSerialize a protocol buffer node
// that was not serialized using the PathArgumentSerializer
- Preconditions.checkNotNull(qName, "qName should not be null");
- Preconditions.checkArgument(!"".equals(qName.getLocalName()),
- "qName.localName cannot be empty qName = " + qName.toString());
- Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
+// Preconditions.checkNotNull(qName, "qName should not be null");
+// Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid");
- StringBuilder sb = new StringBuilder();
String namespace = context.getNamespace(qName.getNamespace());
- String revision = "";
String localName = context.getLocalName(qName.getLocalName());
+ StringBuilder sb;
if(qName.getRevision() != -1){
- revision = context.getRevision(qName.getRevision());
- sb.append("(").append(namespace).append("?revision=").append(
- revision).append(")").append(
- localName);
+ String revision = context.getRevision(qName.getRevision());
+ sb = new StringBuilder(namespace.length() + REVISION_ARG.length() + revision.length() +
+ localName.length() + 2);
+ sb.append('(').append(namespace).append(REVISION_ARG).append(
+ revision).append(')').append(localName);
} else {
- sb.append("(").append(namespace).append(")").append(
- localName);
+ sb = new StringBuilder(namespace.length() + localName.length() + 2);
+ sb.append('(').append(namespace).append(')').append(localName);
}
return sb.toString();
-
}
/**
NormalizedNodeDeSerializationContext context,
NormalizedNodeMessages.PathArgument pathArgument) {
- Preconditions.checkArgument(pathArgument.getIntType() >= 0
- && pathArgument.getIntType() < PathArgumentType.values().length,
- "Illegal PathArgumentType " + pathArgument.getIntType());
-
switch(PathArgumentType.values()[pathArgument.getIntType()]){
case NODE_IDENTIFIER_WITH_VALUE : {
NormalizedNodeDeSerializationContext context,
List<NormalizedNodeMessages.PathArgumentAttribute> attributesList) {
- Map<QName, Object> map = new HashMap<>();
-
- for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+ Map<QName, Object> map;
+ if(attributesList.size() == 1) {
+ NormalizedNodeMessages.PathArgumentAttribute attribute = attributesList.get(0);
NormalizedNodeMessages.QName name = attribute.getName();
Object value = parseAttribute(context, attribute);
+ map = Collections.singletonMap(QNameFactory.create(qNameToString(context, name)), value);
+ } else {
+ map = new HashMap<>();
+
+ for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){
+ NormalizedNodeMessages.QName name = attribute.getName();
+ Object value = parseAttribute(context, attribute);
- map.put(QNameFactory.create(qNameToString(context, name)), value);
+ map.put(QNameFactory.create(qNameToString(context, name)), value);
+ }
}
return map;
package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
-import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
private static Object deSerializeBasicTypes(int valueType, String value) {
- Preconditions.checkArgument(valueType >= 0 && valueType < ValueType.values().length,
- "Illegal value type " + valueType );
-
switch(ValueType.values()[valueType]){
case SHORT_TYPE: {
return Short.valueOf(value);
public static final ValueType getSerializableType(Object node){
Preconditions.checkNotNull(node, "node should not be null");
- if(types.containsKey(node.getClass())) {
- return types.get(node.getClass());
+ ValueType type = types.get(node.getClass());
+ if(type != null) {
+ return type;
} else if(node instanceof Set){
return BITS_TYPE;
}
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Preconditions;
-
+import org.opendaylight.controller.cluster.raft.ConfigParams;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
private final Duration shardTransactionIdleTimeout;
private final int operationTimeoutInSeconds;
private final String dataStoreMXBeanType;
+ private final ConfigParams shardRaftConfig;
public DatastoreContext() {
- this.dataStoreProperties = null;
- this.dataStoreMXBeanType = "DistributedDatastore";
- this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
- this.operationTimeoutInSeconds = 5;
+ this("DistributedDatastore", null, Duration.create(10, TimeUnit.MINUTES), 5, 1000, 20000, 500);
}
public DatastoreContext(String dataStoreMXBeanType,
InMemoryDOMDataStoreConfigProperties dataStoreProperties,
Duration shardTransactionIdleTimeout,
- int operationTimeoutInSeconds) {
+ int operationTimeoutInSeconds,
+ int shardJournalRecoveryLogBatchSize,
+ int shardSnapshotBatchCount,
+ int shardHeartbeatIntervalInMillis) {
this.dataStoreMXBeanType = dataStoreMXBeanType;
- this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
+ this.dataStoreProperties = dataStoreProperties;
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+
+ DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+ raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+ TimeUnit.MILLISECONDS));
+ raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+ raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+ shardRaftConfig = raftConfig;
}
public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
public int getOperationTimeoutInSeconds() {
return operationTimeoutInSeconds;
}
+
+ public ConfigParams getShardRaftConfig() {
+ return shardRaftConfig;
+ }
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
/**
* A Shard represents a portion of the logical data tree <br/>
*/
public class Shard extends RaftActor {
- private static final ConfigParams configParams = new ShardConfigParams();
-
public static final String DEFAULT_NAME = "default";
// The state of this Shard
private ActorRef createSnapshotTransaction;
+ /**
+ * Coordinates persistence recovery on startup.
+ */
+ private ShardRecoveryCoordinator recoveryCoordinator;
+ private List<Object> currentLogRecoveryBatch;
+
private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
- private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+ protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
- super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
+ super(name.toString(), mapPeerAddresses(peerAddresses),
+ Optional.of(datastoreContext.getShardRaftConfig()));
this.name = name;
this.datastoreContext = datastoreContext;
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Could not find cohort for modification : {}. Writing modification using a new transaction",
- modification);
- }
-
- DOMStoreWriteTransaction transaction =
- store.newWriteOnlyTransaction();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
- }
-
- modification.apply(transaction);
- try {
- syncCommitTransaction(transaction);
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("Failed to commit", e);
- return;
- }
- //we want to just apply the recovery commit and return
- shardMBean.incrementCommittedTransactionCount();
+ // If there's no cached cohort then we must be applying replicated state.
+ commitWithNewTransaction(serialized);
return;
}
-
- if(sender == null){
+ if(sender == null) {
LOG.error("Commit failed. Sender cannot be null");
return;
}
}
+ private void commitWithNewTransaction(Object modification) {
+ DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+ MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx);
+ try {
+ syncCommitTransaction(tx);
+ shardMBean.incrementCommittedTransactionCount();
+ } catch (InterruptedException | ExecutionException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error(e, "Failed to commit");
+ }
+ }
+
private void handleForwardedCommit(ForwardedCommitTransaction message) {
Object serializedModification =
message.getModification().toSerializable();
return config.isMetricCaptureEnabled();
}
- @Override protected void applyState(ActorRef clientActor, String identifier,
- Object data) {
+ @Override
+ protected
+ void startLogRecoveryBatch(int maxBatchSize) {
+ currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+ }
+ }
+
+ @Override
+ protected void appendRecoveredLogEntry(Payload data) {
+ if (data instanceof CompositeModificationPayload) {
+ currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+ } else {
+ LOG.error("Unknown state received {} during recovery", data);
+ }
+ }
+
+ @Override
+ protected void applyRecoverySnapshot(ByteString snapshot) {
+ if(recoveryCoordinator == null) {
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ }
+
+ recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+ }
+ }
+
+ @Override
+ protected void applyCurrentLogRecoveryBatch() {
+ if(recoveryCoordinator == null) {
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ }
+
+ recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+ currentLogRecoveryBatch.size());
+ }
+ }
+
+ @Override
+ protected void onRecoveryComplete() {
+ if(recoveryCoordinator != null) {
+ Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+ }
+
+ for(DOMStoreWriteTransaction tx: txList) {
+ try {
+ syncCommitTransaction(tx);
+ shardMBean.incrementCommittedTransactionCount();
+ } catch (InterruptedException | ExecutionException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error(e, "Failed to commit");
+ }
+ }
+ }
+
+ recoveryCoordinator = null;
+ currentLogRecoveryBatch = null;
+ updateJournalStats();
+ }
+
+ @Override
+ protected void applyState(ActorRef clientActor, String identifier, Object data) {
if (data instanceof CompositeModificationPayload) {
- Object modification =
- ((CompositeModificationPayload) data).getModification();
+ Object modification = ((CompositeModificationPayload) data).getModification();
if (modification != null) {
commit(clientActor, modification);
} else {
LOG.error(
"modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor.path().toString());
+ identifier, clientActor != null ? clientActor.path().toString() : null);
}
} else {
- LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader());
+ LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ data, data.getClass().getClassLoader(),
+ CompositeModificationPayload.class.getClassLoader());
}
- // Update stats
+ updateJournalStats();
+
+ }
+
+ private void updateJournalStats() {
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
shardMBean.setCommitIndex(getCommitIndex());
shardMBean.setLastApplied(getLastApplied());
-
}
- @Override protected void createSnapshot() {
+ @Override
+ protected void createSnapshot() {
if (createSnapshotTransaction == null) {
// Create a transaction. We are really going to treat the transaction as a worker
}
}
- @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+ @VisibleForTesting
+ @Override
+ protected void applySnapshot(ByteString snapshot) {
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
return this.name.toString();
}
-
- private static class ShardConfigParams extends DefaultConfigParamsImpl {
- public static final FiniteDuration HEART_BEAT_INTERVAL =
- new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
- @Override public FiniteDuration getHeartBeatInterval() {
- return HEART_BEAT_INTERVAL;
- }
- }
-
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
}
}
- @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+ @VisibleForTesting
+ NormalizedNode<?,?> readStore(YangInstanceIdentifier id)
+ throws ExecutionException, InterruptedException {
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
- transaction.read(YangInstanceIdentifier.builder().build());
+ transaction.read(id);
- NormalizedNode<?, ?> node = future.get().get();
+ Optional<NormalizedNode<?, ?>> optional = future.get();
+ NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
transaction.close();
return node;
}
- @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+ @VisibleForTesting
+ void writeToStore(YangInstanceIdentifier id, NormalizedNode<?,?> node)
throws ExecutionException, InterruptedException {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
syncCommitTransaction(transaction);
}
+ @VisibleForTesting
+ ShardStats getShardMBean() {
+ return shardMBean;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
+ * and journal log entry batch are de-serialized and applied to their own write transaction
+ * instance in parallel on a thread pool for faster recovery time. However the transactions are
+ * committed to the data store in the order the corresponding snapshot or log batch are received
+ * to preserve data store integrity.
+ *
+ * @author Thomas Panetelis
+ */
+class ShardRecoveryCoordinator {
+
+ private static final int TIME_OUT = 10;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
+
+ private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
+ private final SchemaContext schemaContext;
+ private final String shardName;
+ private final ExecutorService executor;
+
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+ this.shardName = shardName;
+
+ executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
+ }
+
+ /**
+ * Submits a batch of journal log entries.
+ *
+ * @param logEntries the serialized journal log entries
+ * @param resultingTx the write Tx to which to apply the entries
+ */
+ void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+ LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
+ resultingTxList.add(resultingTx);
+ executor.execute(task);
+ }
+
+ /**
+ * Submits a snapshot.
+ *
+ * @param snapshot the serialized snapshot
+ * @param resultingTx the write Tx to which to apply the entries
+ */
+ void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+ SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+ resultingTxList.add(resultingTx);
+ executor.execute(task);
+ }
+
+ Collection<DOMStoreWriteTransaction> getTransactions() {
+ // Shutdown the executor and wait for task completion.
+ executor.shutdown();
+
+ try {
+ if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
+ return resultingTxList;
+ } else {
+ LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ return Collections.emptyList();
+ }
+
+ private static abstract class ShardRecoveryTask implements Runnable {
+
+ final DOMStoreWriteTransaction resultingTx;
+
+ ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
+ this.resultingTx = resultingTx;
+ }
+ }
+
+ private class LogRecoveryTask extends ShardRecoveryTask {
+
+ private final List<Object> logEntries;
+
+ LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
+ super(resultingTx);
+ this.logEntries = logEntries;
+ }
+
+ @Override
+ public void run() {
+ for(int i = 0; i < logEntries.size(); i++) {
+ MutableCompositeModification.fromSerializable(
+ logEntries.get(i), schemaContext).apply(resultingTx);
+ // Null out to GC quicker.
+ logEntries.set(i, null);
+ }
+ }
+ }
+
+ private class SnapshotRecoveryTask extends ShardRecoveryTask {
+
+ private final ByteString snapshot;
+
+ SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+ super(resultingTx);
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ public void run() {
+ try {
+ NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
+ NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
+ YangInstanceIdentifier.builder().build(), serializedNode);
+
+ // delete everything first
+ resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+ // Add everything from the remote node back
+ resultingTx.write(YangInstanceIdentifier.builder().build(), node);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Error deserializing snapshot", e);
+ }
+ }
+ }
+}
DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
InMemoryDOMDataStoreConfigProperties.create(
- props.getMaxShardDataChangeExecutorPoolSize().getValue(),
- props.getMaxShardDataChangeExecutorQueueSize().getValue(),
- props.getMaxShardDataChangeListenerQueueSize().getValue(),
- props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+ props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+ props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
TimeUnit.MINUTES),
- props.getOperationTimeoutInSeconds().getValue());
+ props.getOperationTimeoutInSeconds().getValue(),
+ props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+ props.getShardSnapshotBatchCount().getValue().intValue(),
+ props.getShardHearbeatIntervalInMillis().getValue());
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
datastoreContext, bundleContext);
DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
InMemoryDOMDataStoreConfigProperties.create(
- props.getMaxShardDataChangeExecutorPoolSize().getValue(),
- props.getMaxShardDataChangeExecutorQueueSize().getValue(),
- props.getMaxShardDataChangeListenerQueueSize().getValue(),
- props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+ props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
+ props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
TimeUnit.MINUTES),
- props.getOperationTimeoutInSeconds().getValue());
+ props.getOperationTimeoutInSeconds().getValue(),
+ props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
+ props.getShardSnapshotBatchCount().getValue().intValue(),
+ props.getShardHearbeatIntervalInMillis().getValue());
return DistributedDataStoreFactory.createInstance("operational",
getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
config:java-name-prefix DistributedOperationalDataStoreProvider;
}
- typedef non-zero-uint16-type {
- type uint16 {
+ typedef non-zero-uint32-type {
+ type uint32 {
range "1..max";
}
}
}
}
+ typedef heartbeat-interval-type {
+ type uint16 {
+ range "100..max";
+ }
+ }
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
- type non-zero-uint16-type;
+ type non-zero-uint32-type;
description "The maximum queue size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-executor-pool-size {
default 20;
- type non-zero-uint16-type;
+ type non-zero-uint32-type;
description "The maximum thread pool size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-listener-queue-size {
default 1000;
- type non-zero-uint16-type;
+ type non-zero-uint32-type;
description "The maximum queue size for each shard's data store data change listeners.";
}
leaf max-shard-data-store-executor-queue-size {
default 5000;
- type non-zero-uint16-type;
+ type non-zero-uint32-type;
description "The maximum queue size for each shard's data store executor.";
}
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
- type non-zero-uint16-type;
+ type non-zero-uint32-type;
description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
}
+ leaf shard-snapshot-batch-count {
+ default 20000;
+ type non-zero-uint32-type;
+ description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+ }
+
+ leaf shard-hearbeat-interval-in-millis {
+ default 500;
+ type heartbeat-interval-type;
+ description "The interval at which a shard will send a heart beat message to its remote shard.";
+ }
+
leaf operation-timeout-in-seconds {
default 5;
type operation-timeout-type;
description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
}
+ leaf shard-journal-recovery-log-batch-size {
+ default 5000;
+ type non-zero-uint32-type;
+ description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
+ }
+
leaf enable-metric-capture {
default false;
type boolean;
leaf bounded-mailbox-capacity {
default 1000;
- type non-zero-uint16-type;
+ type non-zero-uint32-type;
description "Max queue size that an actor's mailbox can reach";
}
}
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import org.apache.commons.io.FileUtils;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import java.io.File;
import java.io.IOException;
public abstract class AbstractActorTest {
System.setProperty("shard.persistent", "false");
system = ActorSystem.create("test");
-
- deletePersistenceFiles();
}
@AfterClass
public static void tearDownClass() throws IOException {
JavaTestKit.shutdownActorSystem(system);
system = null;
-
- deletePersistenceFiles();
- }
-
- protected static void deletePersistenceFiles() throws IOException {
- File journal = new File("journal");
-
- if(journal.exists()) {
- FileUtils.deleteDirectory(journal);
- }
-
- File snapshots = new File("snapshots");
-
- if(snapshots.exists()){
- FileUtils.deleteDirectory(snapshots);
- }
-
}
protected ActorSystem getSystem() {
return system;
}
-
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.Logging;
+import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
public class ShardTest extends AbstractActorTest {
- private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
+ private static final DatastoreContext DATA_STORE_CONTEXT =
+ new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500);
- @Test
- public void testOnReceiveRegisterListener() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
+ private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
- final ActorRef subject =
- getSystem().actorOf(props, "testRegisterChangeListener");
+ private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
+ @Before
+ public void setUp() {
+ System.setProperty("shard.persistent", "false");
- subject.tell(
- new UpdateSchemaContext(SchemaContextHelper.full()),
- getRef());
+ InMemorySnapshotStore.clear();
+ InMemoryJournal.clear();
+ }
- subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
- getRef());
+ @After
+ public void tearDown() {
+ InMemorySnapshotStore.clear();
+ InMemoryJournal.clear();
+ }
- final Boolean notificationEnabled = new ExpectMsg<Boolean>(
- duration("3 seconds"), "enable notification") {
- // do not put code outside this method, will run afterwards
- @Override
- protected Boolean match(Object in) {
- if(in instanceof EnableNotification){
- return ((EnableNotification) in).isEnabled();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertFalse(notificationEnabled);
-
- final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(RegisterChangeListenerReply.class)) {
- RegisterChangeListenerReply reply =
- (RegisterChangeListenerReply) in;
- return reply.getListenerRegistrationPath()
- .toString();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ private Props newShardProps() {
+ return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+ }
- assertTrue(out.matches(
- "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
- }
+ @Test
+ public void testOnReceiveRegisterListener() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
+ subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
- };
+ subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+ getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+
+ EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
+ assertEquals("isEnabled", false, enable.isEnabled());
+
+ RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
+ RegisterChangeListenerReply.class);
+ assertTrue(reply.getListenerRegistrationPath().toString().matches(
+ "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
}};
}
@Test
public void testCreateTransaction(){
- new JavaTestKit(getSystem()) {{
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
-
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
- final ActorRef subject =
- getSystem().actorOf(props, "testCreateTransaction");
-
- // Wait for a specific log message to show up
- final boolean result =
- new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
- ) {
- @Override
- protected Boolean run() {
- return true;
- }
- }.from(subject.path().toString())
- .message("Switching from state Candidate to Leader")
- .occurrences(1).exec();
-
- Assert.assertEquals(true, result);
+ new ShardTestKit(getSystem()) {{
+ ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
+ waitUntilLeader(subject);
- subject.tell(
- new UpdateSchemaContext(TestModel.createTestContext()),
- getRef());
+ subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
- getRef());
+ subject.tell(new CreateTransaction("txn-1",
+ TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
- final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in instanceof CreateTransactionReply) {
- CreateTransactionReply reply =
- (CreateTransactionReply) in;
- return reply.getTransactionActorPath()
- .toString();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+ CreateTransactionReply.class);
- assertTrue("Unexpected transaction path " + out,
- out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
- expectNoMsg();
- }
- };
+ String path = reply.getTransactionActorPath().toString();
+ assertTrue("Unexpected transaction path " + path,
+ path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
+ expectNoMsg();
}};
}
@Test
public void testCreateTransactionOnChain(){
- new JavaTestKit(getSystem()) {{
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
-
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
- final ActorRef subject =
- getSystem().actorOf(props, "testCreateTransactionOnChain");
-
- // Wait for a specific log message to show up
- final boolean result =
- new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
- ) {
- @Override
- protected Boolean run() {
- return true;
- }
- }.from(subject.path().toString())
- .message("Switching from state Candidate to Leader")
- .occurrences(1).exec();
-
- Assert.assertEquals(true, result);
-
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
+ new ShardTestKit(getSystem()) {{
+ final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
- subject.tell(
- new UpdateSchemaContext(TestModel.createTestContext()),
- getRef());
+ waitUntilLeader(subject);
- subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
- getRef());
+ subject.tell(new CreateTransaction("txn-1",
+ TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
+ getRef());
- final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in instanceof CreateTransactionReply) {
- CreateTransactionReply reply =
- (CreateTransactionReply) in;
- return reply.getTransactionActorPath()
- .toString();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
+ CreateTransactionReply.class);
- assertTrue("Unexpected transaction path " + out,
- out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
- expectNoMsg();
- }
- };
+ String path = reply.getTransactionActorPath().toString();
+ assertTrue("Unexpected transaction path " + path,
+ path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
+ expectNoMsg();
}};
}
@Test
public void testPeerAddressResolved(){
new JavaTestKit(getSystem()) {{
- Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
-
final ShardIdentifier identifier =
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- peerAddresses.put(identifier, null);
- final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
- final ActorRef subject =
- getSystem().actorOf(props, "testPeerAddressResolved");
+ Props props = Shard.props(identifier,
+ Collections.<ShardIdentifier, String>singletonMap(identifier, null),
+ DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+ final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
new Within(duration("3 seconds")) {
@Override
@Test
public void testApplySnapshot() throws ExecutionException, InterruptedException {
- Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+ TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
+ NormalizedNodeToNodeCodec codec =
+ new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
- peerAddresses.put(identifier, null);
- final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
+ ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(
+ TestModel.TEST_QNAME));
- TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
+ YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+ NormalizedNode<?,?> expected = ref.underlyingActor().readStore(root);
- ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
+ NormalizedNodeMessages.Container encode = codec.encode(root, expected);
- NormalizedNodeToNodeCodec codec =
- new NormalizedNodeToNodeCodec(TestModel.createTestContext());
+ ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+ encode.getNormalizedNode().toByteString().toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
- ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ ref.underlyingActor().onReceiveCommand(applySnapshot);
- NormalizedNode expected = ref.underlyingActor().readStore();
+ NormalizedNode<?,?> actual = ref.underlyingActor().readStore(root);
- NormalizedNodeMessages.Container encode = codec
- .encode(YangInstanceIdentifier.builder().build(), expected);
+ assertEquals(expected, actual);
+ }
+ @Test
+ public void testApplyState() throws Exception {
- ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
- NormalizedNode actual = ref.underlyingActor().readStore();
+ NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- assertEquals(expected, actual);
- }
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
+ Payload payload = new CompositeModificationPayload(compMod.toSerializable());
+ ApplyState applyState = new ApplyState(null, "test",
+ new ReplicatedLogImplEntry(1, 2, payload));
- private static class ShardTestKit extends JavaTestKit {
+ shard.underlyingActor().onReceiveCommand(applyState);
- private ShardTestKit(ActorSystem actorSystem) {
- super(actorSystem);
+ NormalizedNode<?,?> actual = shard.underlyingActor().readStore(TestModel.TEST_PATH);
+ assertEquals("Applied state", node, actual);
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testRecovery() throws Exception {
+
+ // Set up the InMemorySnapshotStore.
+
+ InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+ testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+ DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+
+ DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
+ NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+
+ InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
+ new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
+ YangInstanceIdentifier.builder().build(), root).
+ getNormalizedNode().toByteString().toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+ // Set up the InMemoryJournal.
+
+ InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+ SCHEMA_CONTEXT))));
+
+ int nListEntries = 11;
+ Set<Integer> listEntryKeys = new HashSet<>();
+ for(int i = 1; i <= nListEntries; i++) {
+ listEntryKeys.add(Integer.valueOf(i));
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+ Modification mod = new MergeModification(path,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+ SCHEMA_CONTEXT);
+ InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ newPayload(mod)));
}
- protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
- // Wait for a specific log message to show up
- final boolean result =
- new JavaTestKit.EventFilter<Boolean>(logLevel
- ) {
+ InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
+ new ApplyLogEntries(nListEntries));
+
+ // Create the actor and wait for recovery complete.
+
+ final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ DATA_STORE_CONTEXT, SCHEMA_CONTEXT) {
@Override
- protected Boolean run() {
- return true;
+ protected void onRecoveryComplete() {
+ try {
+ super.onRecoveryComplete();
+ } finally {
+ recoveryComplete.countDown();
+ }
}
- }.from(subject.path().toString())
- .message(logMessage)
- .occurrences(1).exec();
+ };
+ }
+ };
- Assert.assertEquals(true, result);
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+
+ assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH);
+ assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+ outerList.getValue() instanceof Iterable);
+ for(Object entry: (Iterable<?>) outerList.getValue()) {
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+ entry instanceof MapEntryNode);
+ MapEntryNode mapEntry = (MapEntryNode)entry;
+ Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+ mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+ assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+ Object value = idLeaf.get().getValue();
+ assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
+ listEntryKeys.remove(value));
+ }
+ if(!listEntryKeys.isEmpty()) {
+ fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
+ listEntryKeys);
}
+ assertEquals("Last log index", nListEntries,
+ shard.underlyingActor().getShardMBean().getLastLogIndex());
+ assertEquals("Commit index", nListEntries,
+ shard.underlyingActor().getShardMBean().getCommitIndex());
+ assertEquals("Last applied", nListEntries,
+ shard.underlyingActor().getShardMBean().getLastApplied());
}
+ private CompositeModificationPayload newPayload(Modification... mods) {
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ for(Modification mod: mods) {
+ compMod.addModification(mod);
+ }
+
+ return new CompositeModificationPayload(compMod.toSerializable());
+ }
+
+ @SuppressWarnings("unchecked")
@Test
- public void testCreateSnapshot() throws IOException, InterruptedException {
+ public void testForwardedCommitTransactionWithPersistence() throws IOException {
+ System.setProperty("shard.persistent", "true");
+
new ShardTestKit(getSystem()) {{
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
- final ActorRef subject =
- getSystem().actorOf(props, "testCreateSnapshot");
+ waitUntilLeader(shard);
- // Wait for a specific log message to show up
- this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
+ NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class);
+ doReturn(Futures.immediateFuture(null)).when(cohort).commit();
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
+ MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new WriteModification(TestModel.TEST_PATH, node,
+ SCHEMA_CONTEXT));
- subject.tell(
- new UpdateSchemaContext(TestModel.createTestContext()),
- getRef());
+ shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef());
- subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
- getRef());
+ expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
- waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+ verify(cohort).commit();
- subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
- getRef());
+ assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex());
+ }};
+ }
- waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+ @Test
+ public void testCreateSnapshot() throws IOException, InterruptedException {
+ new ShardTestKit(getSystem()) {{
+ final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot");
- }
- };
+ waitUntilLeader(subject);
+
+ subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+ subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
- deletePersistenceFiles();
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
}};
}
InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
- store.onGlobalContextUpdated(TestModel.createTestContext());
+ store.onGlobalContextUpdated(SCHEMA_CONTEXT);
DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
putTransaction.write(TestModel.TEST_PATH,
}
};
}
+
+ private static final class DelegatingShardCreator implements Creator<Shard> {
+ private final Creator<Shard> delegate;
+
+ DelegatingShardCreator(Creator<Shard> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Shard create() throws Exception {
+ return delegate.create();
+ }
+ }
+
+ private static class ShardTestKit extends JavaTestKit {
+
+ private ShardTestKit(ActorSystem actorSystem) {
+ super(actorSystem);
+ }
+
+ protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+ // Wait for a specific log message to show up
+ final boolean result =
+ new JavaTestKit.EventFilter<Boolean>(logLevel
+ ) {
+ @Override
+ protected Boolean run() {
+ return true;
+ }
+ }.from(subject.path().toString())
+ .message(logMessage)
+ .occurrences(1).exec();
+
+ Assert.assertEquals(true, result);
+
+ }
+
+ protected void waitUntilLeader(ActorRef subject) {
+ waitForLogMessage(Logging.Info.class, subject,
+ "Switching from state Candidate to Leader");
+ }
+ }
}
datastoreContext = new DatastoreContext("Test",
InMemoryDOMDataStoreConfigProperties.getDefault(),
- Duration.create(500, TimeUnit.MILLISECONDS), 5);
+ Duration.create(500, TimeUnit.MILLISECONDS), 5, 1000, 1000, 500);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Maps;
+import scala.concurrent.Future;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+
+public class InMemoryJournal extends AsyncWriteJournal {
+
+ private static Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
+
+ public static void addEntry(String persistenceId, long sequenceNr, Object data) {
+ Map<Long, Object> journal = journals.get(persistenceId);
+ if(journal == null) {
+ journal = Maps.newLinkedHashMap();
+ journals.put(persistenceId, journal);
+ }
+
+ journal.put(sequenceNr, data);
+ }
+
+ public static void clear() {
+ journals.clear();
+ }
+
+ @Override
+ public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
+ long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+ return Futures.future(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Map<Long, Object> journal = journals.get(persistenceId);
+ if(journal == null) {
+ return null;
+ }
+
+ for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+ PersistentRepr persistentMessage =
+ new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
+ replayCallback.apply(persistentMessage);
+ }
+
+ return null;
+ }
+ }, context().dispatcher());
+ }
+
+ @Override
+ public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
+ return Futures.successful(new Long(0));
+ }
+
+ @Override
+ public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
+ return Futures.successful(null);
+ }
+
+ @Override
+ public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
+ return Futures.successful(null);
+ }
+
+ @Override
+ public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
+ return Futures.successful(null);
+ }
+
+ @Override
+ public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+ return Futures.successful(null);
+ }
+}
import akka.persistence.snapshot.japi.SnapshotStore;
import com.google.common.collect.Iterables;
import scala.concurrent.Future;
-
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.raft.Snapshot;
public class InMemorySnapshotStore extends SnapshotStore {
- Map<String, List<Snapshot>> snapshots = new HashMap<>();
+ private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
+
+ public static void addSnapshot(String persistentId, Snapshot snapshot) {
+ List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
+
+ if(snapshotList == null) {
+ snapshotList = new ArrayList<>();
+ snapshots.put(persistentId, snapshotList);
+ }
+
+ snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
+ System.currentTimeMillis()), snapshot));
+ }
+
+ public static void clear() {
+ snapshots.clear();
+ }
- @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+ @Override
+ public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
SnapshotSelectionCriteria snapshotSelectionCriteria) {
- List<Snapshot> snapshotList = snapshots.get(s);
+ List<StoredSnapshot> snapshotList = snapshots.get(s);
if(snapshotList == null){
return Futures.successful(Option.<SelectedSnapshot>none());
}
- Snapshot snapshot = Iterables.getLast(snapshotList);
+ StoredSnapshot snapshot = Iterables.getLast(snapshotList);
SelectedSnapshot selectedSnapshot =
new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
return Futures.successful(Option.some(selectedSnapshot));
}
- @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
- List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+ @Override
+ public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+ List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
if(snapshotList == null){
snapshotList = new ArrayList<>();
snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
}
- snapshotList.add(new Snapshot(snapshotMetadata, o));
+ snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
return Futures.successful(null);
}
- @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+ @Override
+ public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
}
- @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
- List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+ @Override
+ public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+ List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
if(snapshotList == null){
return;
int deleteIndex = -1;
for(int i=0;i<snapshotList.size(); i++){
- Snapshot snapshot = snapshotList.get(i);
+ StoredSnapshot snapshot = snapshotList.get(i);
if(snapshotMetadata.equals(snapshot.getMetadata())){
deleteIndex = i;
break;
}
- @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+ @Override
+ public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
throws Exception {
- List<Snapshot> snapshotList = snapshots.get(s);
+ List<StoredSnapshot> snapshotList = snapshots.get(s);
if(snapshotList == null){
return;
snapshots.remove(s);
}
- private static class Snapshot {
+ private static class StoredSnapshot {
private final SnapshotMetadata metadata;
private final Object data;
- private Snapshot(SnapshotMetadata metadata, Object data) {
+ private StoredSnapshot(SnapshotMetadata metadata, Object data) {
this.metadata = metadata;
this.data = data;
}
akka {
persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
}
}
+in-memory-journal {
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
in-memory-snapshot-store {
# Class name of the plugin.
class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"