import com.typesafe.config.ConfigFactory;
import java.math.BigInteger;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@Test
public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
+ followerDatastoreContextBuilder.shardBatchedModificationCount(2);
+ leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
// Do an initial write to get the primary shard info cached.
- DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
- writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- followerTestKit.doCommit(writeTx.ready());
+ DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+ writeTx1.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx1.ready());
// Wait for the commit to be replicated to the follower.
// Create and prepare wo and rw tx's.
- writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
- MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
- writeTx.write(CarsModel.newCarPath("optima"), car1);
+ writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+ LinkedList<MapEntryNode> cars = new LinkedList<>();
+ int carIndex = 1;
+ for(; carIndex <= 5; carIndex++) {
+ cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ writeTx1.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ }
+
+ DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
+ cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ carIndex++;
DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
- MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
- readWriteTx.write(CarsModel.newCarPath("sportage"), car2);
+ cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
@Override
// Submit tx's and enable elections on the follower so it becomes the leader, at which point the
// readied tx's should get forwarded from the previous leader.
- DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
- DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+ DOMStoreThreePhaseCommitCohort cohort1 = writeTx1.ready();
+ DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
+ DOMStoreThreePhaseCommitCohort cohort3 = readWriteTx.ready();
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
followerTestKit.doCommit(cohort1);
followerTestKit.doCommit(cohort2);
+ followerTestKit.doCommit(cohort3);
- verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+ verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), cars.toArray(new MapEntryNode[cars.size()]));
}
@Test