From: tpantelis
Date: Sun, 31 Aug 2014 15:15:18 +0000 (-0400)
Subject: Bug 1831 Batch messages on journal recovery
X-Git-Tag: release/helium~32
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9e59cc0d824e6752a7a3f3ba092abaaf3c1d4193

Bug 1831 Batch messages on journal recovery

Added journal log recovery batching support in RaftActor along with additional abstract methods for granular recovery control. It turns out that batching helps a little, but the real performance hog was deserialization. There were a couple of inefficient Preconditions in the NormalizedNodeSerializer (unwanted String concats in the fast path) that accounted for significant performance degradation. I also made a few other minor performance enhancements.

Although deserialization is much better with the fixes, I also implemented some parallelization during shard recovery. I added a ShardRecoveryCoordinator class that parallelizes deserialization of journal log entry batches and snapshots for faster recovery time. The resulting transactions are still committed to the data store in the order the corresponding snapshot or log batch is received, to preserve data store integrity.

The journal recovery log batch size is configurable via the config XML. I also made the shard heartbeat interval and shard snapshot batch count configurable.

Change-Id: I52ef1690bfb6cc486c329ee60f77c52720c24469
Signed-off-by: tpantelis
---
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 3bfdf732cf..04df7785ad 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; + import com.google.common.base.Optional; import com.google.protobuf.ByteString; + import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -165,4 +167,24 @@ public class ExampleActor extends RaftActor { @Override public String persistenceId() { return getId(); } + + @Override + protected void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + protected void appendRecoveredLogEntry(Payload data) { + } + + @Override + protected void applyCurrentLogRecoveryBatch() { + } + + @Override + protected void onRecoveryComplete() { + } + + @Override + protected void applyRecoverySnapshot(ByteString snapshot) { + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index b436bce500..2be4a0c36f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -18,13 +18,14 @@ import java.util.List; */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { - 
protected List journal; + // We define this as ArrayList so we can use ensureCapacity. + protected ArrayList journal; protected ByteString snapshot; protected long snapshotIndex = -1; protected long snapshotTerm = -1; // to be used for rollback during save snapshot failure - protected List snapshottedJournal; + protected ArrayList snapshottedJournal; protected ByteString previousSnapshot; protected long previousSnapshotIndex = -1; protected long previousSnapshotTerm = -1; @@ -106,6 +107,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { journal.add(replicatedLogEntry); } + @Override + public void increaseJournalLogCapacity(int amount) { + journal.ensureCapacity(journal.size() + amount); + } + @Override public List getFrom(long logEntryIndex) { return getFrom(logEntryIndex, journal.size()); @@ -208,7 +214,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public void snapshotCommit() { - snapshottedJournal.clear(); snapshottedJournal = null; previousSnapshotIndex = -1; previousSnapshotTerm = -1; @@ -218,7 +223,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public void snapshotRollback() { snapshottedJournal.addAll(journal); - journal.clear(); journal = snapshottedJournal; snapshottedJournal = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java index ed6439d8c3..bff2a27797 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -26,7 +26,7 @@ public interface ConfigParams { * * @return long */ - public long getSnapshotBatchCount(); + long getSnapshotBatchCount(); /** * The interval at which a heart beat message will be sent to the remote @@ -34,7 +34,7 @@ public interface ConfigParams { * * @return FiniteDuration */ - public FiniteDuration getHeartBeatInterval(); + FiniteDuration getHeartBeatInterval(); /** * The interval in which a new election would get triggered if no leader is found @@ -43,7 +43,7 @@ public interface ConfigParams { * * @return FiniteDuration */ - public FiniteDuration getElectionTimeOutInterval(); + FiniteDuration getElectionTimeOutInterval(); /** * The maximum election time variance. The election is scheduled using both @@ -51,10 +51,15 @@ public interface ConfigParams { * * @return int */ - public int getElectionTimeVariance(); + int getElectionTimeVariance(); /** * The size (in bytes) of the snapshot chunk sent from Leader */ - public int getSnapshotChunkSize(); + int getSnapshotChunkSize(); + + /** + * The number of journal log entries to batch on recovery before applying. 
+ */ + int getJournalRecoveryLogBatchSize(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 9d06f63604..dc4145358a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -20,12 +20,14 @@ public class DefaultConfigParamsImpl implements ConfigParams { private static final int SNAPSHOT_BATCH_COUNT = 20000; + private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000; + /** * The maximum election time variance */ private static final int ELECTION_TIME_MAX_VARIANCE = 100; - private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB + private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB /** @@ -39,17 +41,32 @@ public class DefaultConfigParamsImpl implements ConfigParams { new FiniteDuration(100, TimeUnit.MILLISECONDS); + private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL; + private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT; + private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE; + + public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { + this.heartBeatInterval = heartBeatInterval; + } + + public void setSnapshotBatchCount(long snapshotBatchCount) { + this.snapshotBatchCount = snapshotBatchCount; + } + + public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) { + this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize; + } + @Override public long getSnapshotBatchCount() { - return SNAPSHOT_BATCH_COUNT; + return snapshotBatchCount; } @Override public FiniteDuration getHeartBeatInterval() { - return HEART_BEAT_INTERVAL; + return heartBeatInterval; } - @Override public FiniteDuration getElectionTimeOutInterval() { // returns 2 times the heart beat interval @@ -65,4 +82,9 @@ public class DefaultConfigParamsImpl implements ConfigParams { public int getSnapshotChunkSize() { return SNAPSHOT_CHUNK_SIZE; } + + @Override + public int getJournalRecoveryLogBatchSize() { + return journalRecoveryLogBatchSize; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 0a4e2170e5..64fa749604 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -20,6 +20,7 @@ import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; @@ -39,7 +40,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import 
org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; - import java.io.Serializable; import java.util.Map; @@ -97,7 +97,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ - protected RaftActorContext context; + private final RaftActorContext context; /** * The in-memory journal @@ -108,6 +108,10 @@ public abstract class RaftActor extends UntypedPersistentActor { private volatile boolean hasSnapshotCaptureInitiated = false; + private Stopwatch recoveryTimer; + + private int currentRecoveryBatchCount; + public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -122,71 +126,134 @@ public abstract class RaftActor extends UntypedPersistentActor { LOG); } - @Override public void onReceiveRecover(Object message) { + private void initRecoveryTimer() { + if(recoveryTimer == null) { + recoveryTimer = new Stopwatch(); + recoveryTimer.start(); + } + } + + @Override + public void preStart() throws Exception { + LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(), + context.getConfigParams().getJournalRecoveryLogBatchSize()); + super.preStart(); + } + + @Override + public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { - LOG.info("SnapshotOffer called.."); - SnapshotOffer offer = (SnapshotOffer) message; - Snapshot snapshot = (Snapshot) offer.snapshot(); + onRecoveredSnapshot((SnapshotOffer)message); + } else if (message instanceof ReplicatedLogEntry) { + onRecoveredJournalLogEntry((ReplicatedLogEntry)message); + } else if (message instanceof ApplyLogEntries) { + onRecoveredApplyLogEntries((ApplyLogEntries)message); + } else if (message instanceof DeleteEntries) { + replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { + onRecoveryCompletedMessage(); + } + } - // Create a replicated log with the snapshot information - // The replicated log can be used later on to retrieve this snapshot - // when we need to install it on a peer - replicatedLog = new ReplicatedLogImpl(snapshot); + private void onRecoveredSnapshot(SnapshotOffer offer) { + LOG.debug("SnapshotOffer called.."); - context.setReplicatedLog(replicatedLog); - context.setLastApplied(snapshot.getLastAppliedIndex()); - context.setCommitIndex(snapshot.getLastAppliedIndex()); + initRecoveryTimer(); - LOG.info("Applied snapshot to replicatedLog. 
" + - "snapshotIndex={}, snapshotTerm={}, journal-size={}", - replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, - replicatedLog.size() - ); + Snapshot snapshot = (Snapshot) offer.snapshot(); - // Apply the snapshot to the actors state - applySnapshot(ByteString.copyFrom(snapshot.getState())); + // Create a replicated log with the snapshot information + // The replicated log can be used later on to retrieve this snapshot + // when we need to install it on a peer + replicatedLog = new ReplicatedLogImpl(snapshot); - } else if (message instanceof ReplicatedLogEntry) { - ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; - if(LOG.isDebugEnabled()) { - LOG.debug("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex()); - } - replicatedLog.append(logEntry); + context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); - } else if (message instanceof ApplyLogEntries) { - ApplyLogEntries ale = (ApplyLogEntries) message; + Stopwatch timer = new Stopwatch(); + timer.start(); - if(LOG.isDebugEnabled()) { - LOG.debug("Received ApplyLogEntries for recovery, applying to state:{} to {}", + // Apply the snapshot to the actors state + applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState())); + + timer.stop(); + LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + + replicatedLog.size(), persistenceId(), timer.toString(), + replicatedLog.snapshotIndex, replicatedLog.snapshotTerm); + } + + private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { + if(LOG.isDebugEnabled()) { + LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex()); + } + + replicatedLog.append(logEntry); + } + + private void onRecoveredApplyLogEntries(ApplyLogEntries ale) { + if(LOG.isDebugEnabled()) { + LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}", context.getLastApplied() + 1, ale.getToIndex()); - } + } - for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { - applyState(null, "recovery", replicatedLog.get(i).getData()); - } - context.setLastApplied(ale.getToIndex()); - context.setCommitIndex(ale.getToIndex()); + for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { + batchRecoveredLogEntry(replicatedLog.get(i)); + } - } else if (message instanceof DeleteEntries) { - replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + context.setLastApplied(ale.getToIndex()); + context.setCommitIndex(ale.getToIndex()); + } - } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), - ((UpdateElectionTerm) message).getVotedFor()); + private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) { + initRecoveryTimer(); - } else if (message instanceof RecoveryCompleted) { - LOG.info( - "RecoveryCompleted - Switching actor to Follower - " + - "Persistence Id = " + persistenceId() + - " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + - "journal-size={}", - replicatedLog.lastIndex(), replicatedLog.snapshotIndex, - replicatedLog.snapshotTerm, replicatedLog.size()); - currentBehavior = switchBehavior(RaftState.Follower); - onStateChanged(); + int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize(); + if(currentRecoveryBatchCount == 0) { + startLogRecoveryBatch(batchSize); + } + + appendRecoveredLogEntry(logEntry.getData()); + + 
if(++currentRecoveryBatchCount >= batchSize) { + endCurrentLogRecoveryBatch(); } } + private void endCurrentLogRecoveryBatch() { + applyCurrentLogRecoveryBatch(); + currentRecoveryBatchCount = 0; + } + + private void onRecoveryCompletedMessage() { + if(currentRecoveryBatchCount > 0) { + endCurrentLogRecoveryBatch(); + } + + onRecoveryComplete(); + + String recoveryTime = ""; + if(recoveryTimer != null) { + recoveryTimer.stop(); + recoveryTime = " in " + recoveryTimer.toString(); + recoveryTimer = null; + } + + LOG.info( + "Recovery completed" + recoveryTime + " - Switching actor to Follower - " + + "Persistence Id = " + persistenceId() + + " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " + + "journal-size={}", + replicatedLog.lastIndex(), replicatedLog.snapshotIndex, + replicatedLog.snapshotTerm, replicatedLog.size()); + + currentBehavior = switchBehavior(RaftState.Follower); + onStateChanged(); + } + @Override public void onReceiveCommand(Object message) { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -397,6 +464,10 @@ public abstract class RaftActor extends UntypedPersistentActor { return context.getLastApplied(); } + protected RaftActorContext getRaftActorContext() { + return context; + } + /** * setPeerAddress sets the address of a known peer at a later time. *

@@ -437,6 +508,38 @@ public abstract class RaftActor extends UntypedPersistentActor { protected abstract void applyState(ActorRef clientActor, String identifier, Object data); + /** + * This method is called during recovery at the start of a batch of state entries. Derived + * classes should perform any initialization needed to start a batch. + */ + protected abstract void startLogRecoveryBatch(int maxBatchSize); + + /** + * This method is called during recovery to append state data to the current batch. This method + * is called 1 or more times after {@link #startRecoveryStateBatch}. + * + * @param data the state data + */ + protected abstract void appendRecoveredLogEntry(Payload data); + + /** + * This method is called during recovery to reconstruct the state of the actor. + * + * @param snapshot A snapshot of the state of the actor + */ + protected abstract void applyRecoverySnapshot(ByteString snapshot); + + /** + * This method is called during recovery at the end of a batch to apply the current batched + * log entries. This method is called after {@link #appendRecoveryLogEntry}. + */ + protected abstract void applyCurrentLogRecoveryBatch(); + + /** + * This method is called when recovery is complete. + */ + protected abstract void onRecoveryComplete(); + /** * This method will be called by the RaftActor when a snapshot needs to be * created. The derived actor should respond with its current state. @@ -449,10 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor { protected abstract void createSnapshot(); /** - * This method will be called by the RaftActor during recovery to - * reconstruct the state of the actor. - *

- * This method may also be called at any other point during normal + * This method can be called at any other point during normal * operations when the derived actor is out of sync with it's peers * and the only way to bring it in sync is by applying a snapshot * @@ -609,6 +709,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // of a single command. persist(replicatedLogEntry, new Procedure() { + @Override public void apply(ReplicatedLogEntry evt) throws Exception { // when a snaphsot is being taken, captureSnapshot != null if (hasSnapshotCaptureInitiated == false && @@ -673,10 +774,12 @@ public abstract class RaftActor extends UntypedPersistentActor { private long currentTerm = 0; private String votedFor = null; + @Override public long getCurrentTerm() { return currentTerm; } + @Override public String getVotedFor() { return votedFor; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 25da37105c..e4aef0a844 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -59,26 +59,32 @@ public class RaftActorContextImpl implements RaftActorContext { this.LOG = logger; } + @Override public ActorRef actorOf(Props props){ return context.actorOf(props); } + @Override public ActorSelection actorSelection(String path){ return context.actorSelection(path); } + @Override public String getId() { return id; } + @Override public ActorRef getActor() { return actor; } + @Override public ElectionTerm getTermInformation() { return termInformation; } + @Override public long getCommitIndex() { return commitIndex; } @@ -87,6 +93,7 @@ public class RaftActorContextImpl implements RaftActorContext { this.commitIndex = commitIndex; } + @Override public long getLastApplied() { return lastApplied; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index c17f5448c6..85893333c2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -74,6 +74,13 @@ public interface ReplicatedLog { */ void append(ReplicatedLogEntry replicatedLogEntry); + /** + * Optimization method to increase the capacity of the journal log prior to appending entries. 
+ * + * @param amount the amount to increase by + */ + void increaseJournalLogCapacity(int amount); + /** * * @param replicatedLogEntry diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 998c198756..22f374319c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -4,19 +4,22 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Terminated; import akka.event.Logging; import akka.japi.Creator; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.protobuf.ByteString; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; - +import scala.concurrent.duration.FiniteDuration; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -27,9 +30,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; - -import static junit.framework.Assert.assertTrue; -import static junit.framework.TestCase.assertEquals; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; public class RaftActorTest extends AbstractActorTest { @@ -42,58 +45,90 @@ public class RaftActorTest extends AbstractActorTest { public static class MockRaftActor extends RaftActor { - private boolean applySnapshotCalled = false; - private List state; + public static final class MockRaftActorCreator implements Creator { + private final Map peerAddresses; + private final String id; + private final Optional config; - public MockRaftActor(String id, - Map peerAddresses) { - super(id, peerAddresses); - state = new ArrayList<>(); + private MockRaftActorCreator(Map peerAddresses, String id, + Optional config) { + this.peerAddresses = peerAddresses; + this.id = id; + this.config = config; + } + + @Override + public MockRaftActor create() throws Exception { + return new MockRaftActor(id, peerAddresses, config); + } } - public RaftActorContext getRaftActorContext() { - return context; + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final List state; + + public MockRaftActor(String id, Map peerAddresses, Optional config) { + super(id, peerAddresses, config); + state = new ArrayList<>(); } - public boolean isApplySnapshotCalled() { - return applySnapshotCalled; + public void waitForRecoveryComplete() { + try { + assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } } public List getState() { return state; } - public static Props props(final String id, final Map peerAddresses){ - return 
Props.create(new Creator(){ - - @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses); - } - }); + public static Props props(final String id, final Map peerAddresses, + Optional config){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config)); } @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { + } + + @Override + protected void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + protected void appendRecoveredLogEntry(Payload data) { state.add(data); } - @Override protected void createSnapshot() { - throw new UnsupportedOperationException("createSnapshot"); + @Override + protected void applyCurrentLogRecoveryBatch() { } - @Override protected void applySnapshot(ByteString snapshot) { - applySnapshotCalled = true; + @Override + protected void onRecoveryComplete() { + recoveryComplete.countDown(); + } + + @Override + protected void applyRecoverySnapshot(ByteString snapshot) { try { Object data = toObject(snapshot); + System.out.println("!!!!!applyRecoverySnapshot: "+data); if (data instanceof List) { state.addAll((List) data); } - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } } + @Override protected void createSnapshot() { + throw new UnsupportedOperationException("createSnapshot"); + } + + @Override protected void applySnapshot(ByteString snapshot) { + } + @Override protected void onStateChanged() { } @@ -130,9 +165,8 @@ public class RaftActorTest extends AbstractActorTest { public RaftActorTestKit(ActorSystem actorSystem, String actorName) { super(actorSystem); - raftActor = this.getSystem() - .actorOf(MockRaftActor.props(actorName, - Collections.EMPTY_MAP), actorName); + raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName, + Collections.EMPTY_MAP, Optional.absent()), actorName); } @@ -142,6 +176,7 @@ public class RaftActorTest extends AbstractActorTest { return new JavaTestKit.EventFilter(Logging.Info.class ) { + @Override protected Boolean run() { return true; } @@ -153,37 +188,15 @@ public class RaftActorTest extends AbstractActorTest { } public void findLeader(final String expectedLeader){ + raftActor.tell(new FindLeader(), getRef()); - - new Within(duration("1 seconds")) { - protected void run() { - - raftActor.tell(new FindLeader(), getRef()); - - String s = new ExpectMsg(duration("1 seconds"), - "findLeader") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof FindLeaderReply) { - return ((FindLeaderReply) in).getLeaderActor(); - } else { - throw noMatch(); - } - } - }.get();// this extracts the received message - - assertEquals(expectedLeader, s); - - } - - - }; + FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class); + assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor()); } public ActorRef getRaftActor() { return raftActor; } - } @@ -201,89 +214,84 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testRaftActorRecovery() { + public void testRaftActorRecovery() throws Exception { new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - protected void run() { - - String persistenceId = "follower10"; - - ActorRef followerActor = getSystem().actorOf( - MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId); - - List snapshotUnappliedEntries = new ArrayList<>(); - ReplicatedLogEntry 
entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E")); - snapshotUnappliedEntries.add(entry1); - - int lastAppliedDuringSnapshotCapture = 3; - int lastIndexDuringSnapshotCapture = 4; - - ByteString snapshotBytes = null; - try { - // 4 messages as part of snapshot, which are applied to state - snapshotBytes = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); - } catch (Exception e) { - e.printStackTrace(); - } - Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), - snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , - lastAppliedDuringSnapshotCapture, 1); - MockSnapshotStore.setMockSnapshot(snapshot); - MockSnapshotStore.setPersistenceId(persistenceId); - - // add more entries after snapshot is taken - List entries = new ArrayList<>(); - ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); - ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G")); - ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H")); - entries.add(entry2); - entries.add(entry3); - entries.add(entry4); - - int lastAppliedToState = 5; - int lastIndex = 7; - - MockAkkaJournal.addToJournal(5, entry2); - // 2 entries are applied to state besides the 4 entries in snapshot - MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); - MockAkkaJournal.addToJournal(7, entry3); - MockAkkaJournal.addToJournal(8, entry4); - - // kill the actor - followerActor.tell(PoisonPill.getInstance(), null); - - try { - // give some time for actor to die - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - //reinstate the actor - TestActorRef ref = TestActorRef.create(getSystem(), - MockRaftActor.props(persistenceId, Collections.EMPTY_MAP)); - - try { - //give some time for snapshot offer to get called. - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - RaftActorContext context = ref.underlyingActor().getRaftActorContext(); - assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size()); - assertEquals(lastIndex, context.getReplicatedLog().lastIndex()); - assertEquals(lastAppliedToState, context.getLastApplied()); - assertEquals(lastAppliedToState, context.getCommitIndex()); - assertTrue(ref.underlyingActor().isApplySnapshotCalled()); - assertEquals(6, ref.underlyingActor().getState().size()); - } - }; + String persistenceId = "follower10"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + // Set the heartbeat interval high to essentially disable election otherwise the test + // may fail if the actor is switched to Leader and the commitIndex is set to the last + // log entry. 
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + + watch(followerActor); + + List snapshotUnappliedEntries = new ArrayList<>(); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, + new MockRaftActorContext.MockPayload("E")); + snapshotUnappliedEntries.add(entry1); + + int lastAppliedDuringSnapshotCapture = 3; + int lastIndexDuringSnapshotCapture = 4; + + // 4 messages as part of snapshot, which are applied to state + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), + snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , + lastAppliedDuringSnapshotCapture, 1); + MockSnapshotStore.setMockSnapshot(snapshot); + MockSnapshotStore.setPersistenceId(persistenceId); + + // add more entries after snapshot is taken + List entries = new ArrayList<>(); + ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("F")); + ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("G")); + ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, + new MockRaftActorContext.MockPayload("H")); + entries.add(entry2); + entries.add(entry3); + entries.add(entry4); + + int lastAppliedToState = 5; + int lastIndex = 7; + + MockAkkaJournal.addToJournal(5, entry2); + // 2 entries are applied to state besides the 4 entries in snapshot + MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); + MockAkkaJournal.addToJournal(7, entry3); + MockAkkaJournal.addToJournal(8, entry4); + + // kill the actor + followerActor.tell(PoisonPill.getInstance(), null); + expectMsgClass(duration("5 seconds"), Terminated.class); + + unwatch(followerActor); + + //reinstate the actor + TestActorRef ref = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, Collections.EMPTY_MAP, + Optional.of(config))); + + ref.underlyingActor().waitForRecoveryComplete(); + + RaftActorContext context = ref.underlyingActor().getRaftActorContext(); + assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(), + context.getReplicatedLog().size()); + assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); + assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); + assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size()); }}; - } private ByteString fromObject(Object snapshot) throws Exception { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java index 3e1bd35632..44da4a5668 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java +++ 
b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.node.utils.serialization; import com.google.common.base.Preconditions; + import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil; import org.opendaylight.yangtools.yang.data.api.Node; @@ -36,6 +37,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNo import java.net.URI; import java.util.ArrayList; import java.util.Date; +import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -232,7 +234,7 @@ public class NormalizedNodeSerializer { private static class DeSerializer implements NormalizedNodeDeSerializationContext { private static Map - deSerializationFunctions = new HashMap<>(); + deSerializationFunctions = new EnumMap<>(NormalizedNodeType.class); static { deSerializationFunctions.put(CONTAINER_NODE_TYPE, @@ -447,8 +449,9 @@ public class NormalizedNodeSerializer { private NormalizedNode deSerialize(NormalizedNodeMessages.Node node){ Preconditions.checkNotNull(node, "node should not be null"); - DeSerializationFunction deSerializationFunction = - Preconditions.checkNotNull(deSerializationFunctions.get(NormalizedNodeType.values()[node.getIntType()]), "Unknown type " + node); + + DeSerializationFunction deSerializationFunction = deSerializationFunctions.get( + NormalizedNodeType.values()[node.getIntType()]); return deSerializationFunction.apply(this, node); } @@ -544,8 +547,4 @@ public class NormalizedNodeSerializer { NormalizedNode apply(DeSerializer deserializer, NormalizedNodeMessages.Node node); } } - - - - } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/PathArgumentSerializer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/PathArgumentSerializer.java index d7627c008e..4fb676e518 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/PathArgumentSerializer.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/PathArgumentSerializer.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.node.utils.serialization; import com.google.common.base.Preconditions; + import org.opendaylight.controller.cluster.datastore.node.utils.NodeIdentifierFactory; import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; @@ -26,6 +27,7 @@ import java.util.Set; import static org.opendaylight.controller.cluster.datastore.node.utils.serialization.PathArgumentType.getSerializablePathArgumentType; public class PathArgumentSerializer { + private static final String REVISION_ARG = "?revision="; private static final Map pathArgumentAttributesGetters = new HashMap<>(); public static NormalizedNodeMessages.PathArgument serialize(NormalizedNodeSerializationContext context, YangInstanceIdentifier.PathArgument pathArgument){ @@ -190,27 +192,24 @@ public class PathArgumentSerializer { // If this serializer is used qName cannot be null (see encodeQName) // 
adding null check only in case someone tried to deSerialize a protocol buffer node // that was not serialized using the PathArgumentSerializer - Preconditions.checkNotNull(qName, "qName should not be null"); - Preconditions.checkArgument(!"".equals(qName.getLocalName()), - "qName.localName cannot be empty qName = " + qName.toString()); - Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid"); +// Preconditions.checkNotNull(qName, "qName should not be null"); +// Preconditions.checkArgument(qName.getNamespace() != -1, "qName.namespace should be valid"); - StringBuilder sb = new StringBuilder(); String namespace = context.getNamespace(qName.getNamespace()); - String revision = ""; String localName = context.getLocalName(qName.getLocalName()); + StringBuilder sb; if(qName.getRevision() != -1){ - revision = context.getRevision(qName.getRevision()); - sb.append("(").append(namespace).append("?revision=").append( - revision).append(")").append( - localName); + String revision = context.getRevision(qName.getRevision()); + sb = new StringBuilder(namespace.length() + REVISION_ARG.length() + revision.length() + + localName.length() + 2); + sb.append('(').append(namespace).append(REVISION_ARG).append( + revision).append(')').append(localName); } else { - sb.append("(").append(namespace).append(")").append( - localName); + sb = new StringBuilder(namespace.length() + localName.length() + 2); + sb.append('(').append(namespace).append(')').append(localName); } return sb.toString(); - } /** @@ -223,10 +222,6 @@ public class PathArgumentSerializer { NormalizedNodeDeSerializationContext context, NormalizedNodeMessages.PathArgument pathArgument) { - Preconditions.checkArgument(pathArgument.getIntType() >= 0 - && pathArgument.getIntType() < PathArgumentType.values().length, - "Illegal PathArgumentType " + pathArgument.getIntType()); - switch(PathArgumentType.values()[pathArgument.getIntType()]){ case NODE_IDENTIFIER_WITH_VALUE : { @@ -272,13 +267,21 @@ public class PathArgumentSerializer { NormalizedNodeDeSerializationContext context, List attributesList) { - Map map = new HashMap<>(); - - for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){ + Map map; + if(attributesList.size() == 1) { + NormalizedNodeMessages.PathArgumentAttribute attribute = attributesList.get(0); NormalizedNodeMessages.QName name = attribute.getName(); Object value = parseAttribute(context, attribute); + map = Collections.singletonMap(QNameFactory.create(qNameToString(context, name)), value); + } else { + map = new HashMap<>(); + + for(NormalizedNodeMessages.PathArgumentAttribute attribute : attributesList){ + NormalizedNodeMessages.QName name = attribute.getName(); + Object value = parseAttribute(context, attribute); - map.put(QNameFactory.create(qNameToString(context, name)), value); + map.put(QNameFactory.create(qNameToString(context, name)), value); + } } return map; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java index 04c95d61ce..8def754f11 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java +++ 
b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore.node.utils.serialization; -import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory; import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; @@ -70,9 +69,6 @@ public class ValueSerializer { private static Object deSerializeBasicTypes(int valueType, String value) { - Preconditions.checkArgument(valueType >= 0 && valueType < ValueType.values().length, - "Illegal value type " + valueType ); - switch(ValueType.values()[valueType]){ case SHORT_TYPE: { return Short.valueOf(value); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java index 8724dfe43a..49db8967a6 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java @@ -50,8 +50,9 @@ public enum ValueType { public static final ValueType getSerializableType(Object node){ Preconditions.checkNotNull(node, "node should not be null"); - if(types.containsKey(node.getClass())) { - return types.get(node.getClass()); + ValueType type = types.get(node.getClass()); + if(type != null) { + return type; } else if(node instanceof Set){ return BITS_TYPE; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 1021ddeee7..83164b07d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -8,11 +8,12 @@ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; - +import org.opendaylight.controller.cluster.raft.ConfigParams; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; @@ -27,22 +28,30 @@ public class DatastoreContext { private final Duration shardTransactionIdleTimeout; private final int operationTimeoutInSeconds; private final String dataStoreMXBeanType; + private final ConfigParams shardRaftConfig; public DatastoreContext() { - this.dataStoreProperties = null; - this.dataStoreMXBeanType = "DistributedDatastore"; - this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES); - this.operationTimeoutInSeconds = 5; + this("DistributedDatastore", null, Duration.create(10, TimeUnit.MINUTES), 5, 1000, 20000, 500); } public DatastoreContext(String 
dataStoreMXBeanType, InMemoryDOMDataStoreConfigProperties dataStoreProperties, Duration shardTransactionIdleTimeout, - int operationTimeoutInSeconds) { + int operationTimeoutInSeconds, + int shardJournalRecoveryLogBatchSize, + int shardSnapshotBatchCount, + int shardHeartbeatIntervalInMillis) { this.dataStoreMXBeanType = dataStoreMXBeanType; - this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties); + this.dataStoreProperties = dataStoreProperties; this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; this.operationTimeoutInSeconds = operationTimeoutInSeconds; + + DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); + raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, + TimeUnit.MILLISECONDS)); + raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); + raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); + shardRaftConfig = raftConfig; } public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() { @@ -60,4 +69,8 @@ public class DatastoreContext { public int getOperationTimeoutInSeconds() { return operationTimeoutInSeconds; } + + public ConfigParams getShardRaftConfig() { + return shardRaftConfig; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 0fa27706e1..ddb5989f09 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -20,6 +20,7 @@ import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -47,12 +48,11 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.cluster.raft.ConfigParams; -import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; @@ -67,14 +67,12 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import 
org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.duration.FiniteDuration; - import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; /** * A Shard represents a portion of the logical data tree
@@ -84,8 +82,6 @@ import java.util.concurrent.TimeUnit; */ public class Shard extends RaftActor { - private static final ConfigParams configParams = new ShardConfigParams(); - public static final String DEFAULT_NAME = "default"; // The state of this Shard @@ -114,11 +110,18 @@ public class Shard extends RaftActor { private ActorRef createSnapshotTransaction; + /** + * Coordinates persistence recovery on startup. + */ + private ShardRecoveryCoordinator recoveryCoordinator; + private List currentLogRecoveryBatch; + private final Map transactionChains = new HashMap<>(); - private Shard(ShardIdentifier name, Map peerAddresses, + protected Shard(ShardIdentifier name, Map peerAddresses, DatastoreContext datastoreContext, SchemaContext schemaContext) { - super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams)); + super(name.toString(), mapPeerAddresses(peerAddresses), + Optional.of(datastoreContext.getShardRaftConfig())); this.name = name; this.datastoreContext = datastoreContext; @@ -333,35 +336,12 @@ public class Shard extends RaftActor { DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(serialized); if (cohort == null) { - - if(LOG.isDebugEnabled()) { - LOG.debug( - "Could not find cohort for modification : {}. Writing modification using a new transaction", - modification); - } - - DOMStoreWriteTransaction transaction = - store.newWriteOnlyTransaction(); - - if(LOG.isDebugEnabled()) { - LOG.debug("Created new transaction {}", transaction.getIdentifier().toString()); - } - - modification.apply(transaction); - try { - syncCommitTransaction(transaction); - } catch (InterruptedException | ExecutionException e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("Failed to commit", e); - return; - } - //we want to just apply the recovery commit and return - shardMBean.incrementCommittedTransactionCount(); + // If there's no cached cohort then we must be applying replicated state. + commitWithNewTransaction(serialized); return; } - - if(sender == null){ + if(sender == null) { LOG.error("Commit failed. 
Sender cannot be null"); return; } @@ -386,6 +366,18 @@ public class Shard extends RaftActor { } + private void commitWithNewTransaction(Object modification) { + DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); + MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx); + try { + syncCommitTransaction(tx); + shardMBean.incrementCommittedTransactionCount(); + } catch (InterruptedException | ExecutionException e) { + shardMBean.incrementFailedTransactionsCount(); + LOG.error(e, "Failed to commit"); + } + } + private void handleForwardedCommit(ForwardedCommitTransaction message) { Object serializedModification = message.getModification().toSerializable(); @@ -461,26 +453,102 @@ public class Shard extends RaftActor { return config.isMetricCaptureEnabled(); } - @Override protected void applyState(ActorRef clientActor, String identifier, - Object data) { + @Override + protected + void startLogRecoveryBatch(int maxBatchSize) { + currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize); + } + } + + @Override + protected void appendRecoveredLogEntry(Payload data) { + if (data instanceof CompositeModificationPayload) { + currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); + } else { + LOG.error("Unknown state received {} during recovery", data); + } + } + + @Override + protected void applyRecoverySnapshot(ByteString snapshot) { + if(recoveryCoordinator == null) { + recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); + } + + recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : submitted recovery sbapshot", persistenceId()); + } + } + + @Override + protected void applyCurrentLogRecoveryBatch() { + if(recoveryCoordinator == null) { + recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); + } + + recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(), + currentLogRecoveryBatch.size()); + } + } + + @Override + protected void onRecoveryComplete() { + if(recoveryCoordinator != null) { + Collection txList = recoveryCoordinator.getTransactions(); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size()); + } + + for(DOMStoreWriteTransaction tx: txList) { + try { + syncCommitTransaction(tx); + shardMBean.incrementCommittedTransactionCount(); + } catch (InterruptedException | ExecutionException e) { + shardMBean.incrementFailedTransactionsCount(); + LOG.error(e, "Failed to commit"); + } + } + } + + recoveryCoordinator = null; + currentLogRecoveryBatch = null; + updateJournalStats(); + } + + @Override + protected void applyState(ActorRef clientActor, String identifier, Object data) { if (data instanceof CompositeModificationPayload) { - Object modification = - ((CompositeModificationPayload) data).getModification(); + Object modification = ((CompositeModificationPayload) data).getModification(); if (modification != null) { commit(clientActor, modification); } else { LOG.error( "modification is null - this is very unexpected, clientActor = {}, identifier = {}", - identifier, clientActor.path().toString()); + identifier, clientActor != null ? 
clientActor.path().toString() : null); } } else { - LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader()); + LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", + data, data.getClass().getClassLoader(), + CompositeModificationPayload.class.getClassLoader()); } - // Update stats + updateJournalStats(); + + } + + private void updateJournalStats() { ReplicatedLogEntry lastLogEntry = getLastLogEntry(); if (lastLogEntry != null) { @@ -490,10 +558,10 @@ public class Shard extends RaftActor { shardMBean.setCommitIndex(getCommitIndex()); shardMBean.setLastApplied(getLastApplied()); - } - @Override protected void createSnapshot() { + @Override + protected void createSnapshot() { if (createSnapshotTransaction == null) { // Create a transaction. We are really going to treat the transaction as a worker @@ -508,7 +576,9 @@ public class Shard extends RaftActor { } } - @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) { + @VisibleForTesting + @Override + protected void applySnapshot(ByteString snapshot) { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -565,16 +635,6 @@ public class Shard extends RaftActor { return this.name.toString(); } - - private static class ShardConfigParams extends DefaultConfigParamsImpl { - public static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(500, TimeUnit.MILLISECONDS); - - @Override public FiniteDuration getHeartBeatInterval() { - return HEART_BEAT_INTERVAL; - } - } - private static class ShardCreator implements Creator { private static final long serialVersionUID = 1L; @@ -598,20 +658,24 @@ public class Shard extends RaftActor { } } - @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException { + @VisibleForTesting + NormalizedNode readStore(YangInstanceIdentifier id) + throws ExecutionException, InterruptedException { DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); CheckedFuture>, ReadFailedException> future = - transaction.read(YangInstanceIdentifier.builder().build()); + transaction.read(id); - NormalizedNode node = future.get().get(); + Optional> optional = future.get(); + NormalizedNode node = optional.isPresent()? 
optional.get() : null; transaction.close(); return node; } - @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node) + @VisibleForTesting + void writeToStore(YangInstanceIdentifier id, NormalizedNode node) throws ExecutionException, InterruptedException { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); @@ -620,4 +684,8 @@ public class Shard extends RaftActor { syncCommitTransaction(transaction); } + @VisibleForTesting + ShardStats getShardMBean() { + return shardMBean; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java new file mode 100644 index 0000000000..8afdb4c280 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot + * and journal log entry batch are de-serialized and applied to their own write transaction + * instance in parallel on a thread pool for faster recovery time. However the transactions are + * committed to the data store in the order the corresponding snapshot or log batch are received + * to preserve data store integrity. 
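+ *
+ * <p>
+ * For illustration, the call sequence, condensed from the Shard recovery overrides in
+ * this patch (error handling and null checks omitted), is roughly:
+ * <pre>
+ *   ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(persistenceId, schemaContext);
+ *   coordinator.submit(logEntryBatch, store.newWriteOnlyTransaction());  // applied on the thread pool
+ *   coordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
+ *   for(DOMStoreWriteTransaction tx: coordinator.getTransactions()) {    // blocks until the pool drains
+ *       syncCommitTransaction(tx);                                       // committed in submission order
+ *   }
+ * </pre>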
+ * + * @author Thomas Panetelis + */ +class ShardRecoveryCoordinator { + + private static final int TIME_OUT = 10; + + private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class); + + private final List resultingTxList = Lists.newArrayList(); + private final SchemaContext schemaContext; + private final String shardName; + private final ExecutorService executor; + + ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) { + this.schemaContext = schemaContext; + this.shardName = shardName; + + executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ShardRecovery-" + shardName + "-%d").build()); + } + + /** + * Submits a batch of journal log entries. + * + * @param logEntries the serialized journal log entries + * @param resultingTx the write Tx to which to apply the entries + */ + void submit(List logEntries, DOMStoreWriteTransaction resultingTx) { + LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx); + resultingTxList.add(resultingTx); + executor.execute(task); + } + + /** + * Submits a snapshot. + * + * @param snapshot the serialized snapshot + * @param resultingTx the write Tx to which to apply the entries + */ + void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { + SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx); + resultingTxList.add(resultingTx); + executor.execute(task); + } + + Collection getTransactions() { + // Shutdown the executor and wait for task completion. + executor.shutdown(); + + try { + if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) { + return resultingTxList; + } else { + LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return Collections.emptyList(); + } + + private static abstract class ShardRecoveryTask implements Runnable { + + final DOMStoreWriteTransaction resultingTx; + + ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) { + this.resultingTx = resultingTx; + } + } + + private class LogRecoveryTask extends ShardRecoveryTask { + + private final List logEntries; + + LogRecoveryTask(List logEntries, DOMStoreWriteTransaction resultingTx) { + super(resultingTx); + this.logEntries = logEntries; + } + + @Override + public void run() { + for(int i = 0; i < logEntries.size(); i++) { + MutableCompositeModification.fromSerializable( + logEntries.get(i), schemaContext).apply(resultingTx); + // Null out to GC quicker. 
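+                // Each entry is a serialized composite modification; once it has been
+                // applied to the recovery transaction it is no longer needed, so clearing
+                // the slot lets large batches (default 5000 entries per the YANG config)
+                // be reclaimed incrementally instead of being held until recovery completes.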
+ logEntries.set(i, null); + } + } + } + + private class SnapshotRecoveryTask extends ShardRecoveryTask { + + private final ByteString snapshot; + + SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { + super(resultingTx); + this.snapshot = snapshot; + } + + @Override + public void run() { + try { + NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); + NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext).decode( + YangInstanceIdentifier.builder().build(), serializedNode); + + // delete everything first + resultingTx.delete(YangInstanceIdentifier.builder().build()); + + // Add everything from the remote node back + resultingTx.write(YangInstanceIdentifier.builder().build(), node); + } catch (InvalidProtocolBufferException e) { + LOG.error("Error deserializing snapshot", e); + } + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index e7a7aab406..84614bd7bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -42,13 +42,16 @@ public class DistributedConfigDataStoreProviderModule extends DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore", InMemoryDOMDataStoreConfigProperties.create( - props.getMaxShardDataChangeExecutorPoolSize().getValue(), - props.getMaxShardDataChangeExecutorQueueSize().getValue(), - props.getMaxShardDataChangeListenerQueueSize().getValue(), - props.getMaxShardDataStoreExecutorQueueSize().getValue()), + props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(), + props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(), + props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(), + props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()), Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES), - props.getOperationTimeoutInSeconds().getValue()); + props.getOperationTimeoutInSeconds().getValue(), + props.getShardJournalRecoveryLogBatchSize().getValue().intValue(), + props.getShardSnapshotBatchCount().getValue().intValue(), + props.getShardHearbeatIntervalInMillis().getValue()); return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(), datastoreContext, bundleContext); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 814e6f606a..3183527eb0 100644 --- 
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -42,13 +42,16 @@ public class DistributedOperationalDataStoreProviderModule extends DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore", InMemoryDOMDataStoreConfigProperties.create( - props.getMaxShardDataChangeExecutorPoolSize().getValue(), - props.getMaxShardDataChangeExecutorQueueSize().getValue(), - props.getMaxShardDataChangeListenerQueueSize().getValue(), - props.getMaxShardDataStoreExecutorQueueSize().getValue()), + props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(), + props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(), + props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(), + props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()), Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES), - props.getOperationTimeoutInSeconds().getValue()); + props.getOperationTimeoutInSeconds().getValue(), + props.getShardJournalRecoveryLogBatchSize().getValue().intValue(), + props.getShardSnapshotBatchCount().getValue().intValue(), + props.getShardHearbeatIntervalInMillis().getValue()); return DistributedDataStoreFactory.createInstance("operational", getOperationalSchemaServiceDependency(), datastoreContext, bundleContext); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index e19a76703f..af43f953ff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -36,8 +36,8 @@ module distributed-datastore-provider { config:java-name-prefix DistributedOperationalDataStoreProvider; } - typedef non-zero-uint16-type { - type uint16 { + typedef non-zero-uint32-type { + type uint32 { range "1..max"; } } @@ -48,43 +48,67 @@ module distributed-datastore-provider { } } + typedef heartbeat-interval-type { + type uint16 { + range "100..max"; + } + } + grouping data-store-properties { leaf max-shard-data-change-executor-queue-size { default 1000; - type non-zero-uint16-type; + type non-zero-uint32-type; description "The maximum queue size for each shard's data store data change notification executor."; } leaf max-shard-data-change-executor-pool-size { default 20; - type non-zero-uint16-type; + type non-zero-uint32-type; description "The maximum thread pool size for each shard's data store data change notification executor."; } leaf max-shard-data-change-listener-queue-size { default 1000; - type non-zero-uint16-type; + type non-zero-uint32-type; description "The maximum queue size for each shard's data store data change listeners."; } leaf max-shard-data-store-executor-queue-size { default 5000; - type non-zero-uint16-type; + type non-zero-uint32-type; description "The maximum queue size for each shard's data store executor."; } leaf shard-transaction-idle-timeout-in-minutes { default 10; - type non-zero-uint16-type; + type non-zero-uint32-type; description "The maximum amount of time a 
shard transaction can be idle without receiving any messages before it self-destructs."; } + leaf shard-snapshot-batch-count { + default 20000; + type non-zero-uint32-type; + description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken."; + } + + leaf shard-hearbeat-interval-in-millis { + default 500; + type heartbeat-interval-type; + description "The interval at which a shard will send a heart beat message to its remote shard."; + } + leaf operation-timeout-in-seconds { default 5; type operation-timeout-type; description "The maximum amount of time for akka operations (remote or local) to complete before failing."; } + leaf shard-journal-recovery-log-batch-size { + default 5000; + type non-zero-uint32-type; + description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store."; + } + leaf enable-metric-capture { default false; type boolean; @@ -93,7 +117,7 @@ module distributed-datastore-provider { leaf bounded-mailbox-capacity { default 1000; - type non-zero-uint16-type; + type non-zero-uint32-type; description "Max queue size that an actor's mailbox can reach"; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java index 022ef9bbaf..fae21f2709 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java @@ -10,11 +10,10 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import org.apache.commons.io.FileUtils; + import org.junit.AfterClass; import org.junit.BeforeClass; -import java.io.File; import java.io.IOException; public abstract class AbstractActorTest { @@ -25,35 +24,15 @@ public abstract class AbstractActorTest { System.setProperty("shard.persistent", "false"); system = ActorSystem.create("test"); - - deletePersistenceFiles(); } @AfterClass public static void tearDownClass() throws IOException { JavaTestKit.shutdownActorSystem(system); system = null; - - deletePersistenceFiles(); - } - - protected static void deletePersistenceFiles() throws IOException { - File journal = new File("journal"); - - if(journal.exists()) { - FileUtils.deleteDirectory(journal); - } - - File snapshots = new File("snapshots"); - - if(snapshots.exists()){ - FileUtils.deleteDirectory(snapshots); - } - } protected ActorSystem getSystem() { return system; } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index deb71c2df4..a3e0b3a07d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -4,23 +4,43 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.event.Logging; +import akka.japi.Creator; import akka.testkit.JavaTestKit; import 
akka.testkit.TestActorRef; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -28,222 +48,138 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; 
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; - +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.duration.Duration; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; - +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; public class ShardTest extends AbstractActorTest { - private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext(); + private static final DatastoreContext DATA_STORE_CONTEXT = + new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500); - @Test - public void testOnReceiveRegisterListener() throws Exception { - new JavaTestKit(getSystem()) {{ - final ShardIdentifier identifier = - ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); + private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); - final ActorRef subject = - getSystem().actorOf(props, "testRegisterChangeListener"); + private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); - new Within(duration("3 seconds")) { - @Override - protected void run() { + @Before + public void setUp() { + System.setProperty("shard.persistent", "false"); - subject.tell( - new UpdateSchemaContext(SchemaContextHelper.full()), - getRef()); + InMemorySnapshotStore.clear(); + InMemoryJournal.clear(); + } - subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, - getRef().path(), AsyncDataBroker.DataChangeScope.BASE), - getRef()); + @After + public void tearDown() { + InMemorySnapshotStore.clear(); + InMemoryJournal.clear(); + } - final Boolean notificationEnabled = new ExpectMsg( - duration("3 seconds"), "enable notification") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if(in instanceof EnableNotification){ - return ((EnableNotification) in).isEnabled(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertFalse(notificationEnabled); - - final String out = new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(RegisterChangeListenerReply.class)) { - RegisterChangeListenerReply reply = - (RegisterChangeListenerReply) in; - return reply.getListenerRegistrationPath() - .toString(); - } else { - throw 
noMatch(); - } - } - }.get(); // this extracts the received message + private Props newShardProps() { + return Shard.props(IDENTIFIER, Collections.emptyMap(), + DATA_STORE_CONTEXT, SCHEMA_CONTEXT); + } - assertTrue(out.matches( - "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); - } + @Test + public void testOnReceiveRegisterListener() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener"); + subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef()); - }; + subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, + getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef()); + + EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class); + assertEquals("isEnabled", false, enable.isEnabled()); + + RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), + RegisterChangeListenerReply.class); + assertTrue(reply.getListenerRegistrationPath().toString().matches( + "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); }}; } @Test public void testCreateTransaction(){ - new JavaTestKit(getSystem()) {{ - final ShardIdentifier identifier = - ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); - - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); - final ActorRef subject = - getSystem().actorOf(props, "testCreateTransaction"); - - // Wait for a specific log message to show up - final boolean result = - new JavaTestKit.EventFilter(Logging.Info.class - ) { - @Override - protected Boolean run() { - return true; - } - }.from(subject.path().toString()) - .message("Switching from state Candidate to Leader") - .occurrences(1).exec(); - - Assert.assertEquals(true, result); + new ShardTestKit(getSystem()) {{ + ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction"); - new Within(duration("3 seconds")) { - @Override - protected void run() { + waitUntilLeader(subject); - subject.tell( - new UpdateSchemaContext(TestModel.createTestContext()), - getRef()); + subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), - getRef()); + subject.tell(new CreateTransaction("txn-1", + TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); - final String out = new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in instanceof CreateTransactionReply) { - CreateTransactionReply reply = - (CreateTransactionReply) in; - return reply.getTransactionActorPath() - .toString(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), + CreateTransactionReply.class); - assertTrue("Unexpected transaction path " + out, - out.contains("akka://test/user/testCreateTransaction/shard-txn-1")); - expectNoMsg(); - } - }; + String path = reply.getTransactionActorPath().toString(); + assertTrue("Unexpected transaction path " + path, + path.contains("akka://test/user/testCreateTransaction/shard-txn-1")); + expectNoMsg(); }}; } @Test public void testCreateTransactionOnChain(){ - new JavaTestKit(getSystem()) {{ - final ShardIdentifier 
identifier = - ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); - - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); - final ActorRef subject = - getSystem().actorOf(props, "testCreateTransactionOnChain"); - - // Wait for a specific log message to show up - final boolean result = - new JavaTestKit.EventFilter(Logging.Info.class - ) { - @Override - protected Boolean run() { - return true; - } - }.from(subject.path().toString()) - .message("Switching from state Candidate to Leader") - .occurrences(1).exec(); - - Assert.assertEquals(true, result); - - new Within(duration("3 seconds")) { - @Override - protected void run() { + new ShardTestKit(getSystem()) {{ + final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain"); - subject.tell( - new UpdateSchemaContext(TestModel.createTestContext()), - getRef()); + waitUntilLeader(subject); - subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), - getRef()); + subject.tell(new CreateTransaction("txn-1", + TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), + getRef()); - final String out = new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in instanceof CreateTransactionReply) { - CreateTransactionReply reply = - (CreateTransactionReply) in; - return reply.getTransactionActorPath() - .toString(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), + CreateTransactionReply.class); - assertTrue("Unexpected transaction path " + out, - out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1")); - expectNoMsg(); - } - }; + String path = reply.getTransactionActorPath().toString(); + assertTrue("Unexpected transaction path " + path, + path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1")); + expectNoMsg(); }}; } @Test public void testPeerAddressResolved(){ new JavaTestKit(getSystem()) {{ - Map peerAddresses = new HashMap<>(); - final ShardIdentifier identifier = ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - peerAddresses.put(identifier, null); - final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext()); - final ActorRef subject = - getSystem().actorOf(props, "testPeerAddressResolved"); + Props props = Shard.props(identifier, + Collections.singletonMap(identifier, null), + DATA_STORE_CONTEXT, SCHEMA_CONTEXT); + final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved"); new Within(duration("3 seconds")) { @Override @@ -261,99 +197,205 @@ public class ShardTest extends AbstractActorTest { @Test public void testApplySnapshot() throws ExecutionException, InterruptedException { - Map peerAddresses = new HashMap<>(); + TestActorRef ref = TestActorRef.create(getSystem(), newShardProps()); - final ShardIdentifier identifier = - ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); + NormalizedNodeToNodeCodec codec = + new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); - peerAddresses.put(identifier, null); - final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, 
TestModel.createTestContext()); + ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode( + TestModel.TEST_QNAME)); - TestActorRef ref = TestActorRef.create(getSystem(), props); + YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); + NormalizedNode expected = ref.underlyingActor().readStore(root); - ref.underlyingActor().updateSchemaContext(TestModel.createTestContext()); + NormalizedNodeMessages.Container encode = codec.encode(root, expected); - NormalizedNodeToNodeCodec codec = - new NormalizedNodeToNodeCodec(TestModel.createTestContext()); + ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( + encode.getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 1, 2, 3, 4)); - ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + ref.underlyingActor().onReceiveCommand(applySnapshot); - NormalizedNode expected = ref.underlyingActor().readStore(); + NormalizedNode actual = ref.underlyingActor().readStore(root); - NormalizedNodeMessages.Container encode = codec - .encode(YangInstanceIdentifier.builder().build(), expected); + assertEquals(expected, actual); + } + @Test + public void testApplyState() throws Exception { - ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString()); + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps()); - NormalizedNode actual = ref.underlyingActor().readStore(); + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - assertEquals(expected, actual); - } + MutableCompositeModification compMod = new MutableCompositeModification(); + compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT)); + Payload payload = new CompositeModificationPayload(compMod.toSerializable()); + ApplyState applyState = new ApplyState(null, "test", + new ReplicatedLogImplEntry(1, 2, payload)); - private static class ShardTestKit extends JavaTestKit { + shard.underlyingActor().onReceiveCommand(applyState); - private ShardTestKit(ActorSystem actorSystem) { - super(actorSystem); + NormalizedNode actual = shard.underlyingActor().readStore(TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); + } + + @SuppressWarnings("serial") + @Test + public void testRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + + InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); + testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + + DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready(); + commitCohort.preCommit().get(); + commitCohort.commit().get(); + + DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction(); + NormalizedNode root = readTx.read(YangInstanceIdentifier.builder().build()).get().get(); + + InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create( + new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode( + YangInstanceIdentifier.builder().build(), root). + getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 0, 1, -1, -1)); + + // Set up the InMemoryJournal. 
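+        // The entries below mimic what a live shard would have persisted: one write that
+        // creates the empty outer list, eleven merges that each add a list entry, and an
+        // ApplyLogEntries marker recording how far the log had been applied. With the
+        // DatastoreContext above (a journal recovery batch size of 3, if the constructor
+        // argument order matches the provider modules), recovery should replay these in
+        // several batches before committing them to the in-memory store.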
+ + InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload( + new WriteModification(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + SCHEMA_CONTEXT)))); + + int nListEntries = 11; + Set listEntryKeys = new HashSet<>(); + for(int i = 1; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + Modification mod = new MergeModification(path, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i), + SCHEMA_CONTEXT); + InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1, + newPayload(mod))); } - protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ - // Wait for a specific log message to show up - final boolean result = - new JavaTestKit.EventFilter(logLevel - ) { + InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1, + new ApplyLogEntries(nListEntries)); + + // Create the actor and wait for recovery complete. + + final CountDownLatch recoveryComplete = new CountDownLatch(1); + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new Shard(IDENTIFIER, Collections.emptyMap(), + DATA_STORE_CONTEXT, SCHEMA_CONTEXT) { @Override - protected Boolean run() { - return true; + protected void onRecoveryComplete() { + try { + super.onRecoveryComplete(); + } finally { + recoveryComplete.countDown(); + } } - }.from(subject.path().toString()) - .message(logMessage) - .occurrences(1).exec(); + }; + } + }; - Assert.assertEquals(true, result); + TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), "testRecovery"); + + assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); + + // Verify data in the data store. + + NormalizedNode outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH); + assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", + outerList.getValue() instanceof Iterable); + for(Object entry: (Iterable) outerList.getValue()) { + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", + entry instanceof MapEntryNode); + MapEntryNode mapEntry = (MapEntryNode)entry; + Optional> idLeaf = + mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); + assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); + Object value = idLeaf.get().getValue(); + assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value, + listEntryKeys.remove(value)); + } + if(!listEntryKeys.isEmpty()) { + fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " + + listEntryKeys); } + assertEquals("Last log index", nListEntries, + shard.underlyingActor().getShardMBean().getLastLogIndex()); + assertEquals("Commit index", nListEntries, + shard.underlyingActor().getShardMBean().getCommitIndex()); + assertEquals("Last applied", nListEntries, + shard.underlyingActor().getShardMBean().getLastApplied()); } + private CompositeModificationPayload newPayload(Modification... 
mods) { + MutableCompositeModification compMod = new MutableCompositeModification(); + for(Modification mod: mods) { + compMod.addModification(mod); + } + + return new CompositeModificationPayload(compMod.toSerializable()); + } + + @SuppressWarnings("unchecked") @Test - public void testCreateSnapshot() throws IOException, InterruptedException { + public void testForwardedCommitTransactionWithPersistence() throws IOException { + System.setProperty("shard.persistent", "true"); + new ShardTestKit(getSystem()) {{ - final ShardIdentifier identifier = - ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps()); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); - final ActorRef subject = - getSystem().actorOf(props, "testCreateSnapshot"); + waitUntilLeader(shard); - // Wait for a specific log message to show up - this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader"); + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class); + doReturn(Futures.immediateFuture(null)).when(cohort).commit(); - new Within(duration("3 seconds")) { - @Override - protected void run() { + MutableCompositeModification modification = new MutableCompositeModification(); + modification.addModification(new WriteModification(TestModel.TEST_PATH, node, + SCHEMA_CONTEXT)); - subject.tell( - new UpdateSchemaContext(TestModel.createTestContext()), - getRef()); + shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef()); - subject.tell(new CaptureSnapshot(-1,-1,-1,-1), - getRef()); + expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS); - waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + verify(cohort).commit(); - subject.tell(new CaptureSnapshot(-1,-1,-1,-1), - getRef()); + assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex()); + }}; + } - waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + @Test + public void testCreateSnapshot() throws IOException, InterruptedException { + new ShardTestKit(getSystem()) {{ + final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot"); - } - }; + waitUntilLeader(subject); + + subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); + + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + + subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); - deletePersistenceFiles(); + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); }}; } @@ -366,7 +408,7 @@ public class ShardTest extends AbstractActorTest { InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator( MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor()); - store.onGlobalContextUpdated(TestModel.createTestContext()); + store.onGlobalContextUpdated(SCHEMA_CONTEXT); DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction(); putTransaction.write(TestModel.TEST_PATH, @@ -424,4 +466,46 @@ public class ShardTest extends AbstractActorTest { } }; } + + private static final class DelegatingShardCreator implements Creator { + private final Creator delegate; + + DelegatingShardCreator(Creator 
delegate) { + this.delegate = delegate; + } + + @Override + public Shard create() throws Exception { + return delegate.create(); + } + } + + private static class ShardTestKit extends JavaTestKit { + + private ShardTestKit(ActorSystem actorSystem) { + super(actorSystem); + } + + protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(logLevel + ) { + @Override + protected Boolean run() { + return true; + } + }.from(subject.path().toString()) + .message(logMessage) + .occurrences(1).exec(); + + Assert.assertEquals(true, result); + + } + + protected void waitUntilLeader(ActorRef subject) { + waitForLogMessage(Logging.Info.class, subject, + "Switching from state Candidate to Leader"); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 0beb00b435..3f31591c79 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -503,7 +503,7 @@ public class ShardTransactionTest extends AbstractActorTest { datastoreContext = new DatastoreContext("Test", InMemoryDOMDataStoreConfigProperties.getDefault(), - Duration.create(500, TimeUnit.MILLISECONDS), 5); + Duration.create(500, TimeUnit.MILLISECONDS), 5, 1000, 1000, 500); new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java new file mode 100644 index 0000000000..c9a0eaf033 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.utils; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import com.google.common.collect.Maps; +import scala.concurrent.Future; +import akka.dispatch.Futures; +import akka.japi.Procedure; +import akka.persistence.PersistentConfirmation; +import akka.persistence.PersistentId; +import akka.persistence.PersistentImpl; +import akka.persistence.PersistentRepr; +import akka.persistence.journal.japi.AsyncWriteJournal; + +public class InMemoryJournal extends AsyncWriteJournal { + + private static Map> journals = new ConcurrentHashMap<>(); + + public static void addEntry(String persistenceId, long sequenceNr, Object data) { + Map journal = journals.get(persistenceId); + if(journal == null) { + journal = Maps.newLinkedHashMap(); + journals.put(persistenceId, journal); + } + + journal.put(sequenceNr, data); + } + + public static void clear() { + journals.clear(); + } + + @Override + public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, + long toSequenceNr, long max, final Procedure replayCallback) { + return Futures.future(new Callable() { + @Override + public Void call() throws Exception { + Map journal = journals.get(persistenceId); + if(journal == null) { + return null; + } + + for (Map.Entry entry : journal.entrySet()) { + PersistentRepr persistentMessage = + new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null); + replayCallback.apply(persistentMessage); + } + + return null; + } + }, context().dispatcher()); + } + + @Override + public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { + return Futures.successful(new Long(0)); + } + + @Override + public Future doAsyncWriteMessages(Iterable messages) { + return Futures.successful(null); + } + + @Override + public Future doAsyncWriteConfirmations(Iterable confirmations) { + return Futures.successful(null); + } + + @Override + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + return Futures.successful(null); + } + + @Override + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + return Futures.successful(null); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java index 0e492f0fbb..22e522b760 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java @@ -16,46 +16,66 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.snapshot.japi.SnapshotStore; import com.google.common.collect.Iterables; import scala.concurrent.Future; - import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.opendaylight.controller.cluster.raft.Snapshot; public 
class InMemorySnapshotStore extends SnapshotStore { - Map> snapshots = new HashMap<>(); + private static Map> snapshots = new ConcurrentHashMap<>(); + + public static void addSnapshot(String persistentId, Snapshot snapshot) { + List snapshotList = snapshots.get(persistentId); + + if(snapshotList == null) { + snapshotList = new ArrayList<>(); + snapshots.put(persistentId, snapshotList); + } + + snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(), + System.currentTimeMillis()), snapshot)); + } + + public static void clear() { + snapshots.clear(); + } - @Override public Future> doLoadAsync(String s, + @Override + public Future> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) { - List snapshotList = snapshots.get(s); + List snapshotList = snapshots.get(s); if(snapshotList == null){ return Futures.successful(Option.none()); } - Snapshot snapshot = Iterables.getLast(snapshotList); + StoredSnapshot snapshot = Iterables.getLast(snapshotList); SelectedSnapshot selectedSnapshot = new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData()); return Futures.successful(Option.some(selectedSnapshot)); } - @Override public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { - List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + @Override + public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); if(snapshotList == null){ snapshotList = new ArrayList<>(); snapshots.put(snapshotMetadata.persistenceId(), snapshotList); } - snapshotList.add(new Snapshot(snapshotMetadata, o)); + snapshotList.add(new StoredSnapshot(snapshotMetadata, o)); return Futures.successful(null); } - @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { + @Override + public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { } - @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { - List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + @Override + public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); if(snapshotList == null){ return; @@ -64,7 +84,7 @@ public class InMemorySnapshotStore extends SnapshotStore { int deleteIndex = -1; for(int i=0;i snapshotList = snapshots.get(s); + List snapshotList = snapshots.get(s); if(snapshotList == null){ return; @@ -90,11 +111,11 @@ public class InMemorySnapshotStore extends SnapshotStore { snapshots.remove(s); } - private static class Snapshot { + private static class StoredSnapshot { private final SnapshotMetadata metadata; private final Object data; - private Snapshot(SnapshotMetadata metadata, Object data) { + private StoredSnapshot(SnapshotMetadata metadata, Object data) { this.metadata = metadata; this.data = data; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index f0dadc618b..3a37dd9376 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -1,5 +1,6 @@ akka { persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] @@ -17,6 +18,10 
@@ akka { } } +in-memory-journal { + class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal" +} + in-memory-snapshot-store { # Class name of the plugin. class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
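
The new shard-journal-recovery-log-batch-size, shard-snapshot-batch-count and shard-hearbeat-interval-in-millis leaves feed the RAFT tuning exposed through ConfigParams. A minimal sketch of one way such values could be surfaced is below; DefaultConfigParamsImpl and the heartbeat/snapshot getters come from the sal-akka-raft module touched by this patch, while the getJournalRecoveryLogBatchSize name and the override-based wiring are assumptions for illustration, not necessarily how Shard actually plumbs them.

import java.util.concurrent.TimeUnit;

import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;

import scala.concurrent.duration.FiniteDuration;

// Hypothetical adapter: maps datastore configuration values onto RAFT ConfigParams.
class ShardRaftConfigSketch extends DefaultConfigParamsImpl {
    private final FiniteDuration heartBeatInterval;
    private final long snapshotBatchCount;
    private final int journalRecoveryLogBatchSize;

    ShardRaftConfigSketch(long heartBeatIntervalMillis, long snapshotBatchCount,
            int journalRecoveryLogBatchSize) {
        this.heartBeatInterval = new FiniteDuration(heartBeatIntervalMillis, TimeUnit.MILLISECONDS);
        this.snapshotBatchCount = snapshotBatchCount;
        this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
    }

    @Override
    public FiniteDuration getHeartBeatInterval() {
        return heartBeatInterval;
    }

    @Override
    public long getSnapshotBatchCount() {
        return snapshotBatchCount;
    }

    // Assumed getter name; ConfigParams in this patch documents "the number of journal
    // log entries to batch on recovery before applying".
    public int getJournalRecoveryLogBatchSize() {
        return journalRecoveryLogBatchSize;
    }
}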