- txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
- fail("Expected NotInitializedException");
- } catch (final Exception e) {
- final Throwable root = Throwables.getRootCause(e);
- Throwables.throwIfUnchecked(root);
- throw new RuntimeException(root);
- } finally {
- blockRecoveryLatch.countDown();
- }
- }
- };
- }
-
- @Test(expected = NotInitializedException.class)
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
- {
- final String testName = "testTransactionReadFailureWithShardNotInitialized";
- final String shardName = "test-1";
-
- // Set the shard initialization timeout low for the test.
- datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
-
- // Setup the InMemoryJournal to block shard recovery
- // indefinitely.
- final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
- final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
- InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
-
- InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
-
- try (AbstractDataStore dataStore = setupAbstractDataStore(
- testParameter, testName, false, shardName)) {
-
- // Create the read-write Tx
- final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
- assertNotNull("newReadWriteTransaction returned null", readWriteTx);
-
- // Do a read on the Tx on a separate thread.
- final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>>
- txReadFuture = new AtomicReference<>();
- final AtomicReference<Exception> caughtEx = new AtomicReference<>();
- final CountDownLatch txReadDone = new CountDownLatch(1);
- final Thread txThread = new Thread(() -> {
- try {
- readWriteTx.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
-
- readWriteTx.close();
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- txReadDone.countDown();
- }
- });
-
- txThread.start();
-
- // Wait for the Tx operations to complete.
- boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
- if (caughtEx.get() != null) {
- throw caughtEx.get();
- }
-
- assertEquals("Tx read done", true, done);
-
- // Wait for the read to complete. Since the shard never
- // initialized, the Tx should
- // have timed out and throw an appropriate exception cause.
- try {
- txReadFuture.get().get(5, TimeUnit.SECONDS);
- } catch (ExecutionException e) {
- assertTrue("Expected ReadFailedException cause: " + e.getCause(),
- e.getCause() instanceof ReadFailedException);
- final Throwable root = Throwables.getRootCause(e);
- Throwables.throwIfUnchecked(root);
- throw new RuntimeException(root);
- } finally {
- blockRecoveryLatch.countDown();