+ assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+
+ verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
+
+ // Send another tx without immediate commit.
+
+ modification = dataTree.takeSnapshot().newModification();
+ MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+ new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
+ modification.ready();
+
+ readyLocal = new ReadyLocalTransaction("tx-2" , modification, false);
+
+ carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+ resp = followerTestKit.expectMsgClass(Object.class);
+ if(resp instanceof akka.actor.Status.Failure) {
+ throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+ }
+
+ assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
+
+ ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+ ((ReadyTransactionReply)resp).getCohortPath());
+
+ ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
+ leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+ cohort.canCommit().get(5, TimeUnit.SECONDS);
+ cohort.preCommit().get(5, TimeUnit.SECONDS);
+ cohort.commit().get(5, TimeUnit.SECONDS);
+
+ verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+ }
+
+ @Test
+ public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
+ initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
+ followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+
+ Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
+ assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
+
+ carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+ DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
+
+ // Send a tx with immediate commit.
+
+ DataTreeModification modification = dataTree.takeSnapshot().newModification();
+ new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
+ new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
+
+ MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
+
+ ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
+ DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
+ Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true, true);
+
+ carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+ Object resp = followerTestKit.expectMsgClass(Object.class);
+ if(resp instanceof akka.actor.Status.Failure) {
+ throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+ }
+
+ assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+
+ verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
+
+ // Send another tx without immediate commit.
+
+ modification = dataTree.takeSnapshot().newModification();
+ MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+ new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
+
+ forwardedReady = new ForwardedReadyTransaction("tx-2",
+ DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
+ Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), true, false);
+
+ carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+ resp = followerTestKit.expectMsgClass(Object.class);
+ if(resp instanceof akka.actor.Status.Failure) {
+ throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+ }
+
+ assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
+
+ ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+ ((ReadyTransactionReply)resp).getCohortPath());
+
+ ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
+ leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+ cohort.canCommit().get(5, TimeUnit.SECONDS);
+ cohort.preCommit().get(5, TimeUnit.SECONDS);
+ cohort.commit().get(5, TimeUnit.SECONDS);
+
+ verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+ }
+
+ @Test
+ public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
+ initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
+
+ // Do an initial write to get the primary shard info cached.
+
+ DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ // Wait for the commit to be replicated to the follower.
+
+ MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertEquals("getLastApplied", 0, raftState.getLastApplied());
+ }
+ });
+
+ // Create and prepare wo and rw tx's.
+
+ writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+ MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ writeTx.write(CarsModel.newCarPath("optima"), car1);
+
+ DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
+ MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+ readWriteTx.write(CarsModel.newCarPath("sportage"), car2);
+
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+ @Override
+ public void verify(ShardStats stats) {
+ assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount());
+ }
+ });
+
+ // Disable elections on the leader so it switches to follower.
+
+ sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+ customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
+ shardElectionTimeoutFactor(10));
+
+ leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");