import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status.Failure;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps(), "testRegisterChangeListener");
waitUntilLeader(shard);
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
listener.waitForChangeEvents(path);
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
};
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
"testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterChangeListenerWhenNotLeaderInitially");
// Wait for the shard to become the leader and notify our listener with the existing
// data in the store.
listener.waitForChangeEvents(path);
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testRegisterDataTreeChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps(), "testRegisterDataTreeChangeListener");
waitUntilLeader(shard);
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
"testRegisterDataTreeChangeListener-DataTreeChangeListener");
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
listener.waitForChangeEvents();
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
};
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
// TODO: investigate why we do not receive data chage events
listener.waitForChangeEvents();
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransaction(){
new ShardTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
+ final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
waitUntilLeader(shard);
final String path = reply.getTransactionActorPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransactionOnChain(){
new ShardTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
+ final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
waitUntilLeader(shard);
final String path = reply.getTransactionActorPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
}
}
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(new Creator<Shard>() {
@Override
public TestShard create() throws Exception {
assertEquals("getPeerAddress", address,
((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testApplySnapshot() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
- "testApplySnapshot");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
ShardTestKit.waitUntilLeader(shard);
final NormalizedNode<?,?> actual = readStore(shard, root);
assertEquals("Root node", expected, actual);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@Test
public void testApplyState() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
ShardTestKit.waitUntilLeader(shard);
- final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final DataTree source = setupInMemorySnapshotStore();
+ final DataTreeModification writeMod = source.takeSnapshot().newModification();
+ ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeMod.write(TestModel.TEST_PATH, node);
+ writeMod.ready();
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
- newDataTreeCandidatePayload(new WriteModification(TestModel.TEST_PATH, node))));
+ payloadForModification(source, writeMod)));
shard.underlyingActor().onReceiveCommand(applyState);
final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
assertEquals("Applied state", node, actual);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
- final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
- testStore.setSchemaContext(SCHEMA_CONTEXT);
-
- writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
-
- InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- SerializationUtils.serializeNormalizedNode(root),
- Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
- return testStore;
- }
-
- private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
- source.validate(mod);
- final DataTreeCandidate candidate = source.prepare(mod);
- source.commit(candidate);
- return DataTreeCandidatePayload.create(candidate);
}
@Test
testRecovery(listEntryKeys);
}
- @Test
- public void testModicationRecovery() throws Exception {
-
- // Set up the InMemorySnapshotStore.
- setupInMemorySnapshotStore();
-
- // Set up the InMemoryJournal.
-
- InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
-
- ShardDataTree shardDataTree = new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION);
-
- InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newDataTreeCandidatePayload(
- shardDataTree,
- new WriteModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
- new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
-
- final int nListEntries = 16;
- final Set<Integer> listEntryKeys = new HashSet<>();
-
- // Add some ModificationPayload entries
- for(int i = 1; i <= nListEntries; i++) {
- listEntryKeys.add(Integer.valueOf(i));
- final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
- final Modification mod = new MergeModification(path,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
- newDataTreeCandidatePayload(shardDataTree, mod)));
- }
-
- InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
- new ApplyJournalEntries(nListEntries));
-
- testRecovery(listEntryKeys);
- }
-
- private static DataTreeCandidatePayload newDataTreeCandidatePayload(final Modification... mods) throws Exception {
- return newDataTreeCandidatePayload(new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION), mods);
- }
-
- private static DataTreeCandidatePayload newDataTreeCandidatePayload(ShardDataTree shardDataTree,
- final Modification... mods) throws Exception {
- DataTreeModification dataTreeModification = shardDataTree.newModification();
- for(final Modification mod: mods) {
- mod.apply(dataTreeModification);
- }
-
- return DataTreeCandidatePayload.create(shardDataTree.commit(dataTreeModification));
- }
-
@Test
public void testConcurrentThreePhaseCommits() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testConcurrentThreePhaseCommits");
waitUntilLeader(shard);
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
+ // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
verifyOuterListEntry(shard, 1);
verifyLastApplied(shard, 2);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
- private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
- return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
- }
-
- private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
- final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
- final int messagesSent) {
- final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
- batched.addModification(new WriteModification(path, data));
- batched.setReady(ready);
- batched.setDoCommitOnReady(doCommitOnReady);
- batched.setTotalMessagesSent(messagesSent);
- return batched;
- }
-
@Test
public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsWithNoCommitOnReady");
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testBatchedModificationsWithCommitOnReady() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsWithCommitOnReady");
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test(expected=IllegalStateException.class)
public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsReadyWithIncorrectTotalMessageCount");
final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
if(failure != null) {
throw failure.cause();
}
@Test
public void testBatchedModificationsWithOperationFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsWithOperationFailure");
failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
assertEquals("Failure cause", cause, failure.cause());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
- @SuppressWarnings("unchecked")
- private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
- final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
- assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
- outerList.getValue() instanceof Iterable);
- final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
- entry instanceof MapEntryNode);
- final MapEntryNode mapEntry = (MapEntryNode)entry;
- final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
- mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
- assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
- assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
- }
-
@Test
public void testBatchedModificationsOnTransactionChain() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsOnTransactionChain");
final NormalizedNode<?, ?> actualNode = readStore(shard, path);
assertEquals("Stored node", containerNode, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
}
};
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
waitUntilLeader(shard);
shard.tell(batched, ActorRef.noSender());
expectMsgEquals(batched);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testReadyWithImmediateCommit(false);
}
- public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
+ private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testReadyWithImmediateCommit-" + readWrite);
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testReadyLocalTransactionWithImmediateCommit");
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testReadyLocalTransactionWithThreePhaseCommit");
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testCommitWithPersistenceDisabled(false);
}
- public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
+ private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitWithPersistenceDisabled-" + readWrite);
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
- private static DataTreeCandidateTip mockCandidate(final String name) {
- final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
- final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
- doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
- doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
- doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
- doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
- return mockCandidate;
- }
-
- private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
- final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
- final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
- doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
- doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
- doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
- return mockCandidate;
- }
-
@Test
public void testCommitWhenTransactionHasNoModifications() {
testCommitWhenTransactionHasNoModifications(true);
testCommitWhenTransactionHasNoModifications(false);
}
- public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
+ private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
// Note that persistence is enabled which would normally result in the entry getting written to the journal
// but here that need not happen
new ShardTestKit(getSystem()) {
{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitWhenTransactionHasNoModifications-" + readWrite);
// Commit index should not advance because this does not go into the journal
assertEquals(-1, shardStats.getCommitIndex());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
}
};
}
testCommitWhenTransactionHasModifications(false);
}
- public void testCommitWhenTransactionHasModifications(final boolean readWrite){
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite){
new ShardTestKit(getSystem()) {
{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitWhenTransactionHasModifications-" + readWrite);
// Commit index should advance as we do not have an empty modification
assertEquals(0, shardStats.getCommitIndex());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
}
};
}
testCommitPhaseFailure(false);
}
- public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitPhaseFailure-" + readWrite);
inOrder.verify(cohort1).preCommit();
inOrder.verify(cohort1).commit();
inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testPreCommitPhaseFailure(false);
}
- public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testPreCommitPhaseFailure-" + readWrite);
inOrder.verify(cohort1).canCommit();
inOrder.verify(cohort1).preCommit();
inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testCanCommitPhaseFailure(false);
}
- public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitPhaseFailure-" + readWrite);
final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("getCanCommit", true, reply.getCanCommit());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testCanCommitPhaseFalseResponse(false);
}
- public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitPhaseFalseResponse-" + readWrite);
reply = CanCommitTransactionReply.fromSerializable(
expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("getCanCommit", true, reply.getCanCommit());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testImmediateCommitWithCanCommitPhaseFailure(false);
}
- public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testImmediateCommitWithCanCommitPhaseFalseResponse(false);
}
- public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testAbortBeforeFinishCommit(false);
}
- public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
+ private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testAbortBeforeFinishCommit-" + readWrite);
// the data should still get written to the in-memory store since we've gotten past
// canCommit and preCommit and persisted the data.
assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testTransactionCommitTimeout(false);
}
- public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
+ private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitTimeout-" + readWrite);
final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
assertNotNull(listNodePath + " not found", node);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitQueueCapacityExceeded");
shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitWithPriorExpiredCohortEntries");
shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitWithSubsequentExpiredCohortEntry");
final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
assertNotNull(TestModel.TEST2_PATH + " not found", node);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCanCommitBeforeReadyFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitBeforeReadyFailure");
shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testAbortCurrentTransaction(false);
}
- public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
+ private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testAbortCurrentTransaction-" + readWrite);
final InOrder inOrder = inOrder(cohort1, cohort2);
inOrder.verify(cohort1).canCommit();
inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testAbortQueuedTransaction(false);
}
- public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
+ private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
}
};
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(
Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
assertTrue("Failure type", failure instanceof IllegalStateException);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
}
@SuppressWarnings("serial")
- public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+ private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
}
};
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
awaitAndValidateSnapshot(expectedRoot);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
- ) throws InterruptedException {
- System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
+ assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
+ assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+ savedSnapshot.get() instanceof Snapshot);
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+ verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
- latch.set(new CountDownLatch(1));
- savedSnapshot.set(null);
- }
+ latch.set(new CountDownLatch(1));
+ savedSnapshot.set(null);
+ }
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
- final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
- assertEquals("Root node", expectedRoot, actual);
+ final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+ assertEquals("Root node", expectedRoot, actual);
- }
- };
+ }};
}
/**
schemaContext(SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
- persistentProps, "testPersistence1");
+ final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
- shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
- final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
- nonPersistentProps, "testPersistence2");
+ final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
-
- shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
}};
-
}
@Test
new ShardTestKit(getSystem()) {{
dataStoreContextBuilder.persistent(true);
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
assertEquals("isRecoveryApplicable", true,
shard.underlyingActor().persistence().isRecoveryApplicable());
assertEquals("isRecoveryApplicable", true,
shard.underlyingActor().persistence().isRecoveryApplicable());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testRegisterRoleChangeListener() throws Exception {
new ShardTestKit(getSystem()) {
{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterRoleChangeListener");
ShardLeaderStateChanged.class);
assertEquals("getLocalShardDataTree present", false,
leaderStateChanged.getLocalShardDataTree().isPresent());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
};
}
@Test
public void testFollowerInitialSyncStatus() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testFollowerInitialSyncStatus");
shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
- modification.ready();
- store.validate(modification);
- store.commit(store.prepare(modification));
}
@Test
@Test
public void testServerRemoved() throws Exception {
- final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+ final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
final ActorRef shard = parent.underlyingActor().context().actorOf(
newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
shard.tell(new ServerRemoved("test"), ActorRef.noSender());
MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
-
}
-
}