DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// Verify the data in the store
// 5. Commit the Tx
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// 6. Verify the data in the store
// Wait for the Tx commit to complete.
- assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
- txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
- txCohort.get().commit().get(5, TimeUnit.SECONDS);
+ doCommit(txCohort.get());
// Verify the data in the store
@Test
public void testTransactionChain() throws Exception{
- System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem()) {{
- DistributedDataStore dataStore =
- setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
+ DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
// 1. Create a Tx chain and write-only Tx
assertEquals("Data node", outerNode, optional.get());
cleanup(dataStore);
- }
-
- private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception {
- Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort1.preCommit().get(5, TimeUnit.SECONDS);
- cohort1.commit().get(5, TimeUnit.SECONDS);
}};
}
DistributedDataStore dataStore =
setupDistributedDataStore("testChangeListenerRegistration", "test-1");
- MockDataChangeListener listener = new MockDataChangeListener(3);
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ MockDataChangeListener listener = new MockDataChangeListener(1);
ListenerRegistration<MockDataChangeListener>
listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
assertNotNull("registerChangeListener returned null", listenerReg);
- testWriteTransaction(dataStore, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ // Wait for the initial notification
+
+ listener.waitForChangeEvents(TestModel.TEST_PATH);
+
+ listener.reset(2);
+
+ // Write 2 updates.
testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
testWriteTransaction(dataStore, listPath,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
- listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
+ // Wait for the 2 updates.
+
+ listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
listenerReg.close();
// 4. Commit the Tx
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// 5. Verify the data in the store
assertEquals("Data node", nodeToWrite, optional.get());
}
+ void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+ Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+ assertEquals("canCommit", true, canCommit);
+ cohort.preCommit().get(5, TimeUnit.SECONDS);
+ cohort.commit().get(5, TimeUnit.SECONDS);
+ }
+
void cleanup(DistributedDataStore dataStore) {
dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
}