+
+ @Test(expected=NoShardLeaderException.class)
+ public void testTransactionWithIsolatedLeader() throws Throwable {
+ leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300);
+ String testName = "testTransactionWithIsolatedLeader";
+ initDatastores(testName);
+
+ JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+
+ Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext()
+ .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS);
+
+ DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ try {
+ followerTestKit.doCommit(writeTx.ready());
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ @Test(expected=AskTimeoutException.class)
+ public void testTransactionWithShardLeaderNotResponding() throws Throwable {
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
+ initDatastores("testTransactionWithShardLeaderNotResponding");
+
+ // Do an initial read to get the primary shard info cached.
+
+ DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+ readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+
+ // Shutdown the leader and try to create a new tx.
+
+ JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+ followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
+ sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
+
+ DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+ rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ try {
+ followerTestKit.doCommit(rwTx.ready());
+ } catch (ExecutionException e) {
+ assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
+ e.getCause() instanceof ShardLeaderNotRespondingException);
+ assertNotNull("Expected a nested cause", e.getCause().getCause());
+ throw e.getCause().getCause();
+ }
+ }
+
+ @Test(expected=NoShardLeaderException.class)
+ public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
+ initDatastores("testTransactionWithCreateTxFailureDueToNoLeader");
+
+ // Do an initial read to get the primary shard info cached.
+
+ DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+ readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+
+ // Shutdown the leader and try to create a new tx.
+
+ JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+ followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1);
+ sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
+
+ DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+ rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ try {
+ followerTestKit.doCommit(rwTx.ready());
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ @Test
+ public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
+ String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
+ initDatastores(testName, MODULE_SHARDS_CONFIG_3);
+
+ DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+ IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
+ follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES);
+
+ // Do an initial read to get the primary shard info cached.
+
+ DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+ readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+
+ // Shutdown the leader and try to create a new tx.
+
+ JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+ followerDatastoreContextBuilder.operationTimeoutInMillis(500);
+ sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
+
+ DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+ rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ followerTestKit.doCommit(rwTx.ready());
+ }
+
+ private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+ DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+ Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
+ @Override
+ public DatastoreContext answer(InvocationOnMock invocation) {
+ return builder.build();
+ }
+ };
+ Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
+ Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+ dataStore.onDatastoreContextUpdated(mockContextFactory);
+ }