}
@Override
- public void readData(
- final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
+ public void readData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
- // Send the remaining batched modifications if any.
+ // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+ // public API contract.
sendAndRecordBatchedModifications();
- // If there were any previous recorded put/merge/delete operation reply Futures then we
- // must wait for them to successfully complete. This is necessary to honor the read
- // uncommitted semantics of the public API contract. If any one fails then fail the read.
-
- if(recordedOperationCount() == 0) {
- finishReadData(path, returnFuture);
- } else {
- LOG.debug("Tx {} readData: verifying {} previous recorded operations",
- getIdentifier(), recordedOperationCount());
-
- // Note: we make a copy of recordedOperationFutures to be on the safe side in case
- // Futures#sequence accesses the passed List on a different thread, as
- // recordedOperationFutures is not synchronized.
-
- Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
- copyRecordedOperationFutures(), actorContext.getClientDispatcher());
-
- OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
- @Override
- public void onComplete(Throwable failure, Iterable<Object> notUsed)
- throws Throwable {
- if(failure != null) {
- LOG.debug("Tx {} readData: a recorded operation failed: {}",
- getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "The read could not be performed because a previous put, merge,"
- + "or delete operation failed", failure));
- } else {
- finishReadData(path, returnFuture);
- }
- }
- };
-
- combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
- }
-
- }
-
- private void finishReadData(final YangInstanceIdentifier path,
- final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
-
- LOG.debug("Tx {} finishReadData called path = {}", getIdentifier(), path);
-
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object readResponse) throws Throwable {
LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
- // Send the remaining batched modifications if any.
+ // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+ // public API contract.
sendAndRecordBatchedModifications();
- // If there were any previous recorded put/merge/delete operation reply Futures then we
- // must wait for them to successfully complete. This is necessary to honor the read
- // uncommitted semantics of the public API contract. If any one fails then fail this
- // request.
-
- if(recordedOperationCount() == 0) {
- finishDataExists(path, returnFuture);
- } else {
- LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
- getIdentifier(), recordedOperationCount());
-
- // Note: we make a copy of recordedOperationFutures to be on the safe side in case
- // Futures#sequence accesses the passed List on a different thread, as
- // recordedOperationFutures is not synchronized.
-
- Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
- copyRecordedOperationFutures(),
- actorContext.getClientDispatcher());
- OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
- @Override
- public void onComplete(Throwable failure, Iterable<Object> notUsed)
- throws Throwable {
- if(failure != null) {
- LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
- getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "The data exists could not be performed because a previous "
- + "put, merge, or delete operation failed", failure));
- } else {
- finishDataExists(path, returnFuture);
- }
- }
- };
-
- combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
- }
- }
-
- private void finishDataExists(final YangInstanceIdentifier path,
- final SettableFuture<Boolean> returnFuture) {
-
- LOG.debug("Tx {} finishDataExists called path = {}", getIdentifier(), path);
-
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
testReadWithExceptionOnInitialCreateTransaction(new TestException());
}
- @Test(expected = TestException.class)
- public void testReadWithPriorRecordingOperationFailure() throws Throwable {
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
- when(mockActorContext).getDatastoreContext();
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectFailedBatchedModifications(actorRef);
-
- doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- try {
- propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
- } finally {
- verify(mockActorContext, times(0)).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
- }
- }
-
@Test
public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
}
- @Test(expected = TestException.class)
- public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
- when(mockActorContext).getDatastoreContext();
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectFailedBatchedModifications(actorRef);
-
- doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- try {
- propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
- } finally {
- verify(mockActorContext, times(0)).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
- }
- }
-
@Test
public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);