- protected ActorSystem getSystem() {
- return system;
- }
-
- @Test
- public void testWriteTransactionWithSingleShard() throws Exception{
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
- try (DistributedDataStore dataStore =
- setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
-
- testWriteTransaction(dataStore, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- }
- }};
- }
-
- @Test
- public void testWriteTransactionWithMultipleShards() throws Exception{
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
- try (DistributedDataStore dataStore =
- setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
-
- DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
- assertNotNull("newWriteOnlyTransaction returned null", writeTx);
-
- writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
-
- doCommit(writeTx.ready());
-
- writeTx = dataStore.newWriteOnlyTransaction();
-
- writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
-
- doCommit(writeTx.ready());
-
- writeTx = dataStore.newWriteOnlyTransaction();
-
- MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
- YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
- writeTx.write(carPath, car);
-
- MapEntryNode person = PeopleModel.newPersonEntry("jack");
- YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
- writeTx.write(personPath, person);
-
- doCommit(writeTx.ready());
-
- // Verify the data in the store
-
- DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
-
- Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", person, optional.get());
- }
- }};
- }
-
- @Test
- public void testReadWriteTransactionWithSingleShard() throws Exception{
- System.setProperty("shard.persistent", "true");
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
- try (DistributedDataStore dataStore =
- setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1")) {
-
- // 1. Create a read-write Tx
-
- DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
- assertNotNull("newReadWriteTransaction returned null", readWriteTx);
-
- // 2. Write some data
-
- YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- readWriteTx.write(nodePath, nodeToWrite );
-
- // 3. Read the data from Tx
-
- Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
- assertEquals("exists", true, exists);
-
- Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", nodeToWrite, optional.get());
-
- // 4. Ready the Tx for commit
-
- DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
-
- // 5. Commit the Tx
-
- doCommit(cohort);
-
- // 6. Verify the data in the store
-
- DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
-
- optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", nodeToWrite, optional.get());
- }
- }};
- }
-
- @Test
- public void testReadWriteTransactionWithMultipleShards() throws Exception{
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
- try (DistributedDataStore dataStore =
- setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
-
- DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
- assertNotNull("newReadWriteTransaction returned null", readWriteTx);
-
- readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
-
- doCommit(readWriteTx.ready());
-
- readWriteTx = dataStore.newReadWriteTransaction();
-
- readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
-
- doCommit(readWriteTx.ready());
-
- readWriteTx = dataStore.newReadWriteTransaction();
-
- MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
- YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
- readWriteTx.write(carPath, car);
-
- MapEntryNode person = PeopleModel.newPersonEntry("jack");
- YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
- readWriteTx.write(personPath, person);
-
- Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
- assertEquals("exists", true, exists);
-
- Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- doCommit(readWriteTx.ready());
-
- // Verify the data in the store
-
- DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
-
- optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", person, optional.get());
-
- }
- }};
- }
-
- @Test
- public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
- try (DistributedDataStore dataStore = setupDistributedDataStore(
- "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
+ throws Exception {
+ final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+ final String shardName = "test-1";
+
+ // Setup the InMemoryJournal to block shard recovery to ensure
+ // the shard isn't
+ // initialized until we create and submit the write the Tx.
+ final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
+ final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
+ InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
+
+ try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
+ testParameter, testName, false, shardName)) {
+
+ // Create the write Tx
+ final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
+ : dataStore.newReadWriteTransaction();
+ assertNotNull("newReadWriteTransaction returned null", writeTx);
+
+ // Do some modification operations and ready the Tx on a
+ // separate thread.
+ final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
+ .builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch txReady = new CountDownLatch(1);
+ final Thread txThread = new Thread(() -> {
+ try {
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));