- }
- };
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
- throws Exception {
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
- {
- final String shardName = "default";
-
- // We don't want the shard to become the leader so prevent shard
- // elections.
- datastoreContextBuilder.customRaftPolicyImplementation(
- "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
-
- // The ShardManager uses the election timeout for FindPrimary so
- // reset it low so it will timeout quickly.
- datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
- .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
-
- try (AbstractDataStore dataStore = setupAbstractDataStore(
- testParameter, testName, false, shardName)) {
-
- final Object result = dataStore.getActorContext().executeOperation(
- dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
- assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
-
- // Create the write Tx.
- DOMStoreWriteTransaction writeTxToClose = null;
- try {
- writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
- : dataStore.newReadWriteTransaction();
- final DOMStoreWriteTransaction writeTx = writeTxToClose;
- assertNotNull("newReadWriteTransaction returned null", writeTx);
-
- // Do some modifications and ready the Tx on a separate
- // thread.
- final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
- final AtomicReference<Exception> caughtEx = new AtomicReference<>();
- final CountDownLatch txReady = new CountDownLatch(1);
- final Thread txThread = new Thread(() -> {
- try {
- writeTx.write(TestModel.JUNK_PATH,
- ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
-
- txCohort.set(writeTx.ready());
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- txReady.countDown();
- }
- });
-
- txThread.start();
-
- // Wait for the Tx operations to complete.
- boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
- if (caughtEx.get() != null) {
- throw caughtEx.get();
- }
-
- assertEquals("Tx ready", true, done);
-
- // Wait for the commit to complete. Since no shard
- // leader was elected in time, the Tx
- // should have timed out and throw an appropriate
- // exception cause.
- try {
- txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
- fail("Expected NoShardLeaderException");
- } catch (final ExecutionException e) {
- final String msg = "Unexpected exception: "
- + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
- assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
- } else {
- assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
- }
- }
- } finally {
- try {
- if (writeTxToClose != null) {
- writeTxToClose.close();
- }
- } catch (Exception e) {
- // FIXME TransactionProxy.close throws IllegalStateException:
- // Transaction is ready, it cannot be closed
- }