Details outlined in bug 2337.
Moved the publishing of the TransactonContext instance in
TransactionFutureCallback#onComplete after the cached operations are
executed.
Also fixed a timing issue in
DistributedDataStoreIntegrationTest#testChangeListenerRegistration that
caused intermittent failures. This was an issue with the test. The
listener registration is done async in the shard so the notification for
the first write commit could occur in the initial notification on
registration which screwed up the test. So I moved the first write
commit before the registration so we expect an initial notification.
Change-Id: Ied2af93be8165208b853c48e57312c3a5acbdaea
Signed-off-by: tpantelis <tpanteli@brocade.com>
return recordedOperationFutures;
}
+ @VisibleForTesting
+ boolean hasTransactionContext() {
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
// respect to #addTxOperationOnComplete to handle timing issues and ensure no
// TransactionOperation is missed and that they are processed in the order they occurred.
synchronized(txOperationsOnComplete) {
+ // Store the new TransactionContext locally until we've completed invoking the
+ // TransactionOperations. This avoids thread timing issues which could cause
+ // out-of-order TransactionOperations. Eg, on a modification operation, if the
+ // TransactionContext is non-null, then we directly call the TransactionContext.
+ // However, at the same time, the code may be executing the cached
+ // TransactionOperations. So to avoid thus timing, we don't publish the
+ // TransactionContext until after we've executed all cached TransactionOperations.
+ TransactionContext localTransactionContext;
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
failure.getMessage());
- transactionContext = new NoOpTransactionContext(failure, identifier);
+ localTransactionContext = new NoOpTransactionContext(failure, identifier);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
- createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+ localTransactionContext = createValidTransactionContext(
+ CreateTransactionReply.fromSerializable(response));
} else {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- transactionContext = new NoOpTransactionContext(exception, identifier);
+ localTransactionContext = new NoOpTransactionContext(exception, identifier);
}
for(TransactionOperation oper: txOperationsOnComplete) {
- oper.invoke(transactionContext);
+ oper.invoke(localTransactionContext);
}
txOperationsOnComplete.clear();
+
+ // We're done invoking the TransactionOperations so we can now publish the
+ // TransactionContext.
+ transactionContext = localTransactionContext;
}
}
- private void createValidTransactionContext(CreateTransactionReply reply) {
+ private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
String transactionPath = reply.getTransactionPath();
LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
- transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ return new TransactionContextImpl(transactionPath, transactionActor, identifier,
actorContext, schemaContext, isTxActorLocal, reply.getVersion());
}
}
DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// Verify the data in the store
// 5. Commit the Tx
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// 6. Verify the data in the store
// Wait for the Tx commit to complete.
- assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
- txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
- txCohort.get().commit().get(5, TimeUnit.SECONDS);
+ doCommit(txCohort.get());
// Verify the data in the store
@Test
public void testTransactionChain() throws Exception{
- System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem()) {{
- DistributedDataStore dataStore =
- setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
+ DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
// 1. Create a Tx chain and write-only Tx
assertEquals("Data node", outerNode, optional.get());
cleanup(dataStore);
- }
-
- private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception {
- Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort1.preCommit().get(5, TimeUnit.SECONDS);
- cohort1.commit().get(5, TimeUnit.SECONDS);
}};
}
DistributedDataStore dataStore =
setupDistributedDataStore("testChangeListenerRegistration", "test-1");
- MockDataChangeListener listener = new MockDataChangeListener(3);
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ MockDataChangeListener listener = new MockDataChangeListener(1);
ListenerRegistration<MockDataChangeListener>
listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
assertNotNull("registerChangeListener returned null", listenerReg);
- testWriteTransaction(dataStore, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ // Wait for the initial notification
+
+ listener.waitForChangeEvents(TestModel.TEST_PATH);
+
+ listener.reset(2);
+
+ // Write 2 updates.
testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
testWriteTransaction(dataStore, listPath,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
- listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
+ // Wait for the 2 updates.
+
+ listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
listenerReg.close();
// 4. Commit the Tx
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// 5. Verify the data in the store
assertEquals("Data node", nodeToWrite, optional.get());
}
+ void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+ Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+ assertEquals("canCommit", true, canCommit);
+ cohort.preCommit().get(5, TimeUnit.SECONDS);
+ cohort.commit().get(5, TimeUnit.SECONDS);
+ }
+
void cleanup(DistributedDataStore dataStore) {
dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
/**
* A mock DataChangeListener implementation.
public class MockDataChangeListener implements
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- changeList = Lists.newArrayList();
- private final CountDownLatch changeLatch;
- private final int expChangeEventCount;
+ private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> changeList =
+ Collections.synchronizedList(Lists.<AsyncDataChangeEvent<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>>newArrayList());
+
+ private volatile CountDownLatch changeLatch;
+ private int expChangeEventCount;
public MockDataChangeListener(int expChangeEventCount) {
+ reset(expChangeEventCount);
+ }
+
+ public void reset(int expChangeEventCount) {
changeLatch = new CountDownLatch(expChangeEventCount);
this.expChangeEventCount = expChangeEventCount;
+ changeList.clear();
}
@Override
}
public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
- assertEquals("Change notifications complete", true,
- Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+ boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
+ if(!done) {
+ fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
+ expChangeEventCount, (expChangeEventCount - changeLatch.getCount())));
+ }
+
+ assertEquals("Change notifications complete", true, done);
for(int i = 0; i < expPaths.length; i++) {
assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),