X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=83dae880bad3fceef0133fdf692884ca2297f5c7;hb=refs%2Fchanges%2F78%2F33178%2F3;hp=25e37edf714cc8c6b2c6a6cf33ccc47b854c287f;hpb=1b244fc4c1f40bb663b6763c8deb25850dbbf245;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 25e37edf71..83dae880ba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status.Failure; import akka.dispatch.Dispatchers; @@ -33,11 +32,9 @@ import akka.persistence.SaveSnapshotSuccess; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; -import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -49,6 +46,7 @@ import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -60,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; @@ -73,8 +72,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateCh import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; @@ -94,28 +91,21 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; -import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; @@ -128,14 +118,12 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { - private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); - private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"; @Test public void testRegisterChangeListener() throws Exception { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps(), "testRegisterChangeListener"); waitUntilLeader(shard); @@ -143,7 +131,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, @@ -159,9 +147,6 @@ public class ShardTest extends AbstractShardTest { writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); listener.waitForChangeEvents(path); - - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -222,10 +207,10 @@ public class ShardTest extends AbstractShardTest { }; final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener"); - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterChangeListenerWhenNotLeaderInitially"); @@ -258,16 +243,13 @@ public class ShardTest extends AbstractShardTest { // Wait for the shard to become the leader and notify our listener with the existing // data in the store. listener.waitForChangeEvents(path); - - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testRegisterDataTreeChangeListener() throws Exception { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps(), "testRegisterDataTreeChangeListener"); waitUntilLeader(shard); @@ -275,7 +257,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), "testRegisterDataTreeChangeListener-DataTreeChangeListener"); shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); @@ -290,9 +272,6 @@ public class ShardTest extends AbstractShardTest { writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); listener.waitForChangeEvents(); - - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -333,10 +312,10 @@ public class ShardTest extends AbstractShardTest { }; final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); @@ -362,54 +341,46 @@ public class ShardTest extends AbstractShardTest { // TODO: investigate why we do not receive data chage events listener.waitForChangeEvents(); - - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testCreateTransaction(){ new ShardTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction"); + final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction"); waitUntilLeader(shard); shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shard.tell(new CreateTransaction("txn-1", - TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); + shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null, + DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef()); final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); - final String path = reply.getTransactionActorPath().toString(); + final String path = reply.getTransactionPath().toString(); assertTrue("Unexpected transaction path " + path, path.contains("akka://test/user/testCreateTransaction/shard-txn-1")); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testCreateTransactionOnChain(){ new ShardTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain"); + final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain"); waitUntilLeader(shard); - shard.tell(new CreateTransaction("txn-1", - TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), - getRef()); + shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar", + DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef()); final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); - final String path = reply.getTransactionActorPath().toString(); + final String path = reply.getTransactionPath().toString(); assertTrue("Unexpected transaction path " + path, path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1")); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -439,7 +410,7 @@ public class ShardTest extends AbstractShardTest { } } - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(new Creator() { @Override public TestShard create() throws Exception { @@ -455,20 +426,15 @@ public class ShardTest extends AbstractShardTest { assertEquals("getPeerAddress", address, ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString())); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testApplySnapshot() throws Exception { - ShardTestKit testkit = new ShardTestKit(getSystem()); - - final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), - "testApplySnapshot"); + final TestActorRef shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot"); - testkit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(shard); final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); store.setSchemaContext(SCHEMA_CONTEXT); @@ -491,74 +457,27 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode actual = readStore(shard, root); assertEquals("Root node", expected, actual); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } @Test public void testApplyState() throws Exception { + final TestActorRef shard = actorFactory.createTestActor(newShardProps(), "testApplyState"); - ShardTestKit testkit = new ShardTestKit(getSystem()); - - final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); - - testkit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(shard); - final NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final DataTree source = setupInMemorySnapshotStore(); + final DataTreeModification writeMod = source.takeSnapshot().newModification(); + ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeMod.write(TestModel.TEST_PATH, node); + writeMod.ready(); final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - newModificationPayload(new WriteModification(TestModel.TEST_PATH, node)))); + payloadForModification(source, writeMod))); shard.underlyingActor().onReceiveCommand(applyState); final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); assertEquals("Applied state", node, actual); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - - @Test - public void testApplyStateWithCandidatePayload() throws Exception { - - ShardTestKit testkit = new ShardTestKit(getSystem()); - - final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); - - testkit.waitUntilLeader(shard); - - final NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); - - final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - DataTreeCandidatePayload.create(candidate))); - - shard.underlyingActor().onReceiveCommand(applyState); - - final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); - assertEquals("Applied state", node, actual); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - - DataTree setupInMemorySnapshotStore() throws DataValidationFailedException { - final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - testStore.setSchemaContext(SCHEMA_CONTEXT); - - writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - final NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); - - InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - SerializationUtils.serializeNormalizedNode(root), - Collections.emptyList(), 0, 1, -1, -1)); - return testStore; - } - - private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException { - source.validate(mod); - final DataTreeCandidate candidate = source.prepare(mod); - source.commit(candidate); - return DataTreeCandidatePayload.create(candidate); } @Test @@ -597,59 +516,16 @@ public class ShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - @Test - public void testModicationRecovery() throws Exception { - - // Set up the InMemorySnapshotStore. - setupInMemorySnapshotStore(); - - // Set up the InMemoryJournal. - - InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); - - InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload( - new WriteModification(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); - - final int nListEntries = 16; - final Set listEntryKeys = new HashSet<>(); - - // Add some ModificationPayload entries - for(int i = 1; i <= nListEntries; i++) { - listEntryKeys.add(Integer.valueOf(i)); - final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); - final Modification mod = new MergeModification(path, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1, - newModificationPayload(mod))); - } - - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, - new ApplyJournalEntries(nListEntries)); - - testRecovery(listEntryKeys); - } - - private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { - final MutableCompositeModification compMod = new MutableCompositeModification(); - for(final Modification mod: mods) { - compMod.addModification(mod); - } - - return new ModificationPayload(compMod); - } - @Test public void testConcurrentThreePhaseCommits() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testConcurrentThreePhaseCommits"); waitUntilLeader(shard); - // Setup 3 simulated transactions with mock cohorts backed by real cohorts. + // Setup 3 simulated transactions with mock cohorts backed by real cohorts. final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); @@ -802,31 +678,13 @@ public class ShardTest extends AbstractShardTest { verifyOuterListEntry(shard, 1); verifyLastApplied(shard, 2); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } - private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path, - final NormalizedNode data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) { - return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent); - } - - private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID, - final YangInstanceIdentifier path, final NormalizedNode data, final boolean ready, final boolean doCommitOnReady, - final int messagesSent) { - final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); - batched.addModification(new WriteModification(path, data)); - batched.setReady(ready); - batched.setDoCommitOnReady(doCommitOnReady); - batched.setTotalMessagesSent(messagesSent); - return batched; - } - @Test public void testBatchedModificationsWithNoCommitOnReady() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testBatchedModificationsWithNoCommitOnReady"); @@ -886,15 +744,13 @@ public class ShardTest extends AbstractShardTest { // Verify data in the data store. verifyOuterListEntry(shard, 1); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testBatchedModificationsWithCommitOnReady() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testBatchedModificationsWithCommitOnReady"); @@ -943,15 +799,13 @@ public class ShardTest extends AbstractShardTest { // Verify data in the data store. verifyOuterListEntry(shard, 1); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test(expected=IllegalStateException.class) public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testBatchedModificationsReadyWithIncorrectTotalMessageCount"); @@ -966,8 +820,6 @@ public class ShardTest extends AbstractShardTest { final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - if(failure != null) { throw failure.cause(); } @@ -977,7 +829,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testBatchedModificationsWithOperationFailure() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testBatchedModificationsWithOperationFailure"); @@ -1007,31 +859,13 @@ public class ShardTest extends AbstractShardTest { failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); assertEquals("Failure cause", cause, failure.cause()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } - @SuppressWarnings("unchecked") - private static void verifyOuterListEntry(final TestActorRef shard, final Object expIDValue) throws Exception { - final NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - final Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - final MapEntryNode mapEntry = (MapEntryNode)entry; - final Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue()); - } - @Test public void testBatchedModificationsOnTransactionChain() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testBatchedModificationsOnTransactionChain"); @@ -1053,12 +887,13 @@ public class ShardTest extends AbstractShardTest { // Create a read Tx on the same chain. - shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() , - transactionChainID).toSerializable(), getRef()); + shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(), + transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef()); final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); - getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef()); + getSystem().actorSelection(createReply.getTransactionPath()).tell( + new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef()); final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class); assertEquals("Read node", containerNode, readReply.getNormalizedNode()); @@ -1076,8 +911,6 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode actualNode = readStore(shard, path); assertEquals("Stored node", containerNode, actualNode); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1105,7 +938,7 @@ public class ShardTest extends AbstractShardTest { } }; - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader"); waitUntilLeader(shard); @@ -1117,8 +950,32 @@ public class ShardTest extends AbstractShardTest { shard.tell(batched, ActorRef.noSender()); expectMsgEquals(batched); + }}; + } + + @Test + public void testTransactionMessagesWithNoLeader() { + new ShardTestKit(getSystem()) {{ + dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()). + shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1); + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionMessagesWithNoLeader"); + + waitUntilNoLeader(shard); + + shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef()); + Failure failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); + + shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx", + DataStoreVersions.CURRENT_VERSION, true), getRef()); + failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef()); + failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); }}; } @@ -1128,9 +985,9 @@ public class ShardTest extends AbstractShardTest { testReadyWithImmediateCommit(false); } - public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{ + private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{ new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testReadyWithImmediateCommit-" + readWrite); @@ -1157,15 +1014,13 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testReadyLocalTransactionWithImmediateCommit() throws Exception{ new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testReadyLocalTransactionWithImmediateCommit"); @@ -1190,15 +1045,13 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{ new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testReadyLocalTransactionWithThreePhaseCommit"); @@ -1235,8 +1088,6 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1246,10 +1097,10 @@ public class ShardTest extends AbstractShardTest { testCommitWithPersistenceDisabled(false); } - public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable { + private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable { dataStoreContextBuilder.persistent(false); new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitWithPersistenceDisabled-" + readWrite); @@ -1289,42 +1140,21 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } - private static DataTreeCandidateTip mockCandidate(final String name) { - final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); - final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); - doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType(); - doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter(); - doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); - doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); - return mockCandidate; - } - - private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) { - final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); - final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); - doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType(); - doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); - doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); - return mockCandidate; - } - @Test public void testCommitWhenTransactionHasNoModifications() { testCommitWhenTransactionHasNoModifications(true); testCommitWhenTransactionHasNoModifications(false); } - public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){ + private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){ // Note that persistence is enabled which would normally result in the entry getting written to the journal // but here that need not happen new ShardTestKit(getSystem()) { { - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitWhenTransactionHasNoModifications-" + readWrite); @@ -1367,9 +1197,6 @@ public class ShardTest extends AbstractShardTest { // Commit index should not advance because this does not go into the journal assertEquals(-1, shardStats.getCommitIndex()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } }; } @@ -1380,10 +1207,10 @@ public class ShardTest extends AbstractShardTest { testCommitWhenTransactionHasModifications(false); } - public void testCommitWhenTransactionHasModifications(final boolean readWrite){ + private void testCommitWhenTransactionHasModifications(final boolean readWrite){ new ShardTestKit(getSystem()) { { - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitWhenTransactionHasModifications-" + readWrite); @@ -1427,9 +1254,6 @@ public class ShardTest extends AbstractShardTest { // Commit index should advance as we do not have an empty modification assertEquals(0, shardStats.getCommitIndex()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } }; } @@ -1440,9 +1264,9 @@ public class ShardTest extends AbstractShardTest { testCommitPhaseFailure(false); } - public void testCommitPhaseFailure(final boolean readWrite) throws Throwable { + private void testCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitPhaseFailure-" + readWrite); @@ -1509,8 +1333,6 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(cohort1).preCommit(); inOrder.verify(cohort1).commit(); inOrder.verify(cohort2).canCommit(); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1520,9 +1342,9 @@ public class ShardTest extends AbstractShardTest { testPreCommitPhaseFailure(false); } - public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable { + private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPreCommitPhaseFailure-" + readWrite); @@ -1583,8 +1405,6 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(cohort1).canCommit(); inOrder.verify(cohort1).preCommit(); inOrder.verify(cohort2).canCommit(); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1594,9 +1414,9 @@ public class ShardTest extends AbstractShardTest { testCanCommitPhaseFailure(false); } - public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable { + private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCanCommitPhaseFailure-" + readWrite); @@ -1631,8 +1451,6 @@ public class ShardTest extends AbstractShardTest { final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("getCanCommit", true, reply.getCanCommit()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1642,9 +1460,9 @@ public class ShardTest extends AbstractShardTest { testCanCommitPhaseFalseResponse(false); } - public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { + private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCanCommitPhaseFalseResponse-" + readWrite); @@ -1681,8 +1499,6 @@ public class ShardTest extends AbstractShardTest { reply = CanCommitTransactionReply.fromSerializable( expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("getCanCommit", true, reply.getCanCommit()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1692,9 +1508,9 @@ public class ShardTest extends AbstractShardTest { testImmediateCommitWithCanCommitPhaseFailure(false); } - public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable { + private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite); @@ -1728,8 +1544,6 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1739,9 +1553,9 @@ public class ShardTest extends AbstractShardTest { testImmediateCommitWithCanCommitPhaseFalseResponse(false); } - public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { + private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite); @@ -1775,8 +1589,6 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1786,9 +1598,9 @@ public class ShardTest extends AbstractShardTest { testAbortBeforeFinishCommit(false); } - public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable { + private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortBeforeFinishCommit-" + readWrite); @@ -1840,8 +1652,6 @@ public class ShardTest extends AbstractShardTest { // the data should still get written to the in-memory store since we've gotten past // canCommit and preCommit and persisted the data. assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1851,11 +1661,11 @@ public class ShardTest extends AbstractShardTest { testTransactionCommitTimeout(false); } - public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable { + private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testTransactionCommitTimeout-" + readWrite); @@ -1920,8 +1730,6 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode node = readStore(shard, listNodePath); assertNotNull(listNodePath + " not found", node); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1930,7 +1738,7 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2); new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testTransactionCommitQueueCapacityExceeded"); @@ -1983,8 +1791,6 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1993,7 +1799,7 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testTransactionCommitWithPriorExpiredCohortEntries"); @@ -2032,8 +1838,6 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef()); expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -2042,7 +1846,7 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testTransactionCommitWithSubsequentExpiredCohortEntry"); @@ -2098,22 +1902,18 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH); assertNotNull(TestModel.TEST2_PATH + " not found", node); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testCanCommitBeforeReadyFailure() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCanCommitBeforeReadyFailure"); shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef()); expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -2123,9 +1923,9 @@ public class ShardTest extends AbstractShardTest { testAbortCurrentTransaction(false); } - public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable { + private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortCurrentTransaction-" + readWrite); @@ -2179,8 +1979,6 @@ public class ShardTest extends AbstractShardTest { final InOrder inOrder = inOrder(cohort1, cohort2); inOrder.verify(cohort1).canCommit(); inOrder.verify(cohort2).canCommit(); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -2190,7 +1988,7 @@ public class ShardTest extends AbstractShardTest { testAbortQueuedTransaction(false); } - public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable { + private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ final AtomicReference cleaupCheckLatch = new AtomicReference<>(); @@ -2212,7 +2010,7 @@ public class ShardTest extends AbstractShardTest { } }; - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)).withDispatcher( Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite); @@ -2254,8 +2052,6 @@ public class ShardTest extends AbstractShardTest { Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause(); assertTrue("Failure type", failure instanceof IllegalStateException); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -2270,7 +2066,7 @@ public class ShardTest extends AbstractShardTest { } @SuppressWarnings("serial") - public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ + private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); @@ -2319,7 +2115,7 @@ public class ShardTest extends AbstractShardTest { } }; - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)), shardActorName); waitUntilLeader(shard); @@ -2334,31 +2130,26 @@ public class ShardTest extends AbstractShardTest { raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); awaitAndValidateSnapshot(expectedRoot); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - private void awaitAndValidateSnapshot(NormalizedNode expectedRoot - ) throws InterruptedException { - System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get()); - assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); + private void awaitAndValidateSnapshot(NormalizedNode expectedRoot) throws InterruptedException { + assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); - assertTrue("Invalid saved snapshot " + savedSnapshot.get(), - savedSnapshot.get() instanceof Snapshot); + assertTrue("Invalid saved snapshot " + savedSnapshot.get(), + savedSnapshot.get() instanceof Snapshot); - verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); + verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); - latch.set(new CountDownLatch(1)); - savedSnapshot.set(null); - } + latch.set(new CountDownLatch(1)); + savedSnapshot.set(null); + } - private void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) { + private void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) { - final NormalizedNode actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState()); - assertEquals("Root node", expectedRoot, actual); + final NormalizedNode actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState()); + assertEquals("Root node", expectedRoot, actual); - } - }; + }}; } /** @@ -2407,22 +2198,14 @@ public class ShardTest extends AbstractShardTest { schemaContext(SCHEMA_CONTEXT).props(); new ShardTestKit(getSystem()) {{ - final TestActorRef shard1 = TestActorRef.create(getSystem(), - persistentProps, "testPersistence1"); + final TestActorRef shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1"); assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable()); - shard1.tell(PoisonPill.getInstance(), ActorRef.noSender()); - - final TestActorRef shard2 = TestActorRef.create(getSystem(), - nonPersistentProps, "testPersistence2"); + final TestActorRef shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2"); assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable()); - - shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); - }}; - } @Test @@ -2430,7 +2213,7 @@ public class ShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) {{ dataStoreContextBuilder.persistent(true); - final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext"); + final TestActorRef shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext"); assertEquals("isRecoveryApplicable", true, shard.underlyingActor().persistence().isRecoveryApplicable()); @@ -2446,8 +2229,6 @@ public class ShardTest extends AbstractShardTest { assertEquals("isRecoveryApplicable", true, shard.underlyingActor().persistence().isRecoveryApplicable()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -2455,7 +2236,7 @@ public class ShardTest extends AbstractShardTest { public void testRegisterRoleChangeListener() throws Exception { new ShardTestKit(getSystem()) { { - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterRoleChangeListener"); @@ -2485,15 +2266,13 @@ public class ShardTest extends AbstractShardTest { ShardLeaderStateChanged.class); assertEquals("getLocalShardDataTree present", false, leaderStateChanged.getLocalShardDataTree().isPresent()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } }; } @Test public void testFollowerInitialSyncStatus() throws Exception { - final TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testFollowerInitialSyncStatus"); @@ -2504,14 +2283,6 @@ public class ShardTest extends AbstractShardTest { shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational")); assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - - private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException { - modification.ready(); - store.validate(modification); - store.commit(store.prepare(modification)); } @Test @@ -2664,7 +2435,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testServerRemoved() throws Exception { - final TestActorRef parent = TestActorRef.create(getSystem(), MessageCollectorActor.props()); + final TestActorRef parent = actorFactory.createTestActor(MessageCollectorActor.props()); final ActorRef shard = parent.underlyingActor().context().actorOf( newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -2673,7 +2444,5 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ServerRemoved("test"), ActorRef.noSender()); MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class); - } - }