import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status.Failure;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import scala.concurrent.duration.FiniteDuration;
public class ShardTest extends AbstractShardTest {
- private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
-
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps(), "testRegisterChangeListener");
waitUntilLeader(shard);
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
listener.waitForChangeEvents(path);
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
};
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
"testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterChangeListenerWhenNotLeaderInitially");
// Wait for the shard to become the leader and notify our listener with the existing
// data in the store.
listener.waitForChangeEvents(path);
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testRegisterDataTreeChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps(), "testRegisterDataTreeChangeListener");
waitUntilLeader(shard);
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
"testRegisterDataTreeChangeListener-DataTreeChangeListener");
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
listener.waitForChangeEvents();
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
};
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
// TODO: investigate why we do not receive data chage events
listener.waitForChangeEvents();
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransaction(){
new ShardTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
+ final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
waitUntilLeader(shard);
shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shard.tell(new CreateTransaction("txn-1",
- TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+ shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
+ DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
CreateTransactionReply.class);
- final String path = reply.getTransactionActorPath().toString();
+ final String path = reply.getTransactionPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransactionOnChain(){
new ShardTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
+ final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
waitUntilLeader(shard);
- shard.tell(new CreateTransaction("txn-1",
- TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
- getRef());
+ shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
+ DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
CreateTransactionReply.class);
- final String path = reply.getTransactionActorPath().toString();
+ final String path = reply.getTransactionPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
}
}
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(new Creator<Shard>() {
@Override
public TestShard create() throws Exception {
assertEquals("getPeerAddress", address,
((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testApplySnapshot() throws Exception {
- ShardTestKit testkit = new ShardTestKit(getSystem());
-
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
- "testApplySnapshot");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
- testkit.waitUntilLeader(shard);
+ ShardTestKit.waitUntilLeader(shard);
final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
final NormalizedNode<?,?> actual = readStore(shard, root);
assertEquals("Root node", expected, actual);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@Test
public void testApplyState() throws Exception {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
- ShardTestKit testkit = new ShardTestKit(getSystem());
-
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+ ShardTestKit.waitUntilLeader(shard);
- testkit.waitUntilLeader(shard);
-
- final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
- newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
-
- shard.underlyingActor().onReceiveCommand(applyState);
-
- final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
- assertEquals("Applied state", node, actual);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- @Test
- public void testApplyStateWithCandidatePayload() throws Exception {
-
- ShardTestKit testkit = new ShardTestKit(getSystem());
-
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
-
- testkit.waitUntilLeader(shard);
-
- final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
+ final DataTree source = setupInMemorySnapshotStore();
+ final DataTreeModification writeMod = source.takeSnapshot().newModification();
+ ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeMod.write(TestModel.TEST_PATH, node);
+ writeMod.ready();
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
- DataTreeCandidatePayload.create(candidate)));
+ payloadForModification(source, writeMod)));
shard.underlyingActor().onReceiveCommand(applyState);
final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
assertEquals("Applied state", node, actual);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
- final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
- testStore.setSchemaContext(SCHEMA_CONTEXT);
-
- writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
-
- InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- SerializationUtils.serializeNormalizedNode(root),
- Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
- return testStore;
- }
-
- private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
- source.validate(mod);
- final DataTreeCandidate candidate = source.prepare(mod);
- source.commit(candidate);
- return DataTreeCandidatePayload.create(candidate);
}
@Test
testRecovery(listEntryKeys);
}
- @Test
- public void testModicationRecovery() throws Exception {
-
- // Set up the InMemorySnapshotStore.
- setupInMemorySnapshotStore();
-
- // Set up the InMemoryJournal.
-
- InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
-
- InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
- new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
-
- final int nListEntries = 16;
- final Set<Integer> listEntryKeys = new HashSet<>();
-
- // Add some ModificationPayload entries
- for(int i = 1; i <= nListEntries; i++) {
- listEntryKeys.add(Integer.valueOf(i));
- final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
- final Modification mod = new MergeModification(path,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
- newModificationPayload(mod)));
- }
-
- InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
- new ApplyJournalEntries(nListEntries));
-
- testRecovery(listEntryKeys);
- }
-
- private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
- final MutableCompositeModification compMod = new MutableCompositeModification();
- for(final Modification mod: mods) {
- compMod.addModification(mod);
- }
-
- return new ModificationPayload(compMod);
- }
-
@Test
public void testConcurrentThreePhaseCommits() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testConcurrentThreePhaseCommits");
waitUntilLeader(shard);
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
+ // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Send the CanCommitTransaction message for the first Tx.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
// processed after the first Tx completes.
final Future<Object> canCommitFuture1 = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
final Future<Object> canCommitFuture2 = Patterns.ask(shard,
- new CanCommitTransaction(transactionID3).toSerializable(), timeout);
+ new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
// Send the CommitTransaction message for the first Tx. After it completes, it should
// trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
// Wait for the next 2 Tx's to complete.
class OnCommitFutureComplete extends OnFutureComplete {
OnCommitFutureComplete() {
- super(CommitTransactionReply.SERIALIZABLE_CLASS);
+ super(CommitTransactionReply.class);
}
@Override
private final String transactionID;
OnCanCommitFutureComplete(final String transactionID) {
- super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
+ super(CanCommitTransactionReply.class);
this.transactionID = transactionID;
}
assertEquals("Can commit", true, canCommitReply.getCanCommit());
final Future<Object> commitFuture = Patterns.ask(shard,
- new CommitTransaction(transactionID).toSerializable(), timeout);
+ new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
}
}
verifyOuterListEntry(shard, 1);
verifyLastApplied(shard, 2);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
- private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
- return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
- }
-
- private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
- final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
- final int messagesSent) {
- final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
- batched.addModification(new WriteModification(path, data));
- batched.setReady(ready);
- batched.setDoCommitOnReady(doCommitOnReady);
- batched.setTotalMessagesSent(messagesSent);
- return batched;
- }
-
@Test
public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsWithNoCommitOnReady");
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message.
- shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
final InOrder inOrder = inOrder(mockCohort.get());
inOrder.verify(mockCohort.get()).canCommit();
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testBatchedModificationsWithCommitOnReady() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsWithCommitOnReady");
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration, CommitTransactionReply.class);
final InOrder inOrder = inOrder(mockCohort.get());
inOrder.verify(mockCohort.get()).canCommit();
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test(expected=IllegalStateException.class)
public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsReadyWithIncorrectTotalMessageCount");
final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
if(failure != null) {
throw failure.cause();
}
@Test
public void testBatchedModificationsWithOperationFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsWithOperationFailure");
failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
assertEquals("Failure cause", cause, failure.cause());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
- @SuppressWarnings("unchecked")
- private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
- final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
- assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
- outerList.getValue() instanceof Iterable);
- final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
- entry instanceof MapEntryNode);
- final MapEntryNode mapEntry = (MapEntryNode)entry;
- final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
- mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
- assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
- assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
- }
-
@Test
public void testBatchedModificationsOnTransactionChain() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testBatchedModificationsOnTransactionChain");
// Create a read Tx on the same chain.
- shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
- transactionChainID).toSerializable(), getRef());
+ shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
+ transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
- getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+ getSystem().actorSelection(createReply.getTransactionPath()).tell(
+ new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
assertEquals("Read node", containerNode, readReply.getNormalizedNode());
// Commit the write transaction.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
// Verify data in the data store.
final NormalizedNode<?, ?> actualNode = readStore(shard, path);
assertEquals("Stored node", containerNode, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
}
};
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
waitUntilLeader(shard);
shard.tell(batched, ActorRef.noSender());
expectMsgEquals(batched);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testReadyWithImmediateCommit(false);
}
- public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
+ private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testReadyWithImmediateCommit-" + readWrite);
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
- expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ expectMsgClass(duration, CommitTransactionReply.class);
final InOrder inOrder = inOrder(cohort);
inOrder.verify(cohort).canCommit();
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testReadyLocalTransactionWithImmediateCommit");
shard.tell(readyMessage, getRef());
- expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(CommitTransactionReply.class);
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testReadyLocalTransactionWithThreePhaseCommit");
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message.
- shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
- expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(CommitTransactionReply.class);
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testCommitWithPersistenceDisabled(false);
}
- public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
+ private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitWithPersistenceDisabled-" + readWrite);
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message.
- shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
final InOrder inOrder = inOrder(cohort);
inOrder.verify(cohort).canCommit();
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
- private static DataTreeCandidateTip mockCandidate(final String name) {
- final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
- final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
- doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
- doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
- doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
- doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
- return mockCandidate;
- }
-
- private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
- final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
- final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
- doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
- doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
- doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
- return mockCandidate;
- }
-
@Test
public void testCommitWhenTransactionHasNoModifications() {
testCommitWhenTransactionHasNoModifications(true);
testCommitWhenTransactionHasNoModifications(false);
}
- public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
+ private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
// Note that persistence is enabled which would normally result in the entry getting written to the journal
// but here that need not happen
new ShardTestKit(getSystem()) {
{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitWhenTransactionHasNoModifications-" + readWrite);
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
- expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
final InOrder inOrder = inOrder(cohort);
inOrder.verify(cohort).canCommit();
// Commit index should not advance because this does not go into the journal
assertEquals(-1, shardStats.getCommitIndex());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
}
};
}
testCommitWhenTransactionHasModifications(false);
}
- public void testCommitWhenTransactionHasModifications(final boolean readWrite){
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite){
new ShardTestKit(getSystem()) {
{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitWhenTransactionHasModifications-" + readWrite);
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
- expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
final InOrder inOrder = inOrder(cohort);
inOrder.verify(cohort).canCommit();
// Commit index should advance as we do not have an empty modification
assertEquals(0, shardStats.getCommitIndex());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
}
};
}
testCommitPhaseFailure(false);
}
- public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCommitPhaseFailure-" + readWrite);
// Send the CanCommitTransaction message for the first Tx.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
// processed after the first Tx completes.
final Future<Object> canCommitFuture = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
// Send the CommitTransaction message for the first Tx. This should send back an error
// and trigger the 2nd Tx to proceed.
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
// Wait for the 2nd Tx to complete the canCommit phase.
inOrder.verify(cohort1).preCommit();
inOrder.verify(cohort1).commit();
inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testPreCommitPhaseFailure(false);
}
- public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testPreCommitPhaseFailure-" + readWrite);
// Send the CanCommitTransaction message for the first Tx.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
// processed after the first Tx completes.
final Future<Object> canCommitFuture = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
// Send the CommitTransaction message for the first Tx. This should send back an error
// and trigger the 2nd Tx to proceed.
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
// Wait for the 2nd Tx to complete the canCommit phase.
inOrder.verify(cohort1).canCommit();
inOrder.verify(cohort1).preCommit();
inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testCanCommitPhaseFailure(false);
}
- public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitPhaseFailure-" + readWrite);
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
// Send another can commit to ensure the failed one got cleaned up.
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(CanCommitTransactionReply.class));
assertEquals("getCanCommit", true, reply.getCanCommit());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testCanCommitPhaseFalseResponse(false);
}
- public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitPhaseFalseResponse-" + readWrite);
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(CanCommitTransactionReply.class));
assertEquals("getCanCommit", false, reply.getCanCommit());
// Send another can commit to ensure the failed one got cleaned up.
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
reply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(CanCommitTransactionReply.class));
assertEquals("getCanCommit", true, reply.getCanCommit());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testImmediateCommitWithCanCommitPhaseFailure(false);
}
- public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ expectMsgClass(duration, CommitTransactionReply.class);
}};
}
testImmediateCommitWithCanCommitPhaseFalseResponse(false);
}
- public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ expectMsgClass(duration, CommitTransactionReply.class);
}};
}
testAbortBeforeFinishCommit(false);
}
- public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
+ private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testAbortBeforeFinishCommit-" + readWrite);
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
// the data should still get written to the in-memory store since we've gotten past
// canCommit and preCommit and persisted the data.
assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testTransactionCommitTimeout(false);
}
- public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
+ private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitTimeout-" + readWrite);
// canCommit 1st Tx. We don't send the commit so it should timeout.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
// canCommit the 2nd Tx - it should complete after the 1st Tx times out.
- shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
// Try to commit the 1st Tx - should fail as it's not the current Tx.
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
// Commit the 2nd Tx.
- shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
assertNotNull(listNodePath + " not found", node);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitQueueCapacityExceeded");
// canCommit 1st Tx.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
// canCommit the 2nd Tx - it should get queued.
- shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
// canCommit the 3rd Tx - should exceed queue capacity and fail.
- shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitWithPriorExpiredCohortEntries");
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
// should expire from the queue and the last one should be processed.
- shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
}};
}
dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testTransactionCommitWithSubsequentExpiredCohortEntry");
// CanCommit the first one so it's the current in-progress CohortEntry.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
// Ready the second Tx.
// Commit the first Tx. After completing, the second should expire from the queue and the third
// Tx committed.
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
// Expect commit reply from the third Tx.
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration, CommitTransactionReply.class);
final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
assertNotNull(TestModel.TEST2_PATH + " not found", node);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCanCommitBeforeReadyFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitBeforeReadyFailure");
- shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testAbortCurrentTransaction(false);
}
- public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
+ private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testAbortCurrentTransaction-" + readWrite);
// Send the CanCommitTransaction message for the first Tx.
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
// processed after the first Tx completes.
final Future<Object> canCommitFuture = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
// Send the AbortTransaction message for the first Tx. This should trigger the 2nd
// Tx to proceed.
- shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
// Wait for the 2nd Tx to complete the canCommit phase.
final InOrder inOrder = inOrder(cohort1, cohort2);
inOrder.verify(cohort1).canCommit();
inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
testAbortQueuedTransaction(false);
}
- public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
+ private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
}
};
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(
Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
// Send the AbortTransaction message.
- shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
- expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
verify(cohort).abort();
// Now send CanCommitTransaction - should fail.
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
assertTrue("Failure type", failure instanceof IllegalStateException);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
}
@SuppressWarnings("serial")
- public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+ private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
}
};
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
awaitAndValidateSnapshot(expectedRoot);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
- ) throws InterruptedException {
- System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
+ assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
+ assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+ savedSnapshot.get() instanceof Snapshot);
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+ verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
- latch.set(new CountDownLatch(1));
- savedSnapshot.set(null);
- }
+ latch.set(new CountDownLatch(1));
+ savedSnapshot.set(null);
+ }
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
- final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
- assertEquals("Root node", expectedRoot, actual);
+ final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+ assertEquals("Root node", expectedRoot, actual);
- }
- };
+ }};
}
/**
schemaContext(SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
- persistentProps, "testPersistence1");
+ final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
- shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
- final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
- nonPersistentProps, "testPersistence2");
+ final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
-
- shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
}};
-
}
@Test
new ShardTestKit(getSystem()) {{
dataStoreContextBuilder.persistent(true);
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
assertEquals("isRecoveryApplicable", true,
shard.underlyingActor().persistence().isRecoveryApplicable());
assertEquals("isRecoveryApplicable", true,
shard.underlyingActor().persistence().isRecoveryApplicable());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testRegisterRoleChangeListener() throws Exception {
new ShardTestKit(getSystem()) {
{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterRoleChangeListener");
ShardLeaderStateChanged.class);
assertEquals("getLocalShardDataTree present", false,
leaderStateChanged.getLocalShardDataTree().isPresent());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
};
}
@Test
public void testFollowerInitialSyncStatus() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testFollowerInitialSyncStatus");
shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
- modification.ready();
- store.validate(modification);
- store.commit(store.prepare(modification));
}
@Test
@Test
public void testServerRemoved() throws Exception {
- final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+ final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
final ActorRef shard = parent.underlyingActor().context().actorOf(
newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
shard.tell(new ServerRemoved("test"), ActorRef.noSender());
MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
-
}
-
}