@Override
ListenableFuture<PostCanCommitStep> process(CanCommit message) {
- return cohort.canCommit(message.getTxId(), message.getCandidates(), message.getSchema());
+ return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
}
@Override
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new PreCommit(txId));
verify(mockPostCanCommit).preCommit();
resetMockCohort();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
}
@Test
public void testAsyncCohort() throws Exception {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
- doReturn(Futures.makeChecked(executeWithDelay(executor, mockPostCanCommit),
- ex -> new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")))
- .when(mockCohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class));
+ doReturn(executeWithDelay(executor, mockPostCanCommit))
+ .when(mockCohort).canCommit(any(Object.class), any(SchemaContext.class), any(Collection.class));
doReturn(executor.submit(() -> mockPostPreCommit)).when(mockPostCanCommit).preCommit();
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new PreCommit(txId));
verify(mockPostCanCommit).preCommit();
@Test
public void testFailureOnCanCommit() throws Exception {
DataValidationFailedException failure = new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock");
- doReturn(Futures.immediateFailedCheckedFuture(failure)).when(mockCohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(FluentFutures.immediateFailedFluentFuture(failure)).when(mockCohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
ActorRef cohortActor = newCohortActor("testFailureOnCanCommit");
resetMockCohort();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
}
@Test
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new Abort(txId));
verify(mockPostCanCommit).abort();
resetMockCohort();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
}
@Test
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new PreCommit(txId));
verify(mockPostCanCommit).preCommit();
verify(mockPostPreCommit).abort();
}
- private static <T> ListenableFuture<T> executeWithDelay(final ListeningExecutorService executor, final T result) {
- return executor.submit(() -> {
+ private static <T> FluentFuture<T> executeWithDelay(final ListeningExecutorService executor, final T result) {
+ return FluentFuture.from(executor.submit(() -> {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
return result;
- });
+ }));
}
private ActorRef newCohortActor(final String name) {
reset(mockCohort);
doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostCanCommit).abort();
doReturn(Futures.immediateFuture(mockPostPreCommit)).when(mockPostCanCommit).preCommit();
- doReturn(Futures.immediateCheckedFuture(mockPostCanCommit)).when(mockCohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(FluentFutures.immediateFluentFuture(mockPostCanCommit)).when(mockCohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostPreCommit).abort();
doReturn(Futures.immediateFuture(null)).when(mockPostPreCommit).commit();
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.FluentFuture;
import com.typesafe.config.ConfigFactory;
import java.math.BigInteger;
import java.util.Collection;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
private static final DataValidationFailedException FAILED_CAN_COMMIT =
new DataValidationFailedException(YangInstanceIdentifier.class, TestModel.TEST_PATH, "Test failure.");
- private static final CheckedFuture<PostCanCommitStep, DataValidationFailedException> FAILED_CAN_COMMIT_FUTURE =
- Futures.immediateFailedCheckedFuture(FAILED_CAN_COMMIT);
+ private static final FluentFuture<PostCanCommitStep> FAILED_CAN_COMMIT_FUTURE =
+ FluentFutures.immediateFailedFluentFuture(FAILED_CAN_COMMIT);
private static final DOMDataTreeIdentifier TEST_ID =
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
@Test
public void testSuccessfulCanCommitWithNoopPostStep() throws Exception {
final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
- doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(PostCanCommitStep.NOOP_SUCCESSFUL_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
ArgumentCaptor<Collection> candidateCapt = ArgumentCaptor.forClass(Collection.class);
IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
kit.testWriteTransaction(dataStore, TestModel.TEST_PATH, node);
- verify(cohort).canCommit(any(Object.class),
- candidateCapt.capture(), any(SchemaContext.class));
+ verify(cohort).canCommit(any(Object.class), any(SchemaContext.class), candidateCapt.capture());
assertDataTreeCandidate((DOMDataTreeCandidate) candidateCapt.getValue().iterator().next(), TEST_ID,
ModificationType.WRITE, Optional.of(node), Optional.absent());
reset(cohort);
- doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(PostCanCommitStep.NOOP_SUCCESSFUL_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
kit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- verify(cohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class));
+ verify(cohort).canCommit(any(Object.class), any(SchemaContext.class), any(Collection.class));
cohortReg.close();
final DOMDataTreeCommitCohort failedCohort = mock(DOMDataTreeCommitCohort.class);
doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ any(SchemaContext.class), any(Collection.class));
IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
try (AbstractDataStore dataStore = kit.setupAbstractDataStore(
@Test
public void testCanCommitWithListEntries() throws Exception {
final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
- doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(PostCanCommitStep.NOOP_SUCCESSFUL_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
try (AbstractDataStore dataStore = kit.setupAbstractDataStore(
kit.doCommit(writeTx.ready());
ArgumentCaptor<Collection> candidateCapture = ArgumentCaptor.forClass(Collection.class);
- verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class));
+ verify(cohort).canCommit(any(Object.class), any(SchemaContext.class), candidateCapture.capture());
assertDataTreeCandidate((DOMDataTreeCandidate) candidateCapture.getValue().iterator().next(),
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, optimaPath), ModificationType.WRITE,
Optional.of(optimaNode), Optional.absent());
// car entry (DELETE mod).
reset(cohort);
- doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(PostCanCommitStep.NOOP_SUCCESSFUL_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
writeTx = dataStore.newWriteOnlyTransaction();
final YangInstanceIdentifier sportagePath = CarsModel.newCarPath("sportage");
kit.doCommit(writeTx.ready());
candidateCapture = ArgumentCaptor.forClass(Collection.class);
- verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class));
+ verify(cohort).canCommit(any(Object.class), any(SchemaContext.class), candidateCapture.capture());
assertDataTreeCandidate(findCandidate(candidateCapture, sportagePath), new DOMDataTreeIdentifier(
LogicalDatastoreType.CONFIGURATION, sportagePath), ModificationType.WRITE,
// Delete the cars container - cohort should be invoked for the 2 deleted car entries.
reset(cohort);
- doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(PostCanCommitStep.NOOP_SUCCESSFUL_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
writeTx = dataStore.newWriteOnlyTransaction();
writeTx.delete(CarsModel.BASE_PATH);
kit.doCommit(writeTx.ready());
candidateCapture = ArgumentCaptor.forClass(Collection.class);
- verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class));
+ verify(cohort).canCommit(any(Object.class), any(SchemaContext.class), candidateCapture.capture());
assertDataTreeCandidate(findCandidate(candidateCapture, sportagePath), new DOMDataTreeIdentifier(
LogicalDatastoreType.CONFIGURATION, sportagePath), ModificationType.DELETE,
final PostCanCommitStep stepToAbort = mock(PostCanCommitStep.class);
doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
doReturn(PostPreCommitStep.NOOP_FUTURE).when(stepToAbort).preCommit();
- doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(FluentFutures.immediateFluentFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
try (AbstractDataStore dataStore = kit.setupAbstractDataStore(
package org.opendaylight.controller.clustering.it.provider;
import com.google.common.base.Verify;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.FluentFuture;
+import java.util.Collection;
import java.util.Optional;
import org.opendaylight.mdsal.common.api.DataValidationFailedException;
import org.opendaylight.mdsal.common.api.PostCanCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
private static final NodeIdentifier YEAR_NODE_ID = new NodeIdentifier(YEAR_QNAME);
@Override
- public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(final Object txId,
- final DOMDataTreeCandidate candidate, final SchemaContext ctx) {
+ public FluentFuture<PostCanCommitStep> canCommit(Object txId, SchemaContext ctx,
+ Collection<DOMDataTreeCandidate> candidates) {
- // Simple data validation - verify the year, if present, is >= 1990
+ for (DOMDataTreeCandidate candidate : candidates) {
+ // Simple data validation - verify the year, if present, is >= 1990
- final DataTreeCandidateNode rootNode = candidate.getRootNode();
- final Optional<NormalizedNode<?, ?>> dataAfter = rootNode.getDataAfter();
+ final DataTreeCandidateNode rootNode = candidate.getRootNode();
+ final Optional<NormalizedNode<?, ?>> dataAfter = rootNode.getDataAfter();
- LOG.info("In canCommit: modificationType: {}, dataBefore: {}, dataAfter: {}", rootNode.getModificationType(),
- rootNode.getDataBefore(), dataAfter);
+ LOG.info("In canCommit: modificationType: {}, dataBefore: {}, dataAfter: {}",
+ rootNode.getModificationType(), rootNode.getDataBefore(), dataAfter);
- // Note: we don't want to process DELETE modifications but we don't need to explicitly check the
- // ModificationType because dataAfter will not be present. Also dataAfter *should* always contain a
- // MapEntryNode but we verify anyway.
- if (dataAfter.isPresent()) {
- final NormalizedNode<?, ?> normalizedNode = dataAfter.get();
- Verify.verify(normalizedNode instanceof DataContainerNode, "Expected type DataContainerNode, actual was %s",
- normalizedNode.getClass());
- DataContainerNode<?> entryNode = (DataContainerNode<?>) normalizedNode;
- final Optional<DataContainerChild<? extends PathArgument, ?>> possibleYear =
- entryNode.getChild(YEAR_NODE_ID);
- if (possibleYear.isPresent()) {
- final Number year = (Number) possibleYear.get().getValue();
+ // Note: we don't want to process DELETE modifications but we don't need to explicitly check the
+ // ModificationType because dataAfter will not be present. Also dataAfter *should* always contain a
+ // MapEntryNode but we verify anyway.
+ if (dataAfter.isPresent()) {
+ final NormalizedNode<?, ?> normalizedNode = dataAfter.get();
+ Verify.verify(normalizedNode instanceof DataContainerNode,
+ "Expected type DataContainerNode, actual was %s", normalizedNode.getClass());
+ DataContainerNode<?> entryNode = (DataContainerNode<?>) normalizedNode;
+ final Optional<DataContainerChild<? extends PathArgument, ?>> possibleYear =
+ entryNode.getChild(YEAR_NODE_ID);
+ if (possibleYear.isPresent()) {
+ final Number year = (Number) possibleYear.get().getValue();
- LOG.info("year is {}", year);
+ LOG.info("year is {}", year);
- if (!(year.longValue() >= 1990)) {
- return Futures.immediateFailedCheckedFuture(new DataValidationFailedException(
- DOMDataTreeIdentifier.class, candidate.getRootPath(),
+ if (!(year.longValue() >= 1990)) {
+ return FluentFutures.immediateFailedFluentFuture(new DataValidationFailedException(
+ DOMDataTreeIdentifier.class, candidate.getRootPath(),
String.format("Invalid year %d - year must be >= 1990", year)));
+ }
}
}
}
// Return the noop PostCanCommitStep as we're only validating input data and not participating in the
// remaining 3PC stages (pre-commit and commit).
- return PostCanCommitStep.NOOP_SUCCESS_FUTURE;
+ return PostCanCommitStep.NOOP_SUCCESSFUL_FUTURE;
}
}