+ String transactionID2 = "tx2";
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a chained write transaction and ready it.
+
+ ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
+ containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Create a read Tx on the same chain.
+
+ shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
+ transactionChainID).toSerializable(), getRef());
+
+ CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
+
+ getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+ ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
+ assertEquals("Read node", containerNode, readReply.getNormalizedNode());
+
+ // Commit the write transaction.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, path);
+ assertEquals("Stored node", containerNode, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testOnBatchedModificationsWhenNotLeader() {
+ final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
+ new ShardTestKit(getSystem()) {{
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT) {
+ @Override
+ protected boolean isLeader() {
+ return overrideLeaderCalls.get() ? false : super.isLeader();
+ }
+
+ @Override
+ protected ActorSelection getLeader() {
+ return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
+ super.getLeader();
+ }
+ };
+ }
+ };
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
+
+ waitUntilLeader(shard);
+
+ overrideLeaderCalls.set(true);
+
+ BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
+
+ shard.tell(batched, ActorRef.noSender());
+
+ expectMsgEquals(batched);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testCommitWithPersistenceDisabled() throws Throwable {
+ dataStoreContextBuilder.persistent(false);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWithPersistenceDisabled");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a transaction.
+
+ NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testCommitWhenTransactionHasNoModifications(){
+ // Note that persistence is enabled which would normally result in the entry getting written to the journal
+ // but here that need not happen
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+ InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ }
+ };
+ }
+
+ @Test
+ public void testCommitWhenTransactionHasModifications(){
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+ InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+ // Commit index should advance as we do not have an empty modification
+ assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ }
+ };
+ }
+
+ @Test
+ public void testCommitPhaseFailure() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure");
+
+ waitUntilLeader(shard);
+
+ // Setup 2 mock cohorts. The first one fails in the commit phase.
+
+ final String transactionID1 = "tx1";
+ final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");