From cd81eb73b7abf677571b2366425ccbc8d794f4b6 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Fri, 17 Oct 2014 16:42:41 -0700 Subject: [PATCH] BUG 2134 : Make persistence configurable at the datastore level - Added a peristent flag in the config sub-system - If persistent in not true then we ignore recovery messages and not use the akka persistence apis for persisting anything - The unit tests - assume that persistence is on by default (which it is) - test that if recovery is applicable only then the ShardManager and RaftActor process the recovoery messages. - test that when persisting the data-persistence API is used in the appropriate places (see RaftActorTest/ShardManagerTest) Change-Id: I19913bcd32e609ccde6ad8e35209788315504426 Signed-off-by: Moiz Raja --- .../cluster/example/ExampleActor.java | 14 +- .../controller/cluster/raft/RaftActor.java | 111 +++-- .../cluster/raft/RaftActorTest.java | 391 +++++++++++++++++- .../src/test/resources/application.conf | 1 + .../cluster/DataPersistenceProvider.java | 55 +++ .../actor/AbstractUntypedPersistentActor.java | 70 ++++ .../cluster/common/actor/Monitor.java | 4 +- .../DataPersistenceProviderMonitor.java | 68 +++ .../cluster/datastore/DatastoreContext.java | 34 +- .../DistributedDataStoreFactory.java | 34 +- .../controller/cluster/datastore/Shard.java | 36 +- .../cluster/datastore/ShardManager.java | 53 ++- .../datastore/config/ConfigurationReader.java | 15 + .../config/FileConfigurationReader.java | 28 ++ .../config/ResourceConfigurationReader.java | 19 + ...tributedConfigDataStoreProviderModule.java | 1 + ...tedOperationalDataStoreProviderModule.java | 1 + .../yang/distributed-datastore-provider.yang | 6 + .../DistributedDataStoreIntegrationTest.java | 7 +- .../cluster/datastore/ShardManagerTest.java | 92 ++++- .../cluster/datastore/ShardTest.java | 71 +++- .../cluster/datastore/ShardTestKit.java | 18 +- .../datastore/TransactionProxyTest.java | 1 - 23 files changed, 1011 insertions(+), 119 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationReader.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/FileConfigurationReader.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ResourceConfigurationReader.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 97b912ef74..06538fd2ae 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,10 +11,9 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; - import com.google.common.base.Optional; import com.google.protobuf.ByteString; - +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -38,6 +37,7 @@ import java.util.Map; public class ExampleActor extends RaftActor { private final Map state = new HashMap(); + private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; @@ -45,6 +45,7 @@ public class ExampleActor extends RaftActor { public ExampleActor(String id, Map peerAddresses, Optional configParams) { super(id, peerAddresses, configParams); + this.dataPersistenceProvider = new PersistentDataProvider(); } public static Props props(final String id, final Map peerAddresses, @@ -57,7 +58,7 @@ public class ExampleActor extends RaftActor { }); } - @Override public void onReceiveCommand(Object message){ + @Override public void onReceiveCommand(Object message) throws Exception{ if(message instanceof KeyValue){ if(isLeader()) { String persistId = Long.toString(persistIdentifier++); @@ -160,7 +161,12 @@ public class ExampleActor extends RaftActor { } - @Override public void onReceiveRecover(Object message) { + @Override + protected DataPersistenceProvider persistence() { + return dataPersistenceProvider; + } + + @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 66a46ef3bd..2459c2ff8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -18,10 +18,11 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; -import akka.persistence.UntypedPersistentActor; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -38,6 +39,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; + import java.io.Serializable; import java.util.Map; @@ -81,7 +83,7 @@ import java.util.Map; *
  • when a snapshot should be saved
  • * */ -public abstract class RaftActor extends UntypedPersistentActor { +public abstract class RaftActor extends AbstractUntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); @@ -135,24 +137,40 @@ public abstract class RaftActor extends UntypedPersistentActor { public void preStart() throws Exception { LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(), context.getConfigParams().getJournalRecoveryLogBatchSize()); + super.preStart(); } @Override - public void onReceiveRecover(Object message) { - if (message instanceof SnapshotOffer) { - onRecoveredSnapshot((SnapshotOffer)message); - } else if (message instanceof ReplicatedLogEntry) { - onRecoveredJournalLogEntry((ReplicatedLogEntry)message); - } else if (message instanceof ApplyLogEntries) { - onRecoveredApplyLogEntries((ApplyLogEntries)message); - } else if (message instanceof DeleteEntries) { - replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); - } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), - ((UpdateElectionTerm) message).getVotedFor()); - } else if (message instanceof RecoveryCompleted) { - onRecoveryCompletedMessage(); + public void handleRecover(Object message) { + if(persistence().isRecoveryApplicable()) { + if (message instanceof SnapshotOffer) { + onRecoveredSnapshot((SnapshotOffer) message); + } else if (message instanceof ReplicatedLogEntry) { + onRecoveredJournalLogEntry((ReplicatedLogEntry) message); + } else if (message instanceof ApplyLogEntries) { + onRecoveredApplyLogEntries((ApplyLogEntries) message); + } else if (message instanceof DeleteEntries) { + replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { + onRecoveryCompletedMessage(); + } + } else { + if (message instanceof RecoveryCompleted) { + // Delete all the messages from the akka journal so that we do not end up with consistency issues + // Note I am not using the dataPersistenceProvider and directly using the akka api here + deleteMessages(lastSequenceNr()); + + // Delete all the akka snapshots as they will not be needed + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); + + onRecoveryComplete(); + currentBehavior = new Follower(context); + onStateChanged(); + } } } @@ -254,7 +272,7 @@ public abstract class RaftActor extends UntypedPersistentActor { onStateChanged(); } - @Override public void onReceiveCommand(Object message) { + @Override public void handleCommand(Object message) { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -272,7 +290,7 @@ public abstract class RaftActor extends UntypedPersistentActor { if(LOG.isDebugEnabled()) { LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex()); } - persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { + persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { @Override public void apply(ApplyLogEntries param) throws Exception { } @@ -304,10 +322,9 @@ public abstract class RaftActor extends UntypedPersistentActor { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; LOG.info("SaveSnapshotSuccess received for snapshot"); - context.getReplicatedLog().snapshotCommit(); + long sequenceNumber = success.metadata().sequenceNr(); - // TODO: Not sure if we want to be this aggressive with trimming stuff - trimPersistentData(success.metadata().sequenceNr()); + commitSnapshot(sequenceNumber); } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; @@ -485,7 +502,12 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setPeerAddress(peerId, peerAddress); } + protected void commitSnapshot(long sequenceNumber) { + context.getReplicatedLog().snapshotCommit(); + // TODO: Not sure if we want to be this aggressive with trimming stuff + trimPersistentData(sequenceNumber); + } /** * The applyState method will be called by the RaftActor when some data @@ -515,7 +537,7 @@ public abstract class RaftActor extends UntypedPersistentActor { /** * This method is called during recovery to append state data to the current batch. This method - * is called 1 or more times after {@link #startRecoveryStateBatch}. + * is called 1 or more times after {@link #startLogRecoveryBatch}. * * @param data the state data */ @@ -530,7 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor { /** * This method is called during recovery at the end of a batch to apply the current batched - * log entries. This method is called after {@link #appendRecoveryLogEntry}. + * log entries. This method is called after {@link #appendRecoveredLogEntry}. */ protected abstract void applyCurrentLogRecoveryBatch(); @@ -566,17 +588,19 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected abstract void onStateChanged(); + protected abstract DataPersistenceProvider persistence(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; private void trimPersistentData(long sequenceNumber) { // Trim akka snapshots // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied // For now guessing that it is ANDed. - deleteSnapshots(new SnapshotSelectionCriteria( + persistence().deleteSnapshots(new SnapshotSelectionCriteria( sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); // Trim akka journal - deleteMessages(sequenceNumber); + persistence().deleteMessages(sequenceNumber); } private String getLeaderAddress(){ @@ -605,7 +629,7 @@ public abstract class RaftActor extends UntypedPersistentActor { captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - saveSnapshot(sn); + persistence().saveSnapshot(sn); LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); @@ -647,7 +671,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ @Override public void apply(DeleteEntries param) throws Exception { @@ -677,7 +701,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // persist call and the execution(s) of the associated event // handler. This also holds for multiple persist calls in context // of a single command. - persist(replicatedLogEntry, + persistence().persist(replicatedLogEntry, new Procedure() { @Override public void apply(ReplicatedLogEntry evt) throws Exception { @@ -723,7 +747,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } - private static class DeleteEntries implements Serializable { + static class DeleteEntries implements Serializable { private final int fromIndex; @@ -766,7 +790,7 @@ public abstract class RaftActor extends UntypedPersistentActor { public void updateAndPersist(long currentTerm, String votedFor){ update(currentTerm, votedFor); // FIXME : Maybe first persist then update the state - persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ + persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ @Override public void apply(UpdateElectionTerm param) throws Exception { @@ -776,7 +800,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } } - private static class UpdateElectionTerm implements Serializable { + static class UpdateElectionTerm implements Serializable { private final long currentTerm; private final String votedFor; @@ -794,4 +818,29 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider { + + public NonPersistentRaftDataProvider(){ + + } + + /** + * The way snapshotting works is, + *
      + *
    1. RaftActor calls createSnapshot on the Shard + *
    2. Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot + *
    3. When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot. + * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot + * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done + * in SaveSnapshotSuccess. + *
    + * @param o + */ + @Override + public void saveSnapshot(Object o) { + // Make saving Snapshot successful + commitSnapshot(-1L); + } + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index c15c9198bd..69af77c749 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -7,19 +7,29 @@ import akka.actor.Props; import akka.actor.Terminated; import akka.event.Logging; import akka.japi.Creator; +import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotMetadata; +import akka.persistence.SnapshotOffer; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.junit.After; import org.junit.Test; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; import scala.concurrent.duration.FiniteDuration; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -32,7 +42,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.mock; public class RaftActorTest extends AbstractActorTest { @@ -45,30 +58,42 @@ public class RaftActorTest extends AbstractActorTest { public static class MockRaftActor extends RaftActor { + private final DataPersistenceProvider dataPersistenceProvider; + public static final class MockRaftActorCreator implements Creator { private final Map peerAddresses; private final String id; private final Optional config; + private final DataPersistenceProvider dataPersistenceProvider; private MockRaftActorCreator(Map peerAddresses, String id, - Optional config) { + Optional config, DataPersistenceProvider dataPersistenceProvider) { this.peerAddresses = peerAddresses; this.id = id; this.config = config; + this.dataPersistenceProvider = dataPersistenceProvider; } @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses, config); + return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider); } } private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1); + private final CountDownLatch applyStateLatch = new CountDownLatch(1); + private final List state; - public MockRaftActor(String id, Map peerAddresses, Optional config) { + public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); + if(dataPersistenceProvider == null){ + this.dataPersistenceProvider = new PersistentDataProvider(); + } else { + this.dataPersistenceProvider = dataPersistenceProvider; + } } public void waitForRecoveryComplete() { @@ -79,16 +104,27 @@ public class RaftActorTest extends AbstractActorTest { } } + public CountDownLatch getApplyRecoverySnapshotLatch(){ + return applyRecoverySnapshot; + } + public List getState() { return state; } public static Props props(final String id, final Map peerAddresses, Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, DataPersistenceProvider dataPersistenceProvider){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider)); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { + applyStateLatch.countDown(); } @Override @@ -111,6 +147,7 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void applyRecoverySnapshot(ByteString snapshot) { + applyRecoverySnapshot.countDown(); try { Object data = toObject(snapshot); System.out.println("!!!!!applyRecoverySnapshot: "+data); @@ -123,7 +160,6 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void createSnapshot() { - throw new UnsupportedOperationException("createSnapshot"); } @Override protected void applySnapshot(ByteString snapshot) { @@ -132,6 +168,11 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void onStateChanged() { } + @Override + protected DataPersistenceProvider persistence() { + return this.dataPersistenceProvider; + } + @Override public String persistenceId() { return this.getId(); } @@ -155,6 +196,9 @@ public class RaftActorTest extends AbstractActorTest { return obj; } + public ReplicatedLog getReplicatedLog(){ + return this.getRaftActorContext().getReplicatedLog(); + } } @@ -294,6 +338,343 @@ public class RaftActorTest extends AbstractActorTest { }}; } + /** + * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does + * process recovery messages + * + * @throws Exception + */ + + @Test + public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), + Lists.newArrayList(), 3, 1 ,3, 1); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); + + CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch(); + + assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS)); + + mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); + + ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog(); + + assertEquals("add replicated log entry", 1, replicatedLog.size()); + + mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A"))); + + assertEquals("add replicated log entry", 2, replicatedLog.size()); + + mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + + assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex()); + + // The snapshot had 4 items + we added 2 more items during the test + // We start removing from 5 and we should get 1 item in the replicated log + mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5)); + + assertEquals("remove log entries", 1, replicatedLog.size()); + + mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + + assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); + assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); + + mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); + + mockRaftActor.waitForRecoveryComplete(); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + }}; + } + + /** + * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does + * not process recovery messages + * + * @throws Exception + */ + @Test + public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), + Lists.newArrayList(), 3, 1 ,3, 1); + + mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); + + CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch(); + + assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS)); + + mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); + + ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog(); + + assertEquals("add replicated log entry", 0, replicatedLog.size()); + + mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A"))); + + assertEquals("add replicated log entry", 0, replicatedLog.size()); + + mockRaftActor.onReceiveRecover(new ApplyLogEntries(1)); + + assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex()); + + mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2)); + + assertEquals("remove log entries", 0, replicatedLog.size()); + + mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + + assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); + assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); + + mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); + + mockRaftActor.waitForRecoveryComplete(); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + }}; + } + + + @Test + public void testUpdatingElectionTermCallsDataPersistence() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testUpdatingElectionTermCallsDataPersistence"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + CountDownLatch persistLatch = new CountDownLatch(1); + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar"); + + assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + CountDownLatch persistLatch = new CountDownLatch(1); + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class))); + + assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + CountDownLatch persistLatch = new CountDownLatch(2); + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); + + mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); + + assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testApplyLogEntriesCallsDataPersistence() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testApplyLogEntriesCallsDataPersistence"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + CountDownLatch persistLatch = new CountDownLatch(1); + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + mockRaftActor.onReceiveCommand(new ApplyLogEntries(10)); + + assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS)); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + CountDownLatch persistLatch = new CountDownLatch(1); + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + + assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS)); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + CountDownLatch deleteMessagesLatch = new CountDownLatch(1); + CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1); + DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch); + dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); + + mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100))); + + assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS)); + + assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS)); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf index 2f53d4a4eb..8a45108f8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf @@ -15,6 +15,7 @@ akka { } serialization-bindings { + "org.opendaylight.controller.cluster.common.actor.Monitor" = java "org.opendaylight.controller.cluster.raft.client.messages.FindLeader" = java "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java "com.google.protobuf.Message" = proto diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java new file mode 100644 index 0000000000..db4bf31438 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; + +/** + * DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence + * API. + */ +public interface DataPersistenceProvider { + /** + * @return false if recovery is not applicable. In that case the provider is not persistent and may not have + * anything to be recovered + */ + boolean isRecoveryApplicable(); + + /** + * Persist a journal entry. + * + * @param o + * @param procedure + * @param + */ + void persist(T o, Procedure procedure); + + /** + * Save a snapshot + * + * @param o + */ + void saveSnapshot(Object o); + + /** + * Delete snapshots based on the criteria + * + * @param criteria + */ + void deleteSnapshots(SnapshotSelectionCriteria criteria); + + /** + * Delete journal entries up to the sequence number + * + * @param sequenceNumber + */ + void deleteMessages(long sequenceNumber); + +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java index 36b2866210..8a6217deab 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java @@ -10,7 +10,10 @@ package org.opendaylight.controller.cluster.common.actor; import akka.event.Logging; import akka.event.LoggingAdapter; +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; +import org.opendaylight.controller.cluster.DataPersistenceProvider; public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor { @@ -67,4 +70,71 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc } unhandled(message); } + + protected class PersistentDataProvider implements DataPersistenceProvider { + + public PersistentDataProvider(){ + + } + + @Override + public boolean isRecoveryApplicable() { + return true; + } + + @Override + public void persist(T o, Procedure procedure) { + AbstractUntypedPersistentActor.this.persist(o, procedure); + } + + @Override + public void saveSnapshot(Object o) { + AbstractUntypedPersistentActor.this.saveSnapshot(o); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + AbstractUntypedPersistentActor.this.deleteSnapshots(criteria); + } + + @Override + public void deleteMessages(long sequenceNumber) { + AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber); + } + } + + protected class NonPersistentDataProvider implements DataPersistenceProvider { + + public NonPersistentDataProvider(){ + + } + + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + try { + procedure.apply(o); + } catch (Exception e) { + LOG.error(e, "An unexpected error occurred"); + } + } + + @Override + public void saveSnapshot(Object o) { + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + + } + + @Override + public void deleteMessages(long sequenceNumber) { + + } + } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java index b2a43c03d9..88ce791f02 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java @@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.common.actor; import akka.actor.ActorRef; -public class Monitor { +import java.io.Serializable; + +public class Monitor implements Serializable { private final ActorRef actorRef; public Monitor(ActorRef actorRef){ diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java new file mode 100644 index 0000000000..33d4056395 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import org.opendaylight.controller.cluster.DataPersistenceProvider; + +import java.util.concurrent.CountDownLatch; + +/** + * This class is intended for testing purposes. It just triggers CountDownLatch's in each method. + * This class really should be under src/test/java but it was problematic trying to uses it in other projects. + */ +public class DataPersistenceProviderMonitor implements DataPersistenceProvider { + + private CountDownLatch persistLatch = new CountDownLatch(1); + private CountDownLatch saveSnapshotLatch = new CountDownLatch(1); + private CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);; + private CountDownLatch deleteMessagesLatch = new CountDownLatch(1);; + + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + persistLatch.countDown(); + } + + @Override + public void saveSnapshot(Object o) { + saveSnapshotLatch.countDown(); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + deleteSnapshotsLatch.countDown(); + } + + @Override + public void deleteMessages(long sequenceNumber) { + deleteMessagesLatch.countDown(); + } + + public void setPersistLatch(CountDownLatch persistLatch) { + this.persistLatch = persistLatch; + } + + public void setSaveSnapshotLatch(CountDownLatch saveSnapshotLatch) { + this.saveSnapshotLatch = saveSnapshotLatch; + } + + public void setDeleteSnapshotsLatch(CountDownLatch deleteSnapshotsLatch) { + this.deleteSnapshotsLatch = deleteSnapshotsLatch; + } + + public void setDeleteMessagesLatch(CountDownLatch deleteMessagesLatch) { + this.deleteMessagesLatch = deleteMessagesLatch; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 03d331b558..2048bde613 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -8,12 +8,15 @@ package org.opendaylight.controller.cluster.datastore; +import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader; +import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import akka.util.Timeout; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; + import java.util.concurrent.TimeUnit; /** @@ -32,12 +35,15 @@ public class DatastoreContext { private final int shardTransactionCommitQueueCapacity; private final Timeout shardInitializationTimeout; private final Timeout shardLeaderElectionTimeout; + private final boolean persistent; + private final ConfigurationReader configurationReader; private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties, ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds, Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds, int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout, - Timeout shardLeaderElectionTimeout) { + Timeout shardLeaderElectionTimeout, + boolean persistent, ConfigurationReader configurationReader) { this.dataStoreProperties = dataStoreProperties; this.shardRaftConfig = shardRaftConfig; this.dataStoreMXBeanType = dataStoreMXBeanType; @@ -47,6 +53,8 @@ public class DatastoreContext { this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; this.shardInitializationTimeout = shardInitializationTimeout; this.shardLeaderElectionTimeout = shardLeaderElectionTimeout; + this.persistent = persistent; + this.configurationReader = configurationReader; } public static Builder newBuilder() { @@ -89,6 +97,14 @@ public class DatastoreContext { return shardLeaderElectionTimeout; } + public boolean isPersistent() { + return persistent; + } + + public ConfigurationReader getConfigurationReader() { + return configurationReader; + } + public static class Builder { private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES); @@ -101,6 +117,8 @@ public class DatastoreContext { private int shardTransactionCommitQueueCapacity = 20000; private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES); private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS); + private boolean persistent = true; + private ConfigurationReader configurationReader = new FileConfigurationReader(); public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) { this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; @@ -157,6 +175,17 @@ public class DatastoreContext { return this; } + public Builder configurationReader(ConfigurationReader configurationReader){ + this.configurationReader = configurationReader; + return this; + } + + + public Builder persistent(boolean persistent){ + this.persistent = persistent; + return this; + } + public DatastoreContext build() { DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, @@ -167,7 +196,8 @@ public class DatastoreContext { return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType, operationTimeoutInSeconds, shardTransactionIdleTimeout, shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity, - shardInitializationTimeout, shardLeaderElectionTimeout); + shardInitializationTimeout, shardLeaderElectionTimeout, + persistent, configurationReader); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index a6f187d065..004faf2de1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -11,27 +11,26 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; import akka.actor.Props; import akka.osgi.BundleDelegatingClassLoader; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.osgi.framework.BundleContext; -import java.io.File; import java.util.concurrent.atomic.AtomicReference; public class DistributedDataStoreFactory { - public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf"; public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data"; + public static final String CONFIGURATION_NAME = "odl-cluster-data"; - private static AtomicReference actorSystem = new AtomicReference<>(); + + private static AtomicReference persistentActorSystem = new AtomicReference<>(); public static DistributedDataStore createInstance(String name, SchemaService schemaService, DatastoreContext datastoreContext, BundleContext bundleContext) { - ActorSystem actorSystem = getOrCreateInstance(bundleContext); + ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader()); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); final DistributedDataStore dataStore = new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem), @@ -42,27 +41,26 @@ public class DistributedDataStoreFactory { return dataStore; } - synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) { + synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) { + + AtomicReference actorSystemReference = persistentActorSystem; + String configurationName = CONFIGURATION_NAME; + String actorSystemName = ACTOR_SYSTEM_NAME; - if (actorSystem.get() != null){ - return actorSystem.get(); + if (actorSystemReference.get() != null){ + return actorSystemReference.get(); } + // Create an OSGi bundle classloader for actor system BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), Thread.currentThread().getContextClassLoader()); - ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME, - ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader); + ActorSystem system = ActorSystem.create(actorSystemName, + ConfigFactory.load(configurationReader.read()).getConfig(configurationName), classLoader); system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); - actorSystem.set(system); + actorSystemReference.set(system); return system; } - - private static final Config readAkkaConfiguration() { - File defaultConfigFile = new File(AKKA_CONF_PATH); - Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing"); - return ConfigFactory.parseFile(defaultConfigFile); - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 770cdec39c..7d67e0856f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; @@ -104,10 +105,6 @@ public class Shard extends RaftActor { private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); - // By default persistent will be true and can be turned off using the system - // property shard.persistent - private final boolean persistent; - /// The name of this shard private final ShardIdentifier name; @@ -120,6 +117,8 @@ public class Shard extends RaftActor { private final DatastoreContext datastoreContext; + private final DataPersistenceProvider dataPersistenceProvider; + private SchemaContext schemaContext; private ActorRef createSnapshotTransaction; @@ -148,12 +147,9 @@ public class Shard extends RaftActor { this.name = name; this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; + this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); - String setting = System.getProperty("shard.persistent"); - - this.persistent = !"false".equals(setting); - - LOG.info("Shard created : {} persistent : {}", name, persistent); + LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent()); store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); @@ -211,7 +207,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveRecover(Object message) { + public void onReceiveRecover(Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), @@ -230,7 +226,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveCommand(Object message) { + public void onReceiveCommand(Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); } @@ -308,12 +304,8 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - if(persistent) { - Shard.this.persistData(getSender(), transactionID, - new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); - } else { - Shard.this.finishCommit(getSender(), transactionID); - } + Shard.this.persistData(getSender(), transactionID, + new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); @@ -843,6 +835,11 @@ public class Shard extends RaftActor { } } + @Override + protected DataPersistenceProvider persistence() { + return dataPersistenceProvider; + } + @Override protected void onLeaderChanged(String oldLeader, String newLeader) { shardMBean.setLeader(newLeader); } @@ -851,6 +848,11 @@ public class Shard extends RaftActor { return this.name.toString(); } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + private static class ShardCreator implements Creator { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e861165c6b..c7213e6012 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -44,6 +45,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; + import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -91,6 +93,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Collection knownModules = new HashSet<>(128); + private final DataPersistenceProvider dataPersistenceProvider; + /** * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be * configuration or operational @@ -102,6 +106,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; + this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -109,6 +114,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } + protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { + return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); + } + public static Props props(final String type, final ClusterWrapper cluster, final Configuration configuration, @@ -170,18 +179,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { - if(message instanceof SchemaContextModules){ - SchemaContextModules msg = (SchemaContextModules) message; - knownModules.clear(); - knownModules.addAll(msg.getModules()); - } else if(message instanceof RecoveryFailure){ - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error(failure.cause(), "Recovery failed"); - } else if(message instanceof RecoveryCompleted){ - LOG.info("Recovery complete : {}", persistenceId()); - - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); + if(dataPersistenceProvider.isRecoveryApplicable()) { + if (message instanceof SchemaContextModules) { + SchemaContextModules msg = (SchemaContextModules) message; + knownModules.clear(); + knownModules.addAll(msg.getModules()); + } else if (message instanceof RecoveryFailure) { + RecoveryFailure failure = (RecoveryFailure) message; + LOG.error(failure.cause(), "Recovery failed"); + } else if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal except the last one + deleteMessages(lastSequenceNr() - 1); + } + } else { + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal + deleteMessages(lastSequenceNr()); + } } } @@ -262,15 +280,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { knownModules.clear(); knownModules.addAll(newModules); - persist(new SchemaContextModules(newModules), new Procedure() { + dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { @Override public void apply(SchemaContextModules param) throws Exception { LOG.info("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { - if(info.getActor() == null) { + if (info.getActor() == null) { info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext), + info.getPeerAddresses(), datastoreContext, schemaContext), info.getShardId().toString())); } else { info.getActor().tell(message, getSelf()); @@ -430,6 +448,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return knownModules; } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + private class ShardInformation { private final ShardIdentifier shardId; private final String shardName; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationReader.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationReader.java new file mode 100644 index 0000000000..12afdbd21b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationReader.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.config; + +import com.typesafe.config.Config; + +public interface ConfigurationReader { + Config read(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/FileConfigurationReader.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/FileConfigurationReader.java new file mode 100644 index 0000000000..fb84734119 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/FileConfigurationReader.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.config; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import java.io.File; + +public class FileConfigurationReader implements ConfigurationReader{ + + public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf"; + + @Override + public Config read() { + File defaultConfigFile = new File(AKKA_CONF_PATH); + Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing"); + return ConfigFactory.parseFile(defaultConfigFile); + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ResourceConfigurationReader.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ResourceConfigurationReader.java new file mode 100644 index 0000000000..df17f97a4e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ResourceConfigurationReader.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.config; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +public class ResourceConfigurationReader implements ConfigurationReader { + @Override + public Config read() { + return ConfigFactory.load(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index a675b40718..2f3fbdcef1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -62,6 +62,7 @@ public class DistributedConfigDataStoreProviderModule extends props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue()) .shardTransactionCommitQueueCapacity( props.getShardTransactionCommitQueueCapacity().getValue().intValue()) + .persistent(props.getPersistent().booleanValue()) .build(); return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 21cb7998a5..ecb3a91017 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -62,6 +62,7 @@ public class DistributedOperationalDataStoreProviderModule extends props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue()) .shardTransactionCommitQueueCapacity( props.getShardTransactionCommitQueueCapacity().getValue().intValue()) + .persistent(props.getPersistent().booleanValue()) .build(); return DistributedDataStoreFactory.createInstance("operational", diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index ef9da94887..995e98f38f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -147,6 +147,12 @@ module distributed-datastore-provider { type non-zero-uint32-type; description "Max queue size that an actor's mailbox can reach"; } + + leaf persistent { + default true; + type boolean; + description "Enable or disable data persistence"; + } } // Augments the 'configuration' choice node under modules/module. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 5a45a9961a..cec7ce1e3f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -6,8 +6,6 @@ import akka.actor.PoisonPill; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -36,6 +34,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + public class DistributedDataStoreIntegrationTest extends AbstractActorTest { private final DatastoreContext.Builder datastoreContextBuilder = @@ -43,7 +44,6 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testWriteTransactionWithSingleShard() throws Exception{ - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem()) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1"); @@ -60,7 +60,6 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testWriteTransactionWithMultipleShards() throws Exception{ - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem()) {{ DistributedDataStore dataStore = setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index c04dcf1534..f6eb6d7fbe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -2,18 +2,19 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; +import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; -import akka.japi.Creator; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; @@ -33,6 +34,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; + import java.net.URI; import java.util.Collection; import java.util.HashSet; @@ -40,7 +42,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -347,6 +351,79 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + @Test + public void testRecoveryApplicable(){ + new JavaTestKit(getSystem()) { + { + final Props persistentProps = ShardManager.props(shardMrgIDSuffix, + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build()); + final TestActorRef persistentShardManager = + TestActorRef.create(getSystem(), persistentProps); + + DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider(); + + assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); + + final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix, + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(false).build()); + final TestActorRef nonPersistentShardManager = + TestActorRef.create(getSystem(), nonPersistentProps); + + DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider(); + + assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable()); + + + }}; + + } + + @Test + public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider() + throws Exception { + final CountDownLatch persistLatch = new CountDownLatch(1); + final Creator creator = new Creator() { + @Override + public ShardManager create() throws Exception { + return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) { + @Override + protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { + DataPersistenceProviderMonitor dataPersistenceProviderMonitor + = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + return dataPersistenceProviderMonitor; + } + }; + } + }; + + new JavaTestKit(getSystem()) {{ + + final TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator))); + + ModuleIdentifier foo = mock(ModuleIdentifier.class); + when(foo.getNamespace()).thenReturn(new URI("foo")); + + Set moduleIdentifierSet = new HashSet<>(); + moduleIdentifierSet.add(foo); + + SchemaContext schemaContext = mock(SchemaContext.class); + when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); + + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + + assertEquals("Persisted", true, + Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS)); + + }}; + } + + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); @@ -387,4 +464,17 @@ public class ShardManagerTest extends AbstractActorTest { } } + + private static class DelegatingShardManagerCreator implements Creator { + private Creator delegate; + + public DelegatingShardManagerCreator(Creator delegate) { + this.delegate = delegate; + } + + @Override + public ShardManager create() throws Exception { + return delegate.create(); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 03a18ea6c3..cd8a658447 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -79,6 +79,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; + import java.io.IOException; import java.util.Collections; import java.util.HashSet; @@ -89,15 +90,18 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + public class ShardTest extends AbstractActorTest { @@ -114,8 +118,6 @@ public class ShardTest extends AbstractActorTest { @Before public void setUp() { - System.setProperty("shard.persistent", "false"); - InMemorySnapshotStore.clear(); InMemoryJournal.clear(); } @@ -187,7 +189,7 @@ public class ShardTest extends AbstractActorTest { return new Shard(shardID, Collections.emptyMap(), dataStoreContext, SCHEMA_CONTEXT) { @Override - public void onReceiveCommand(final Object message) { + public void onReceiveCommand(final Object message) throws Exception { if(message instanceof ElectionTimeout && firstElectionTimeout) { // Got the first ElectionTimeout. We don't forward it to the // base Shard yet until we've sent the RegisterChangeListener @@ -306,7 +308,7 @@ public class ShardTest extends AbstractActorTest { } @Test - public void testPeerAddressResolved(){ + public void testPeerAddressResolved() throws Exception { new ShardTestKit(getSystem()) {{ final CountDownLatch recoveryComplete = new CountDownLatch(1); class TestShard extends Shard { @@ -352,7 +354,7 @@ public class ShardTest extends AbstractActorTest { } @Test - public void testApplySnapshot() throws ExecutionException, InterruptedException { + public void testApplySnapshot() throws Exception { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplySnapshot"); @@ -571,7 +573,6 @@ public class ShardTest extends AbstractActorTest { @SuppressWarnings({ "unchecked" }) @Test public void testConcurrentThreePhaseCommits() throws Throwable { - System.setProperty("shard.persistent", "true"); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -915,7 +916,6 @@ public class ShardTest extends AbstractActorTest { @Test public void testAbortBeforeFinishCommit() throws Throwable { - System.setProperty("shard.persistent", "true"); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -1196,6 +1196,18 @@ public class ShardTest extends AbstractActorTest { @Test public void testCreateSnapshot() throws IOException, InterruptedException { + testCreateSnapshot(true, "testCreateSnapshot"); + } + + @Test + public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException { + testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData"); + } + + public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException { + final DatastoreContext dataStoreContext = DatastoreContext.newBuilder(). + shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build(); + new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); Creator creator = new Creator() { @@ -1204,8 +1216,8 @@ public class ShardTest extends AbstractActorTest { return new Shard(shardID, Collections.emptyMap(), dataStoreContext, SCHEMA_CONTEXT) { @Override - public void saveSnapshot(Object snapshot) { - super.saveSnapshot(snapshot); + protected void commitSnapshot(long sequenceNumber) { + super.commitSnapshot(sequenceNumber); latch.get().countDown(); } }; @@ -1213,7 +1225,7 @@ public class ShardTest extends AbstractActorTest { }; TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot"); + Props.create(new DelegatingShardCreator(creator)), shardActorName); waitUntilLeader(shard); @@ -1262,6 +1274,41 @@ public class ShardTest extends AbstractActorTest { } + @Test + public void testRecoveryApplicable(){ + + final DatastoreContext persistentContext = DatastoreContext.newBuilder(). + shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); + + final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), + persistentContext, SCHEMA_CONTEXT); + + final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder(). + shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); + + final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), + nonPersistentContext, SCHEMA_CONTEXT); + + new ShardTestKit(getSystem()) {{ + TestActorRef shard1 = TestActorRef.create(getSystem(), + persistentProps, "testPersistence1"); + + assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard1.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + TestActorRef shard2 = TestActorRef.create(getSystem(), + nonPersistentProps, "testPersistence2"); + + assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + }}; + + } + + private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException { DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); CheckedFuture>, ReadFailedException> read = diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java index d08258a2a0..79d5c5116d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java @@ -7,21 +7,22 @@ */ package org.opendaylight.controller.cluster.datastore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import akka.testkit.JavaTestKit; +import akka.util.Timeout; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Assert; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import com.google.common.util.concurrent.Uninterruptibles; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; -import akka.util.Timeout; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; class ShardTestKit extends JavaTestKit { @@ -67,4 +68,5 @@ class ShardTestKit extends JavaTestKit { Assert.fail("Leader not found for shard " + shard.path()); } + } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 9e0bba48c1..35f346f0d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -368,7 +368,6 @@ public class TransactionProxyTest { future.checkedGet(5, TimeUnit.SECONDS); fail("Expected ReadFailedException"); } catch(ReadFailedException e) { - e.printStackTrace(); throw e.getCause(); } } -- 2.36.6