From 806ca77eb8f5c6a7de07f5f6a0f2b3d234752050 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 8 Feb 2017 13:52:12 -0500 Subject: [PATCH] Rework CDS commit cohort impl to handle yang lists If a cohort registers for yang list entries, it works fine if the transaction only contains a write or delete of one list entry. However the DataTreeCohortActor throws an UnsupportedOperationException from CohortBehaviour#handle if more then one list entry is written. In that case multiple CanCommit messages are sent to the DataTreeCohortActor, a DOMDataTreeCandidate for each entry, but the CohortBehaviour is set up to only handle one message, after which it expects to transition to the PostCanCommit step. It seems the DOMDataTreeCommitCohort#canCommit API really should take a collection of DOMDataTreeCandidates. Howeever in lieu of an API change, I modified the CanCommit message to contain a collection of DOMDataTreeCandidates. The DataTreeCohortActor invokes canCommit for each one and uses the last PostCanCommitStep returned. This *may* be OK although there doesn't seem to be an alternative at this point. We probably should note this behavior in the DOMDataTreeCommitCohort API. Change-Id: I17c4d2f477ffc6c6c3921217e5f6c13bcdafde8f Signed-off-by: Tom Pantelis --- .../datastore/CompositeDataTreeCohort.java | 22 ++ .../datastore/DataTreeCohortActor.java | 41 ++- .../DataTreeCohortActorRegistry.java | 16 +- .../DataTreeCohortIntegrationTest.java | 309 +++++++++++++----- .../cluster/datastore/IntegrationTestKit.java | 28 ++ 5 files changed, 324 insertions(+), 92 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index 006555f1f8..4044a4cb3a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -28,6 +28,8 @@ import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanComm import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; @@ -39,6 +41,7 @@ import scala.concurrent.Future; * */ class CompositeDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class); private enum State { /** @@ -120,7 +123,12 @@ class CompositeDataTreeCohort { } void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { + LOG.debug("{}: canCommit - candidate: {}", txId, tip); + Collection messages = registry.createCanCommitMessages(txId, tip, schema); + + LOG.debug("{}: canCommit - messages: {}", txId, messages); + // FIXME: Optimize empty collection list with pre-created futures, containing success. Future> canCommitsFuture = Futures.traverse(messages, input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, @@ -130,6 +138,8 @@ class CompositeDataTreeCohort { } void preCommit() throws ExecutionException, TimeoutException { + LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious); + Preconditions.checkState(successfulFromPrevious != null); Future> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId)); changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); @@ -137,6 +147,8 @@ class CompositeDataTreeCohort { } void commit() throws ExecutionException, TimeoutException { + LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious); + Preconditions.checkState(successfulFromPrevious != null); Future> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId)); changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); @@ -144,6 +156,8 @@ class CompositeDataTreeCohort { } Optional>> abort() { + LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious); + state = State.ABORTED; if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) { return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId))); @@ -153,6 +167,8 @@ class CompositeDataTreeCohort { } private Future> sendMesageToSuccessful(final Object message) { + LOG.debug("{}: sendMesageToSuccessful: {}", txId, message); + return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask( cohortResponse.getCohort(), message, timeout), ExecutionContexts.global()); } @@ -160,16 +176,22 @@ class CompositeDataTreeCohort { @SuppressWarnings("checkstyle:IllegalCatch") private void processResponses(final Future> resultsFuture, final State currentState, final State afterState) throws TimeoutException, ExecutionException { + LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState); + final Iterable results; try { results = Await.result(resultsFuture, timeout.duration()); } catch (Exception e) { successfulFromPrevious = null; + LOG.debug("{}: processResponses - error from Future", txId, e); Throwables.propagateIfInstanceOf(e, TimeoutException.class); throw Throwables.propagate(e); } Iterable failed = Iterables.filter(results, Status.Failure.class); Iterable successful = Iterables.filter(results, DataTreeCohortActor.Success.class); + + LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed); + successfulFromPrevious = successful; if (!Iterables.isEmpty(failed)) { changeStateFrom(currentState, State.FAILED); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java index e6ff10d831..da1117764f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; import com.google.common.base.Preconditions; +import java.util.Collection; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.mdsal.common.api.PostCanCommitStep; @@ -39,6 +40,9 @@ final class DataTreeCohortActor extends AbstractUntypedActor { @Override protected void handleReceive(final Object message) { + LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(), + currentState, message); + currentState = currentState.handle(message); } @@ -59,23 +63,29 @@ final class DataTreeCohortActor extends AbstractUntypedActor { protected CommitProtocolCommand(TransactionIdentifier txId) { this.txId = Preconditions.checkNotNull(txId); } + + @Override + public String toString() { + return getClass().getSimpleName() + " [txId=" + txId + "]"; + } } static final class CanCommit extends CommitProtocolCommand { - private final DOMDataTreeCandidate candidate; + private final Collection candidates; private final ActorRef cohort; private final SchemaContext schema; - CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) { + CanCommit(TransactionIdentifier txId, Collection candidates, SchemaContext schema, + ActorRef cohort) { super(txId); this.cohort = Preconditions.checkNotNull(cohort); - this.candidate = Preconditions.checkNotNull(candidate); + this.candidates = Preconditions.checkNotNull(candidates); this.schema = Preconditions.checkNotNull(schema); } - DOMDataTreeCandidate getCandidate() { - return candidate; + Collection getCandidates() { + return candidates; } SchemaContext getSchema() { @@ -86,6 +96,10 @@ final class DataTreeCohortActor extends AbstractUntypedActor { return cohort; } + @Override + public String toString() { + return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]"; + } } abstract static class CommitReply { @@ -105,6 +119,11 @@ final class DataTreeCohortActor extends AbstractUntypedActor { final TransactionIdentifier getTxId() { return txId; } + + @Override + public String toString() { + return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]"; + } } static final class Success extends CommitReply { @@ -112,7 +131,6 @@ final class DataTreeCohortActor extends AbstractUntypedActor { Success(ActorRef cohortRef, TransactionIdentifier txId) { super(cohortRef, txId); } - } static final class PreCommit extends CommitProtocolCommand { @@ -154,6 +172,10 @@ final class DataTreeCohortActor extends AbstractUntypedActor { abstract CohortBehaviour process(E message); + @Override + public String toString() { + return getClass().getSimpleName(); + } } private class Idle extends CohortBehaviour { @@ -168,7 +190,7 @@ final class DataTreeCohortActor extends AbstractUntypedActor { CohortBehaviour process(CanCommit message) { final PostCanCommitStep nextStep; try { - nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get(); + nextStep = cohort.canCommit(message.getTxId(), message.getCandidates(), message.getSchema()).get(); } catch (final Exception e) { getSender().tell(new Status.Failure(e), getSelf()); return this; @@ -181,7 +203,6 @@ final class DataTreeCohortActor extends AbstractUntypedActor { CohortBehaviour abort() { return this; } - } @@ -218,6 +239,10 @@ final class DataTreeCohortActor extends AbstractUntypedActor { return idleState; } + @Override + public String toString() { + return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]"; + } } private class PostCanCommit extends CohortStateWithStep { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java index c367f98f5a..05ebe55bc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java @@ -13,6 +13,8 @@ import akka.actor.PoisonPill; import akka.actor.Status; import akka.util.Timeout; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -20,7 +22,6 @@ import java.util.List; import java.util.Map; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree; import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode; import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeSnapshot; @@ -138,8 +139,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private final TransactionIdentifier txId; private final DataTreeCandidate candidate; private final SchemaContext schema; - private final Collection messages = - new ArrayList<>(); + private final Multimap actorToCandidates = ArrayListMultimap.create(); CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate, final SchemaContext schema) { @@ -196,8 +196,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { final DataTreeCandidateNode node) { final DOMDataTreeCandidate domCandidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node); for (final ActorRef reg : regs) { - final CanCommit message = new DataTreeCohortActor.CanCommit(txId, domCandidate, schema, reg); - messages.add(message); + actorToCandidates.put(reg, domCandidate); } } @@ -208,6 +207,13 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private Collection perform(final RegistrationTreeNode rootNode) { final List toLookup = candidate.getRootPath().getPathArguments(); lookupAndCreateCanCommits(toLookup, 0, rootNode); + + final Map> mapView = actorToCandidates.asMap(); + Collection messages = new ArrayList<>(mapView.size()); + for (Map.Entry> entry: mapView.entrySet()) { + messages.add(new DataTreeCohortActor.CanCommit(txId, entry.getValue(), schema, entry.getKey())); + } + return messages; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java index 20ecb0c578..c9a30677a6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java @@ -13,36 +13,50 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.typesafe.config.ConfigFactory; import java.io.IOException; +import java.math.BigInteger; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.mdsal.common.api.DataValidationFailedException; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.common.api.PostCanCommitStep; +import org.opendaylight.mdsal.common.api.PostPreCommitStep; import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep; import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ObjectRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -78,99 +92,236 @@ public class DataTreeCohortIntegrationTest { return system; } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void registerNoopCohortTest() throws Exception { + public void testSuccessfulCanCommitWithNoopPostStep() throws Exception { final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class); - Mockito.doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class), - any(DOMDataTreeCandidate.class), any(SchemaContext.class)); - ArgumentCaptor candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class); - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", - "test-1")) { - final ObjectRegistration cohortReg = - dataStore.registerCommitCohort(TEST_ID, cohort); - Thread.sleep(1000); // Registration is asynchronous - assertNotNull(cohortReg); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(), - any(SchemaContext.class)); - DOMDataTreeCandidate candidate = candidateCapt.getValue(); - assertNotNull(candidate); - assertEquals(TEST_ID, candidate.getRootPath()); - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class), - any(DOMDataTreeCandidate.class), any(SchemaContext.class)); - cohortReg.close(); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - Mockito.verifyNoMoreInteractions(cohort); - } - } - }; + doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class), + any(Collection.class), any(SchemaContext.class)); + ArgumentCaptor candidateCapt = ArgumentCaptor.forClass(Collection.class); + IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + + try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testSuccessfulCanCommitWithNoopPostStep", + "test-1")) { + final ObjectRegistration cohortReg = dataStore.registerCommitCohort(TEST_ID, + cohort); + assertNotNull(cohortReg); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size())); + + final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + kit.testWriteTransaction(dataStore, TestModel.TEST_PATH, node); + verify(cohort).canCommit(any(Object.class), + candidateCapt.capture(), any(SchemaContext.class)); + assertDataTreeCandidate((DOMDataTreeCandidate) candidateCapt.getValue().iterator().next(), TEST_ID, + ModificationType.WRITE, Optional.of(node), Optional.absent()); + + reset(cohort); + doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class), + any(Collection.class), any(SchemaContext.class)); + + kit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + verify(cohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class)); + + cohortReg.close(); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("Cohort registrations", 0, state.getCommitCohortActors().size())); + + kit.testWriteTransaction(dataStore, TestModel.TEST_PATH, node); + verifyNoMoreInteractions(cohort); + } } + @SuppressWarnings("unchecked") @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void failCanCommitTest() throws Exception { + public void testFailedCanCommit() throws Exception { final DOMDataTreeCommitCohort failedCohort = mock(DOMDataTreeCommitCohort.class); - Mockito.doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class), - any(DOMDataTreeCandidate.class), any(SchemaContext.class)); - - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = - setupDistributedDataStore("transactionIntegrationTest", "test-1")) { - dataStore.registerCommitCohort(TEST_ID, failedCohort); - Thread.sleep(1000); // Registration is asynchronous - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); - try { - // FIXME: Weird thing is that invoking canCommit on front-end invokes also - // preCommit on backend. - dsCohort.canCommit().get(); - fail("Exception should be raised."); - } catch (Exception e) { - assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e)); - } - } + doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class), + any(Collection.class), any(SchemaContext.class)); + + IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testFailedCanCommit", "test-1")) { + dataStore.registerCommitCohort(TEST_ID, failedCohort); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size())); + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); + try { + dsCohort.canCommit().get(5, TimeUnit.SECONDS); + fail("Exception should be raised."); + } catch (ExecutionException e) { + assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e)); + } + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testCanCommitWithListEntries() throws Exception { + final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class); + doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class), + any(Collection.class), any(SchemaContext.class)); + IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + + try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testCanCommitWithMultipleListEntries", + "cars-1")) { + final ObjectRegistration cohortReg = dataStore.registerCommitCohort( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH + .node(CarsModel.CAR_QNAME)), cohort); + assertNotNull(cohortReg); + + IntegrationTestKit.verifyShardState(dataStore, "cars-1", + state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size())); + + // First write an empty base container and verify the cohort isn't invoked. + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + kit.doCommit(writeTx.ready()); + verifyNoMoreInteractions(cohort); + + // Write a single car entry and verify the cohort is invoked. + + writeTx = dataStore.newWriteOnlyTransaction(); + final YangInstanceIdentifier optimaPath = CarsModel.newCarPath("optima"); + final MapEntryNode optimaNode = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + writeTx.write(optimaPath, optimaNode); + kit.doCommit(writeTx.ready()); + + ArgumentCaptor candidateCapture = ArgumentCaptor.forClass(Collection.class); + verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class)); + assertDataTreeCandidate((DOMDataTreeCandidate) candidateCapture.getValue().iterator().next(), + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, optimaPath), ModificationType.WRITE, + Optional.of(optimaNode), Optional.absent()); + + // Write replace the cars container with 2 new car entries. The cohort should get invoked with 3 + // DOMDataTreeCandidates: once for each of the 2 new car entries (WRITE mod) and once for the deleted prior + // car entry (DELETE mod). + + reset(cohort); + doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class), + any(Collection.class), any(SchemaContext.class)); + + writeTx = dataStore.newWriteOnlyTransaction(); + final YangInstanceIdentifier sportagePath = CarsModel.newCarPath("sportage"); + final MapEntryNode sportageNode = CarsModel.newCarEntry("sportage", BigInteger.valueOf(20000)); + final YangInstanceIdentifier soulPath = CarsModel.newCarPath("soul"); + final MapEntryNode soulNode = CarsModel.newCarEntry("soul", BigInteger.valueOf(20000)); + writeTx.write(CarsModel.BASE_PATH, CarsModel.newCarsNode(CarsModel.newCarsMapNode(sportageNode,soulNode))); + kit.doCommit(writeTx.ready()); + + candidateCapture = ArgumentCaptor.forClass(Collection.class); + verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class)); + + assertDataTreeCandidate(findCandidate(candidateCapture, sportagePath), new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, sportagePath), ModificationType.WRITE, + Optional.of(sportageNode), Optional.absent()); + + assertDataTreeCandidate(findCandidate(candidateCapture, soulPath), new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, soulPath), ModificationType.WRITE, + Optional.of(soulNode), Optional.absent()); + + assertDataTreeCandidate(findCandidate(candidateCapture, optimaPath), new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, optimaPath), ModificationType.DELETE, + Optional.absent(), Optional.of(optimaNode)); + + // Delete the cars container - cohort should be invoked for the 2 deleted car entries. + + reset(cohort); + doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class), + any(Collection.class), any(SchemaContext.class)); + + writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.delete(CarsModel.BASE_PATH); + kit.doCommit(writeTx.ready()); + + candidateCapture = ArgumentCaptor.forClass(Collection.class); + verify(cohort).canCommit(any(Object.class), candidateCapture.capture(), any(SchemaContext.class)); + + assertDataTreeCandidate(findCandidate(candidateCapture, sportagePath), new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, sportagePath), ModificationType.DELETE, + Optional.absent(), Optional.of(sportageNode)); + + assertDataTreeCandidate(findCandidate(candidateCapture, soulPath), new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, soulPath), ModificationType.DELETE, + Optional.absent(), Optional.of(soulNode)); + + } + } + + @SuppressWarnings("rawtypes") + private static DOMDataTreeCandidate findCandidate(ArgumentCaptor candidateCapture, + YangInstanceIdentifier rootPath) { + for (Object obj: candidateCapture.getValue()) { + DOMDataTreeCandidate candidate = (DOMDataTreeCandidate)obj; + if (rootPath.equals(candidate.getRootPath().getRootIdentifier())) { + return candidate; } - }; + } + + return null; } /** - * FIXME: Weird thing is that invoking canCommit on front-end invokes also preCommit on backend - * so we can not test abort after can commit. + * FIXME: Since we invoke DOMDataTreeCommitCohort#canCommit on preCommit (as that's when we generate a + * DataTreeCandidate) and since currently preCommit is a noop in the Shard backend (it is combined with commit), + * we can't actually test abort after canCommit. */ + @SuppressWarnings("unchecked") @Test @Ignore - public void canCommitSuccessExternallyAborted() throws Exception { + public void testAbortAfterCanCommit() throws Exception { final DOMDataTreeCommitCohort cohortToAbort = mock(DOMDataTreeCommitCohort.class); final PostCanCommitStep stepToAbort = mock(PostCanCommitStep.class); - Mockito.doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class), - any(DOMDataTreeCandidate.class), any(SchemaContext.class)); - Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort(); - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = - setupDistributedDataStore("transactionIntegrationTest", "test-1")) { - dataStore.registerCommitCohort(TEST_ID, cohortToAbort); - Thread.sleep(1000); // Registration is asynchronous - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); - - dsCohort.canCommit().get(); - dsCohort.abort().get(); - Mockito.verify(stepToAbort, Mockito.times(1)).abort(); - } - } - }; + doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort(); + doReturn(PostPreCommitStep.NOOP_FUTURE).when(stepToAbort).preCommit(); + doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class), + any(Collection.class), any(SchemaContext.class)); + + IntegrationTestKit kit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = kit.setupDistributedDataStore("testAbortAfterCanCommit", + "test-1", "cars-1")) { + dataStore.registerCommitCohort(TEST_ID, cohortToAbort); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("Cohort registrations", 1, state.getCommitCohortActors().size())); + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); + + dsCohort.canCommit().get(5, TimeUnit.SECONDS); + dsCohort.preCommit().get(5, TimeUnit.SECONDS); + dsCohort.abort().get(5, TimeUnit.SECONDS); + verify(stepToAbort).abort(); + } + } + + private static void assertDataTreeCandidate(DOMDataTreeCandidate candidate, DOMDataTreeIdentifier expTreeId, + ModificationType expType, Optional> expDataAfter, + Optional> expDataBefore) { + assertNotNull("Expected candidate for path " + expTreeId.getRootIdentifier(), candidate); + assertEquals("rootPath", expTreeId, candidate.getRootPath()); + assertEquals("modificationType", expType, candidate.getRootNode().getModificationType()); + + assertEquals("dataAfter present", expDataAfter.isPresent(), candidate.getRootNode().getDataAfter().isPresent()); + if (expDataAfter.isPresent()) { + assertEquals("dataAfter", expDataAfter.get(), candidate.getRootNode().getDataAfter().get()); + } + + assertEquals("dataBefore present", expDataBefore.isPresent(), + candidate.getRootNode().getDataBefore().isPresent()); + if (expDataBefore.isPresent()) { + assertEquals("dataBefore", expDataBefore.get(), candidate.getRootNode().getDataBefore().get()); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 28c4fa38bb..b336365221 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -25,14 +25,17 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -213,6 +216,31 @@ public class IntegrationTestKit extends ShardTestKit { throw lastError; } + public static void verifyShardState(final AbstractDataStore datastore, final String shardName, + final Consumer verifier) throws Exception { + ActorContext actorContext = datastore.getActorContext(); + + Future future = actorContext.findLocalShardAsync(shardName); + ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); + + AssertionError lastError = null; + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.SECONDS) <= 5) { + OnDemandShardState shardState = (OnDemandShardState)actorContext + .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); + + try { + verifier.accept(shardState); + return; + } catch (AssertionError e) { + lastError = e; + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + throw lastError; + } + void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath, final NormalizedNode nodeToWrite) throws Exception { -- 2.36.6