X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=0121a15338a2ed807e910a793e6655a062ba35de;hb=7ec296abdea94bbdc336276731ee545a2fb13e7c;hp=28330185872b3231be213b6115696a75ddc7b64f;hpb=5aa58404a8ee1ad053742780439823309360a3a1;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 2833018587..0121a15338 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -37,10 +37,8 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; -import java.io.IOException; import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -50,6 +48,7 @@ import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -61,7 +60,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; @@ -76,7 +74,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; @@ -95,6 +92,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -116,14 +115,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -190,8 +188,7 @@ public class ShardTest extends AbstractShardTest { // this will cause all other messages to not be queued properly after that. // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when // it does do a persist) - return new Shard(shardID, Collections.emptyMap(), - dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { + return new Shard(newShardBuilder()) { @Override public void onReceiveCommand(final Object message) throws Exception { if(message instanceof ElectionTimeout && firstElectionTimeout) { @@ -280,7 +277,7 @@ public class ShardTest extends AbstractShardTest { final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), "testRegisterDataTreeChangeListener-DataTreeChangeListener"); - shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef()); + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterDataTreeChangeListenerReply.class); @@ -309,8 +306,8 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { + return new Shard(Shard.builder().id(shardID).datastoreContext( + dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) { @Override public void onReceiveCommand(final Object message) throws Exception { if(message instanceof ElectionTimeout && firstElectionTimeout) { @@ -348,7 +345,7 @@ public class ShardTest extends AbstractShardTest { assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); - shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef()); + shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterDataTreeChangeListenerReply.class); assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); @@ -422,8 +419,9 @@ public class ShardTest extends AbstractShardTest { final CountDownLatch recoveryComplete = new CountDownLatch(1); class TestShard extends Shard { TestShard() { - super(shardID, Collections.singletonMap(shardID.toString(), null), - newDatastoreContext(), SCHEMA_CONTEXT); + super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()). + peerAddresses(Collections.singletonMap(shardID.toString(), null)). + schemaContext(SCHEMA_CONTEXT)); } String getPeerAddress(String id) { @@ -464,14 +462,12 @@ public class ShardTest extends AbstractShardTest { @Test public void testApplySnapshot() throws Exception { - ShardTestKit testkit = new ShardTestKit(getSystem()); - final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplySnapshot"); - testkit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(shard); - final DataTree store = InMemoryDataTreeFactory.getInstance().create(); + final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); store.setSchemaContext(SCHEMA_CONTEXT); final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier( @@ -498,40 +494,14 @@ public class ShardTest extends AbstractShardTest { @Test public void testApplyState() throws Exception { - - ShardTestKit testkit = new ShardTestKit(getSystem()); - final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); - testkit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(shard); final NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - newModificationPayload(new WriteModification(TestModel.TEST_PATH, node)))); - - shard.underlyingActor().onReceiveCommand(applyState); - - final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); - assertEquals("Applied state", node, actual); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - - @Test - public void testApplyStateWithCandidatePayload() throws Exception { - - ShardTestKit testkit = new ShardTestKit(getSystem()); - - final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); - - testkit.waitUntilLeader(shard); - - final NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); - - final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - DataTreeCandidatePayload.create(candidate))); + newDataTreeCandidatePayload(new WriteModification(TestModel.TEST_PATH, node)))); shard.underlyingActor().onReceiveCommand(applyState); @@ -542,7 +512,7 @@ public class ShardTest extends AbstractShardTest { } DataTree setupInMemorySnapshotStore() throws DataValidationFailedException { - final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); + final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -550,8 +520,8 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - SerializationUtils.serializeNormalizedNode(root), - Collections.emptyList(), 0, 1, -1, -1)); + SerializationUtils.serializeNormalizedNode(root), + Collections.emptyList(), 0, 1, -1, -1)); return testStore; } @@ -608,9 +578,13 @@ public class ShardTest extends AbstractShardTest { InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); - InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload( - new WriteModification(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); + ShardDataTree shardDataTree = new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION); + + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newDataTreeCandidatePayload( + shardDataTree, + new WriteModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), + new WriteModification(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); final int nListEntries = 16; final Set listEntryKeys = new HashSet<>(); @@ -623,7 +597,7 @@ public class ShardTest extends AbstractShardTest { final Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1, - newModificationPayload(mod))); + newDataTreeCandidatePayload(shardDataTree, mod))); } InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, @@ -632,13 +606,18 @@ public class ShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { - final MutableCompositeModification compMod = new MutableCompositeModification(); + private static DataTreeCandidatePayload newDataTreeCandidatePayload(final Modification... mods) throws Exception { + return newDataTreeCandidatePayload(new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION), mods); + } + + private static DataTreeCandidatePayload newDataTreeCandidatePayload(ShardDataTree shardDataTree, + final Modification... mods) throws Exception { + DataTreeModification dataTreeModification = shardDataTree.newModification(); for(final Modification mod: mods) { - compMod.addModification(mod); + mod.apply(dataTreeModification); } - return new ModificationPayload(compMod); + return DataTreeCandidatePayload.create(shardDataTree.commit(dataTreeModification)); } @Test @@ -678,11 +657,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.class)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -694,14 +669,10 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send the ForwardedReadyTransaction for the next 2 Tx's. - - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and @@ -1099,8 +1070,7 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { + return new Shard(newShardBuilder()) { @Override protected boolean isLeader() { return overrideLeaderCalls.get() ? false : super.isLeader(); @@ -1133,11 +1103,42 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{ + public void testTransactionMessagesWithNoLeader() { + new ShardTestKit(getSystem()) {{ + dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()). + shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1); + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionMessagesWithNoLeader"); + + waitUntilNoLeader(shard); + + shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef()); + Failure failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); + + shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx", + DataStoreVersions.CURRENT_VERSION, true), getRef()); + failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); + + shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef()); + failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); + }}; + } + + @Test + public void testReadyWithImmediateCommit() throws Exception{ + testReadyWithImmediateCommit(true); + testReadyWithImmediateCommit(false); + } + + public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{ new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testForwardedReadyTransactionWithImmediateCommit"); + "testReadyWithImmediateCommit-" + readWrite); waitUntilLeader(shard); @@ -1151,11 +1152,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef()); expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); @@ -1182,7 +1179,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification(); + final DataTreeModification modification = dataStore.newModification(); final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME); new WriteModification(TestModel.TEST_PATH, writeData).apply(modification); @@ -1215,7 +1212,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification(); + final DataTreeModification modification = dataStore.newModification(); final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME); new WriteModification(TestModel.TEST_PATH, writeData).apply(modification); @@ -1251,11 +1248,16 @@ public class ShardTest extends AbstractShardTest { @Test public void testCommitWithPersistenceDisabled() throws Throwable { + testCommitWithPersistenceDisabled(true); + testCommitWithPersistenceDisabled(false); + } + + public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable { dataStoreContextBuilder.persistent(false); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWithPersistenceDisabled"); + "testCommitWithPersistenceDisabled-" + readWrite); waitUntilLeader(shard); @@ -1271,11 +1273,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1322,14 +1320,19 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testCommitWhenTransactionHasNoModifications(){ + public void testCommitWhenTransactionHasNoModifications() { + testCommitWhenTransactionHasNoModifications(true); + testCommitWhenTransactionHasNoModifications(false); + } + + public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){ // Note that persistence is enabled which would normally result in the entry getting written to the journal // but here that need not happen new ShardTestKit(getSystem()) { { final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWhenTransactionHasNoModifications"); + "testCommitWhenTransactionHasNoModifications-" + readWrite); waitUntilLeader(shard); @@ -1343,11 +1346,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1382,12 +1381,17 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testCommitWhenTransactionHasModifications(){ + public void testCommitWhenTransactionHasModifications() { + testCommitWhenTransactionHasModifications(true); + testCommitWhenTransactionHasModifications(false); + } + + public void testCommitWhenTransactionHasModifications(final boolean readWrite){ new ShardTestKit(getSystem()) { { final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWhenTransactionHasModifications"); + "testCommitWhenTransactionHasModifications-" + readWrite); waitUntilLeader(shard); @@ -1402,11 +1406,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1442,10 +1442,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testCommitPhaseFailure() throws Throwable { + testCommitPhaseFailure(true); + testCommitPhaseFailure(false); + } + + public void testCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitPhaseFailure"); + "testCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1468,15 +1473,10 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1522,10 +1522,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testPreCommitPhaseFailure() throws Throwable { + testPreCommitPhaseFailure(true); + testPreCommitPhaseFailure(false); + } + + public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testPreCommitPhaseFailure"); + "testPreCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1543,15 +1548,10 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1596,10 +1596,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testCanCommitPhaseFailure() throws Throwable { + testCanCommitPhaseFailure(true); + testCanCommitPhaseFailure(false); + } + + public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCanCommitPhaseFailure"); + "testCanCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1610,11 +1615,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1629,8 +1630,7 @@ public class ShardTest extends AbstractShardTest { final String transactionID2 = "tx2"; doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); @@ -1644,10 +1644,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testCanCommitPhaseFalseResponse() throws Throwable { + testCanCommitPhaseFalseResponse(true); + testCanCommitPhaseFalseResponse(false); + } + + public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCanCommitPhaseFalseResponse"); + "testCanCommitPhaseFalseResponse-" + readWrite); waitUntilLeader(shard); @@ -1658,11 +1663,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1679,8 +1680,7 @@ public class ShardTest extends AbstractShardTest { final String transactionID2 = "tx2"; doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); @@ -1694,10 +1694,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable { + testImmediateCommitWithCanCommitPhaseFailure(true); + testImmediateCommitWithCanCommitPhaseFailure(false); + } + + public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testImmediateCommitWithCanCommitPhaseFailure"); + "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1708,11 +1713,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); @@ -1730,8 +1731,7 @@ public class ShardTest extends AbstractShardTest { doReturn(candidateRoot).when(candidate).getRootNode(); doReturn(candidate).when(cohort).getCandidate(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); @@ -1741,10 +1741,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable { + testImmediateCommitWithCanCommitPhaseFalseResponse(true); + testImmediateCommitWithCanCommitPhaseFalseResponse(false); + } + + public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testImmediateCommitWithCanCommitPhaseFalseResponse"); + "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite); waitUntilLeader(shard); @@ -1755,11 +1760,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); @@ -1777,8 +1778,7 @@ public class ShardTest extends AbstractShardTest { doReturn(candidateRoot).when(candidate).getRootNode(); doReturn(candidate).when(cohort).getCandidate(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); @@ -1788,10 +1788,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testAbortBeforeFinishCommit() throws Throwable { + testAbortBeforeFinishCommit(true); + testAbortBeforeFinishCommit(false); + } + + public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testAbortBeforeFinishCommit"); + "testAbortBeforeFinishCommit-" + readWrite); waitUntilLeader(shard); @@ -1824,8 +1829,7 @@ public class ShardTest extends AbstractShardTest { TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); @@ -1849,12 +1853,17 @@ public class ShardTest extends AbstractShardTest { @Test public void testTransactionCommitTimeout() throws Throwable { + testTransactionCommitTimeout(true); + testTransactionCommitTimeout(false); + } + + public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testTransactionCommitTimeout"); + "testTransactionCommitTimeout-" + readWrite); waitUntilLeader(shard); @@ -1889,12 +1898,10 @@ public class ShardTest extends AbstractShardTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1958,18 +1965,15 @@ public class ShardTest extends AbstractShardTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // The 3rd Tx should exceed queue capacity and fail. - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); // canCommit 1st Tx. @@ -2010,8 +2014,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); final String transactionID2 = "tx2"; @@ -2019,8 +2022,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); final String transactionID3 = "tx3"; @@ -2028,8 +2030,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // All Tx's are readied. We'll send canCommit for the last one but not the others. The others @@ -2062,8 +2063,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // CanCommit the first one so it's the current in-progress CohortEntry. @@ -2078,14 +2078,13 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Ready the third Tx. final String transactionID3 = "tx3"; - final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification(); + final DataTreeModification modification3 = dataStore.newModification(); new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME)) .apply(modification3); modification3.ready(); @@ -2126,10 +2125,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testAbortCurrentTransaction() throws Throwable { + testAbortCurrentTransaction(true); + testAbortCurrentTransaction(false); + } + + public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testAbortCurrentTransaction"); + "testAbortCurrentTransaction-" + readWrite); waitUntilLeader(shard); @@ -2149,15 +2153,10 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -2193,6 +2192,11 @@ public class ShardTest extends AbstractShardTest { @Test public void testAbortQueuedTransaction() throws Throwable { + testAbortQueuedTransaction(true); + testAbortQueuedTransaction(false); + } + + public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ final AtomicReference cleaupCheckLatch = new AtomicReference<>(); @@ -2200,8 +2204,7 @@ public class ShardTest extends AbstractShardTest { final Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - dataStoreContextBuilder.build(), SCHEMA_CONTEXT) { + return new Shard(newShardBuilder()) { @Override public void onReceiveCommand(final Object message) throws Exception { super.onReceiveCommand(message); @@ -2217,7 +2220,7 @@ public class ShardTest extends AbstractShardTest { final TestActorRef shard = TestActorRef.create(getSystem(), Props.create(new DelegatingShardCreator(creator)).withDispatcher( - Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction"); + Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite); waitUntilLeader(shard); @@ -2231,8 +2234,7 @@ public class ShardTest extends AbstractShardTest { // Ready the tx. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize()); @@ -2296,9 +2298,8 @@ public class ShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) {{ class TestShard extends Shard { - protected TestShard(final ShardIdentifier name, final Map peerAddresses, - final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - super(name, peerAddresses, datastoreContext, schemaContext); + protected TestShard(AbstractBuilder builder) { + super(builder); setPersistence(new TestPersistentDataProvider(super.persistence())); } @@ -2320,8 +2321,7 @@ public class ShardTest extends AbstractShardTest { final Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new TestShard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT); + return new TestShard(newShardBuilder()); } }; @@ -2329,7 +2329,6 @@ public class ShardTest extends AbstractShardTest { Props.create(new DelegatingShardCreator(creator)), shardActorName); waitUntilLeader(shard); - writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); @@ -2337,35 +2336,35 @@ public class ShardTest extends AbstractShardTest { // Trigger creation of a snapshot by ensuring final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext(); raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); - - assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); - - assertTrue("Invalid saved snapshot " + savedSnapshot.get(), - savedSnapshot.get() instanceof Snapshot); - - verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); - - latch.set(new CountDownLatch(1)); - savedSnapshot.set(null); + awaitAndValidateSnapshot(expectedRoot); raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); + awaitAndValidateSnapshot(expectedRoot); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } - assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); + private void awaitAndValidateSnapshot(NormalizedNode expectedRoot + ) throws InterruptedException { + System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get()); + assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); - assertTrue("Invalid saved snapshot " + savedSnapshot.get(), - savedSnapshot.get() instanceof Snapshot); + assertTrue("Invalid saved snapshot " + savedSnapshot.get(), + savedSnapshot.get() instanceof Snapshot); - verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); + verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } + latch.set(new CountDownLatch(1)); + savedSnapshot.set(null); + } - private void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) { + private void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) { - final NormalizedNode actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState()); - assertEquals("Root node", expectedRoot, actual); + final NormalizedNode actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState()); + assertEquals("Root node", expectedRoot, actual); - }}; + } + }; } /** @@ -2375,7 +2374,7 @@ public class ShardTest extends AbstractShardTest { */ @Test public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { - final DataTree store = InMemoryDataTreeFactory.getInstance().create(); + final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); store.setSchemaContext(SCHEMA_CONTEXT); final DataTreeModification putTransaction = store.takeSnapshot().newModification(); @@ -2404,14 +2403,14 @@ public class ShardTest extends AbstractShardTest { final DatastoreContext persistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); - final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), - persistentContext, SCHEMA_CONTEXT); + final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext). + schemaContext(SCHEMA_CONTEXT).props(); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); - final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), - nonPersistentContext, SCHEMA_CONTEXT); + final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext). + schemaContext(SCHEMA_CONTEXT).props(); new ShardTestKit(getSystem()) {{ final TestActorRef shard1 = TestActorRef.create(getSystem(), @@ -2522,142 +2521,165 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testClusteredDataChangeListernerDelayedRegistration() throws Exception { + public void testClusteredDataChangeListenerDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); - final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); - final Creator creator = new Creator() { - boolean firstElectionTimeout = true; - - @Override - public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { - @Override - public void onReceiveCommand(final Object message) throws Exception { - if(message instanceof ElectionTimeout && firstElectionTimeout) { - firstElectionTimeout = false; - final ActorRef self = getSelf(); - new Thread() { - @Override - public void run() { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - } - }.start(); - - onFirstElectionTimeout.countDown(); - } else { - super.onReceiveCommand(message); - } - } - }; - } - }; + String testName = "testClusteredDataChangeListenerDelayedRegistration"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), - "testDataChangeListenerOnFollower-DataChangeListener"); - - final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()). - withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower"); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + actorFactory.generateActorId(testName + "-DataChangeListener")); - assertEquals("Got first ElectionTimeout", true, - onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); - shard.tell(new FindLeader(), getRef()); - final FindLeaderReply findLeadeReply = - expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor()); + waitUntilNoLeader(shard); final YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - onChangeListenerRegistered.countDown(); + shard.tell(new ElectionTimeout(), ActorRef.noSender()); listener.waitForChangeEvents(); - - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test - public void testClusteredDataChangeListernerRegistration() throws Exception { + public void testClusteredDataChangeListenerRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); + String testName = "testClusteredDataChangeListenerRegistration"; + final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build(); + + final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build(); + + final TestActorRef followerShard = actorFactory.createTestActor( + Shard.builder().id(followerShardID). + datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()). + peerAddresses(Collections.singletonMap(leaderShardID.toString(), + "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); + + final TestActorRef leaderShard = actorFactory.createTestActor( + Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()). + peerAddresses(Collections.singletonMap(followerShardID.toString(), + "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); + + leaderShard.tell(new ElectionTimeout(), ActorRef.noSender()); + String leaderPath = waitUntilLeader(followerShard); + assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); - final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2") - .shardName("inventory").type("config").build(); - final Creator followerShardCreator = new Creator() { - - @Override - public Shard create() throws Exception { - return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(), - "akka://test/user/" + member2ShardID.toString()), - dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { - @Override - public void onReceiveCommand(final Object message) throws Exception { + final YangInstanceIdentifier path = TestModel.TEST_PATH; + final MockDataChangeListener listener = new MockDataChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + actorFactory.generateActorId(testName + "-DataChangeListener")); - if(!(message instanceof ElectionTimeout)) { - super.onReceiveCommand(message); - } - } - }; - } - }; + followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); + final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); - final Creator leaderShardCreator = new Creator() { + writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - @Override - public Shard create() throws Exception { - return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(), - "akka://test/user/" + member1ShardID.toString()), - dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { }; - } - }; + listener.waitForChangeEvents(); + }}; + } + @Test + public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + String testName = "testClusteredDataTreeChangeListenerDelayedRegistration"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000); - final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(followerShardCreator)), - member1ShardID.toString()); + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + actorFactory.generateActorId(testName + "-DataTreeChangeListener")); - final TestActorRef shardLeader = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()), - member2ShardID.toString()); - // Sleep to let election happen - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); - shard.tell(new FindLeader(), getRef()); - final FindLeaderReply findLeaderReply = - expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor()); + waitUntilNoLeader(shard); final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), - "testDataChangeListenerOnFollower-DataChangeListener"); - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + shard.tell(new ElectionTimeout(), ActorRef.noSender()); + listener.waitForChangeEvents(); + }}; + } - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + @Test + public void testClusteredDataTreeChangeListenerRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + String testName = "testClusteredDataTreeChangeListenerRegistration"; + final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build(); + + final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build(); + + final TestActorRef followerShard = actorFactory.createTestActor( + Shard.builder().id(followerShardID). + datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()). + peerAddresses(Collections.singletonMap(leaderShardID.toString(), + "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); + + final TestActorRef leaderShard = actorFactory.createTestActor( + Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()). + peerAddresses(Collections.singletonMap(followerShardID.toString(), + "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); + + leaderShard.tell(new ElectionTimeout(), ActorRef.noSender()); + String leaderPath = waitUntilLeader(followerShard); + assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); + + final YangInstanceIdentifier path = TestModel.TEST_PATH; + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + actorFactory.generateActorId(testName + "-DataTreeChangeListener")); + + followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); + + writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + listener.waitForChangeEvents(); }}; } + + @Test + public void testServerRemoved() throws Exception { + final TestActorRef parent = TestActorRef.create(getSystem(), MessageCollectorActor.props()); + + final ActorRef shard = parent.underlyingActor().context().actorOf( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testServerRemoved"); + + shard.tell(new ServerRemoved("test"), ActorRef.noSender()); + + MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class); + + } + }