import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
*
*/
class CompositeDataTreeCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
private enum State {
/**
}
void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+ LOG.debug("{}: canCommit - candidate: {}", txId, tip);
+
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+
+ LOG.debug("{}: canCommit - messages: {}", txId, messages);
+
// FIXME: Optimize empty collection list with pre-created futures, containing success.
Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
}
void preCommit() throws ExecutionException, TimeoutException {
+ LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
Preconditions.checkState(successfulFromPrevious != null);
Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
}
void commit() throws ExecutionException, TimeoutException {
+ LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
Preconditions.checkState(successfulFromPrevious != null);
Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
}
Optional<Future<Iterable<Object>>> abort() {
+ LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
state = State.ABORTED;
if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
}
private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+ LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
+
return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
}
@SuppressWarnings("checkstyle:IllegalCatch")
private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
final State afterState) throws TimeoutException, ExecutionException {
+ LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+
final Iterable<Object> results;
try {
results = Await.result(resultsFuture, timeout.duration());
} catch (Exception e) {
successfulFromPrevious = null;
+ LOG.debug("{}: processResponses - error from Future", txId, e);
Throwables.propagateIfInstanceOf(e, TimeoutException.class);
throw Throwables.propagate(e);
}
Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
+
+ LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
+
successfulFromPrevious = successful;
if (!Iterables.isEmpty(failed)) {
changeStateFrom(currentState, State.FAILED);
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
+import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.mdsal.common.api.DataValidationFailedException;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.PostCanCommitStep;
+import org.opendaylight.mdsal.common.api.PostPreCommitStep;
import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
return system;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void registerNoopCohortTest() throws Exception {
+ public void testSuccessfulCanCommitWithNoopPostStep() throws Exception {
final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
- Mockito.doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
- any(DOMDataTreeCandidate.class), any(SchemaContext.class));
- ArgumentCaptor<DOMDataTreeCandidate> candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class);
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
- {
- try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
- "test-1")) {
- final ObjectRegistration<DOMDataTreeCommitCohort> cohortReg =
- dataStore.registerCommitCohort(TEST_ID, cohort);
- Thread.sleep(1000); // Registration is asynchronous
- assertNotNull(cohortReg);
- testWriteTransaction(dataStore, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(),
- any(SchemaContext.class));
- DOMDataTreeCandidate candidate = candidateCapt.getValue();
- assertNotNull(candidate);
- assertEquals(TEST_ID, candidate.getRootPath());
- testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class),
- any(DOMDataTreeCandidate.class), any(SchemaContext.class));
- cohortReg.close();
- testWriteTransaction(dataStore, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- Mockito.verifyNoMoreInteractions(cohort);
- }
- }
- };
+ doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(Collection.class), any(SchemaContext.class));
+ ArgumentCaptor<Collection> candidateCapt = ArgumentCaptor.forClass(Collection.class);
+ IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+
+ try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testSuccessfulCanCommitWithNoopPostStep",
+ "test-1")) {
+ final ObjectRegistration<DOMDataTreeCommitCohort> cohortReg = dataStore.registerCommitCohort(TEST_ID,
+ cohort);
+ assertNotNull(cohortReg);
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size()));
+
+ final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ kit.testWriteTransaction(dataStore, TestModel.TEST_PATH, node);
+ verify(cohort).canCommit(any(Object.class),
+ candidateCapt.capture(), any(SchemaContext.class));
+ assertDataTreeCandidate((DOMDataTreeCandidate) candidateCapt.getValue().iterator().next(), TEST_ID,
+ ModificationType.WRITE, Optional.of(node), Optional.absent());
+
+ reset(cohort);
+ doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(Collection.class), any(SchemaContext.class));
+
+ kit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+ verify(cohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class));
+
+ cohortReg.close();
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("Cohort registrations", 0, state.getCommitCohortActors().size()));
+
+ kit.testWriteTransaction(dataStore, TestModel.TEST_PATH, node);
+ verifyNoMoreInteractions(cohort);
+ }
}
+ @SuppressWarnings("unchecked")
@Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void failCanCommitTest() throws Exception {
+ public void testFailedCanCommit() throws Exception {
final DOMDataTreeCommitCohort failedCohort = mock(DOMDataTreeCommitCohort.class);
- Mockito.doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class),
- any(DOMDataTreeCandidate.class), any(SchemaContext.class));
-
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
- {
- try (AbstractDataStore dataStore =
- setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
- dataStore.registerCommitCohort(TEST_ID, failedCohort);
- Thread.sleep(1000); // Registration is asynchronous
-
- DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
- try {
- // FIXME: Weird thing is that invoking canCommit on front-end invokes also
- // preCommit on backend.
- dsCohort.canCommit().get();
- fail("Exception should be raised.");
- } catch (Exception e) {
- assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e));
- }
- }
+ doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class),
+ any(Collection.class), any(SchemaContext.class));
+
+ IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+ try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testFailedCanCommit", "test-1")) {
+ dataStore.registerCommitCohort(TEST_ID, failedCohort);
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size()));
+
+ DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+ try {
+ dsCohort.canCommit().get(5, TimeUnit.SECONDS);
+ fail("Exception should be raised.");
+ } catch (ExecutionException e) {
+ assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e));
+ }
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testCanCommitWithListEntries() throws Exception {
+ final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
+ doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(Collection.class), any(SchemaContext.class));
+ IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+
+ try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testCanCommitWithMultipleListEntries",
+ "cars-1")) {
+ final ObjectRegistration<DOMDataTreeCommitCohort> cohortReg = dataStore.registerCommitCohort(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH
+ .node(CarsModel.CAR_QNAME)), cohort);
+ assertNotNull(cohortReg);
+
+ IntegrationTestKit.verifyShardState(dataStore, "cars-1",
+ state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size()));
+
+ // First write an empty base container and verify the cohort isn't invoked.
+
+ DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ kit.doCommit(writeTx.ready());
+ verifyNoMoreInteractions(cohort);
+
+ // Write a single car entry and verify the cohort is invoked.
+
+ writeTx = dataStore.newWriteOnlyTransaction();
+ final YangInstanceIdentifier optimaPath = CarsModel.newCarPath("optima");
+ final MapEntryNode optimaNode = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ writeTx.write(optimaPath, optimaNode);
+ kit.doCommit(writeTx.ready());
+
+ ArgumentCaptor<Collection> candidateCapture = ArgumentCaptor.forClass(Collection.class);
+ verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class));
+ assertDataTreeCandidate((DOMDataTreeCandidate) candidateCapture.getValue().iterator().next(),
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, optimaPath), ModificationType.WRITE,
+ Optional.of(optimaNode), Optional.absent());
+
+ // Write replace the cars container with 2 new car entries. The cohort should get invoked with 3
+ // DOMDataTreeCandidates: once for each of the 2 new car entries (WRITE mod) and once for the deleted prior
+ // car entry (DELETE mod).
+
+ reset(cohort);
+ doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(Collection.class), any(SchemaContext.class));
+
+ writeTx = dataStore.newWriteOnlyTransaction();
+ final YangInstanceIdentifier sportagePath = CarsModel.newCarPath("sportage");
+ final MapEntryNode sportageNode = CarsModel.newCarEntry("sportage", BigInteger.valueOf(20000));
+ final YangInstanceIdentifier soulPath = CarsModel.newCarPath("soul");
+ final MapEntryNode soulNode = CarsModel.newCarEntry("soul", BigInteger.valueOf(20000));
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.newCarsNode(CarsModel.newCarsMapNode(sportageNode,soulNode)));
+ kit.doCommit(writeTx.ready());
+
+ candidateCapture = ArgumentCaptor.forClass(Collection.class);
+ verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class));
+
+ assertDataTreeCandidate(findCandidate(candidateCapture, sportagePath), new DOMDataTreeIdentifier(
+ LogicalDatastoreType.CONFIGURATION, sportagePath), ModificationType.WRITE,
+ Optional.of(sportageNode), Optional.absent());
+
+ assertDataTreeCandidate(findCandidate(candidateCapture, soulPath), new DOMDataTreeIdentifier(
+ LogicalDatastoreType.CONFIGURATION, soulPath), ModificationType.WRITE,
+ Optional.of(soulNode), Optional.absent());
+
+ assertDataTreeCandidate(findCandidate(candidateCapture, optimaPath), new DOMDataTreeIdentifier(
+ LogicalDatastoreType.CONFIGURATION, optimaPath), ModificationType.DELETE,
+ Optional.absent(), Optional.of(optimaNode));
+
+ // Delete the cars container - cohort should be invoked for the 2 deleted car entries.
+
+ reset(cohort);
+ doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(Collection.class), any(SchemaContext.class));
+
+ writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.delete(CarsModel.BASE_PATH);
+ kit.doCommit(writeTx.ready());
+
+ candidateCapture = ArgumentCaptor.forClass(Collection.class);
+ verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class));
+
+ assertDataTreeCandidate(findCandidate(candidateCapture, sportagePath), new DOMDataTreeIdentifier(
+ LogicalDatastoreType.CONFIGURATION, sportagePath), ModificationType.DELETE,
+ Optional.absent(), Optional.of(sportageNode));
+
+ assertDataTreeCandidate(findCandidate(candidateCapture, soulPath), new DOMDataTreeIdentifier(
+ LogicalDatastoreType.CONFIGURATION, soulPath), ModificationType.DELETE,
+ Optional.absent(), Optional.of(soulNode));
+
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static DOMDataTreeCandidate findCandidate(ArgumentCaptor<Collection> candidateCapture,
+ YangInstanceIdentifier rootPath) {
+ for (Object obj: candidateCapture.getValue()) {
+ DOMDataTreeCandidate candidate = (DOMDataTreeCandidate)obj;
+ if (rootPath.equals(candidate.getRootPath().getRootIdentifier())) {
+ return candidate;
}
- };
+ }
+
+ return null;
}
/**
- * FIXME: Weird thing is that invoking canCommit on front-end invokes also preCommit on backend
- * so we can not test abort after can commit.
+ * FIXME: Since we invoke DOMDataTreeCommitCohort#canCommit on preCommit (as that's when we generate a
+ * DataTreeCandidate) and since currently preCommit is a noop in the Shard backend (it is combined with commit),
+ * we can't actually test abort after canCommit.
*/
+ @SuppressWarnings("unchecked")
@Test
@Ignore
- public void canCommitSuccessExternallyAborted() throws Exception {
+ public void testAbortAfterCanCommit() throws Exception {
final DOMDataTreeCommitCohort cohortToAbort = mock(DOMDataTreeCommitCohort.class);
final PostCanCommitStep stepToAbort = mock(PostCanCommitStep.class);
- Mockito.doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class),
- any(DOMDataTreeCandidate.class), any(SchemaContext.class));
- Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
- {
- try (AbstractDataStore dataStore =
- setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
- dataStore.registerCommitCohort(TEST_ID, cohortToAbort);
- Thread.sleep(1000); // Registration is asynchronous
-
- DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
-
- dsCohort.canCommit().get();
- dsCohort.abort().get();
- Mockito.verify(stepToAbort, Mockito.times(1)).abort();
- }
- }
- };
+ doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
+ doReturn(PostPreCommitStep.NOOP_FUTURE).when(stepToAbort).preCommit();
+ doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class),
+ any(Collection.class), any(SchemaContext.class));
+
+ IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+ try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testAbortAfterCanCommit",
+ "test-1", "cars-1")) {
+ dataStore.registerCommitCohort(TEST_ID, cohortToAbort);
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size()));
+
+ DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+
+ dsCohort.canCommit().get(5, TimeUnit.SECONDS);
+ dsCohort.preCommit().get(5, TimeUnit.SECONDS);
+ dsCohort.abort().get(5, TimeUnit.SECONDS);
+ verify(stepToAbort).abort();
+ }
+ }
+
+ private static void assertDataTreeCandidate(DOMDataTreeCandidate candidate, DOMDataTreeIdentifier expTreeId,
+ ModificationType expType, Optional<NormalizedNode<?, ?>> expDataAfter,
+ Optional<NormalizedNode<?, ?>> expDataBefore) {
+ assertNotNull("Expected candidate for path " + expTreeId.getRootIdentifier(), candidate);
+ assertEquals("rootPath", expTreeId, candidate.getRootPath());
+ assertEquals("modificationType", expType, candidate.getRootNode().getModificationType());
+
+ assertEquals("dataAfter present", expDataAfter.isPresent(), candidate.getRootNode().getDataAfter().isPresent());
+ if (expDataAfter.isPresent()) {
+ assertEquals("dataAfter", expDataAfter.get(), candidate.getRootNode().getDataAfter().get());
+ }
+
+ assertEquals("dataBefore present", expDataBefore.isPresent(),
+ candidate.getRootNode().getDataBefore().isPresent());
+ if (expDataBefore.isPresent()) {
+ assertEquals("dataBefore", expDataBefore.get(), candidate.getRootNode().getDataBefore().get());
+ }
}
}