- new ShardTestKit(getSystem()) {{
- dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
- shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testTransactionMessagesWithNoLeader");
-
- waitUntilNoLeader(shard);
-
- shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
- Failure failure = expectMsgClass(Failure.class);
- assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
-
- shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
- DataStoreVersions.CURRENT_VERSION, true), getRef());
- failure = expectMsgClass(Failure.class);
- assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
-
- shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
- failure = expectMsgClass(Failure.class);
- assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- }};
- }
-
- @Test
- public void testReadyWithImmediateCommit() throws Exception{
- testReadyWithImmediateCommit(true);
- testReadyWithImmediateCommit(false);
- }
-
- public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testReadyWithImmediateCommit-" + readWrite);
-
- waitUntilLeader(shard);
-
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
- final String transactionID = "tx1";
- final MutableCompositeModification modification = new MutableCompositeModification();
- final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
-
- final FiniteDuration duration = duration("5 seconds");
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
-
- expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
-
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
-
- final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
- assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};
- }
-
- @Test
- public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testReadyLocalTransactionWithImmediateCommit");
-
- waitUntilLeader(shard);
-
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
- final DataTreeModification modification = dataStore.newModification();
-
- final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
- final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
- new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
-
- final String txId = "tx1";
- modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
-
- shard.tell(readyMessage, getRef());
-
- expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
-
- final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
- assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};
- }
-
- @Test
- public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testReadyLocalTransactionWithThreePhaseCommit");
-
- waitUntilLeader(shard);
-
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
- final DataTreeModification modification = dataStore.newModification();
-
- final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
- final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
- new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
-
- final String txId = "tx1";
- modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
-
- shard.tell(readyMessage, getRef());
-
- expectMsgClass(ReadyTransactionReply.class);
-
- // Send the CanCommitTransaction message.
-
- shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());