+ expectMsgClass(duration, CommitTransactionReply.class);
+ }
+ };
+ }
+
+ @Test
+ public void testAbortWithCommitPending() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
+ @Override
+ void persistPayload(final Identifier id, final Payload payload,
+ final boolean batchHint) {
+ // Simulate an AbortTransaction message occurring during
+ // replication, after
+ // persisting and before finishing the commit to the
+ // in-memory store.
+
+ doAbortTransaction(id, null);
+ super.persistPayload(id, payload, batchHint);
+ }
+ };
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
+ .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortWithCommitPending");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final TransactionIdentifier transactionID = nextTransactionId();
+
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
+
+ // Since we're simulating an abort occurring during replication
+ // and before finish commit,
+ // the data should still get written to the in-memory store
+ // since we've gotten past
+ // canCommit and preCommit and persisted the data.
+ assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
+ }
+ };
+ }
+
+ @Test
+ public void testTransactionCommitTimeout() throws Exception {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitTimeout");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeToStore(shard, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ // Ready 2 Tx's - the first will timeout
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(
+ prepareBatchedModifications(transactionID1,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
+ getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ shard.tell(
+ prepareBatchedModifications(transactionID2, listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false),
+ getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // canCommit 1st Tx. We don't send the commit so it should
+ // timeout.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // canCommit the 2nd Tx - it should complete after the 1st Tx
+ // times out.
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // Try to commit the 1st Tx - should fail as it's not the
+ // current Tx.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Commit the 2nd Tx.
+
+ shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
+ assertNotNull(listNodePath + " not found", node);
+ }
+ };
+ }
+
+// @Test
+// @Ignore
+// public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
+// dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
+//
+// new ShardTestKit(getSystem()) {{
+// final TestActorRef<Shard> shard = actorFactory.createTestActor(
+// newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+// "testTransactionCommitQueueCapacityExceeded");
+//
+// waitUntilLeader(shard);
+//
+// final FiniteDuration duration = duration("5 seconds");
+//
+// final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+//
+// final TransactionIdentifier transactionID1 = nextTransactionId();
+// final MutableCompositeModification modification1 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
+// modification1);
+//
+// final TransactionIdentifier transactionID2 = nextTransactionId();
+// final MutableCompositeModification modification2 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+// TestModel.OUTER_LIST_PATH,
+// ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
+// modification2);
+//
+// final TransactionIdentifier transactionID3 = nextTransactionId();
+// final MutableCompositeModification modification3 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
+// modification3);
+//
+// // Ready the Tx's
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
+// modification1), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
+// modification2), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// // The 3rd Tx should exceed queue capacity and fail.
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
+// modification3), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+//
+// // canCommit 1st Tx.
+//
+// shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, CanCommitTransactionReply.class);
+//
+// // canCommit the 2nd Tx - it should get queued.
+//
+// shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+//
+// // canCommit the 3rd Tx - should exceed queue capacity and fail.
+//
+// shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+// }};
+// }
+
+ @Test
+ public void testTransactionCommitWithPriorExpiredCohortEntries() throws Exception {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithPriorExpiredCohortEntries");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // All Tx's are readied. We'll send canCommit for the last one
+ // but not the others. The others
+ // should expire from the queue and the last one should be
+ // processed.
+
+ shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+ }
+ };
+ }
+
+ @Test
+ public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Exception {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithSubsequentExpiredCohortEntry");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // CanCommit the first Tx so it's the current in-progress Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // Ready the second Tx.
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Ready the third Tx.
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ final DataTreeModification modification3 = dataStore.newModification();
+ new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
+ .apply(modification3);
+ modification3.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
+ true);
+ shard.tell(readyMessage, getRef());
+
+ // Commit the first Tx. After completing, the second should
+ // expire from the queue and the third
+ // Tx committed.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ // Expect commit reply from the third Tx.
+
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
+ assertNotNull(TestModel.TEST2_PATH + " not found", node);
+ }
+ };
+ }
+
+ @Test
+ public void testCanCommitBeforeReadyFailure() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitBeforeReadyFailure");
+
+ shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ }
+ };
+ }
+
+ @Test
+ public void testAbortAfterCanCommit() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
+ // Ready 2 transactions - the first one will be aborted.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);