-
- @SuppressWarnings({ "unchecked" })
- @Test
- public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testPreLithiumConcurrentThreePhaseCommits");
-
- waitUntilLeader(shard);
-
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
- String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification3);
-
- long timeoutSec = 5;
- final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
- final Timeout timeout = new Timeout(duration);
-
- // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
- cohort1, modification1, true), getRef());
- ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
- // Send the CanCommitTransaction message for the first Tx.
-
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
- CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
- // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
- cohort3, modification3, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
- // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
- // processed after the first Tx completes.
-
- Future<Object> canCommitFuture1 = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
-
- Future<Object> canCommitFuture2 = Patterns.ask(shard,
- new CanCommitTransaction(transactionID3).toSerializable(), timeout);
-
- // Send the CommitTransaction message for the first Tx. After it completes, it should
- // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
-
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
- // Wait for the next 2 Tx's to complete.
-
- final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
- final CountDownLatch commitLatch = new CountDownLatch(2);
-
- class OnFutureComplete extends OnComplete<Object> {
- private final Class<?> expRespType;
-
- OnFutureComplete(final Class<?> expRespType) {
- this.expRespType = expRespType;
- }
-
- @Override
- public void onComplete(final Throwable error, final Object resp) {
- if(error != null) {
- caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
- } else {
- try {
- assertEquals("Commit response type", expRespType, resp.getClass());
- onSuccess(resp);
- } catch (Exception e) {
- caughtEx.set(e);
- }
- }
- }
-
- void onSuccess(final Object resp) throws Exception {
- }
- }
-
- class OnCommitFutureComplete extends OnFutureComplete {
- OnCommitFutureComplete() {
- super(CommitTransactionReply.SERIALIZABLE_CLASS);
- }
-
- @Override
- public void onComplete(final Throwable error, final Object resp) {
- super.onComplete(error, resp);
- commitLatch.countDown();
- }
- }
-
- class OnCanCommitFutureComplete extends OnFutureComplete {
- private final String transactionID;
-
- OnCanCommitFutureComplete(final String transactionID) {
- super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
- this.transactionID = transactionID;
- }
-
- @Override
- void onSuccess(final Object resp) throws Exception {
- CanCommitTransactionReply canCommitReply =
- CanCommitTransactionReply.fromSerializable(resp);
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
- Future<Object> commitFuture = Patterns.ask(shard,
- new CommitTransaction(transactionID).toSerializable(), timeout);
- commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
- }
- }
-
- canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
- getSystem().dispatcher());
-
- canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
- getSystem().dispatcher());
-
- boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
-
- if(caughtEx.get() != null) {
- throw caughtEx.get();
- }
-
- assertEquals("Commits complete", true, done);
-
- InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
- inOrder.verify(cohort2).preCommit();
- inOrder.verify(cohort2).commit();
- inOrder.verify(cohort3).canCommit();
- inOrder.verify(cohort3).preCommit();
- inOrder.verify(cohort3).commit();
-
- // Verify data in the data store.
-
- NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
- assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
- outerList.getValue() instanceof Iterable);
- Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
- entry instanceof MapEntryNode);
- MapEntryNode mapEntry = (MapEntryNode)entry;
- Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
- mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
- assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
- assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
-
- verifyLastApplied(shard, 2);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};
- }