Bug 1435: CDS: Added support for custom commit cohort. 02/32602/21
author Tony Tkacik <ttkacik@cisco.com>
Thu, 14 Jan 2016 17:42:33 +0000 (18:42 +0100)
committer Tony Tkacik <ttkacik@cisco.com>
Thu, 28 Apr 2016 14:50:50 +0000 (16:50 +0200)
Implemented support for user-provided commit
cohorts, which implement the DOMDataTreeCommitCohort
interface contract.

Messages are exchanged only within the JVM, so a cohort
needs to be co-located with its shard replica.

Change-Id: I04e592c0232383e70fa8944b966b1aa341730a98
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
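
For orientation, a minimal registration sketch (not part of this change). It assumes a
DOMDataTreeCommitCohortRegistry obtained from the clustered data store (after this patch both
DistributedDataStore and ConcurrentDOMDataBroker implement that interface) and takes the cohort
and the subtree path from the caller; the class and method names below are illustrative only.

import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;

public final class CohortRegistrationExample {
    private CohortRegistrationExample() {
    }

    // Registers the supplied cohort for every transaction that modifies data under the given
    // CONFIGURATION subtree. Closing the returned registration removes the cohort again.
    public static DOMDataTreeCommitCohortRegistration<DOMDataTreeCommitCohort> register(
            final DOMDataTreeCommitCohortRegistry registry, final DOMDataTreeCommitCohort cohort,
            final YangInstanceIdentifier subtree) {
        final DOMDataTreeIdentifier id =
                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, subtree);
        return registry.registerCommitCohort(id, cohort);
    }
}

Because the cohort messages are in-JVM only (see above), the registry used here must be the data
store instance that is co-located with the shard replica owning the subtree.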
15 files changed:
features/mdsal/src/main/features/features.xml
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMDataTreeCandidateTO.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index ca5593d99368a9874e43b6a6eeef0b605f5a657b..88a18f75439ec5aead02c6e62915114c6a07016c 100644 (file)
@@ -32,7 +32,7 @@
 
     <feature name='odl-mdsal-broker-local' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <feature version='${yangtools.version}'>odl-yangtools-common</feature>
-        <feature version='${mdsal.version}'>odl-mdsal-binding-runtime</feature>
+        <feature version='${mdsal.version}'>odl-mdsal-binding-dom-adapter</feature>
         <feature version='${mdsal.model.version}'>odl-mdsal-models</feature>
         <feature version='${project.version}'>odl-mdsal-common</feature>
         <feature version='${config.version}'>odl-config-startup</feature>
index d545cf49ecf1e72e51db6b188b8ca8dd5ff9670e..f2719bccc85e157878b540220c8bafaba6200dfe 100644 (file)
       <artifactId>sal-dom-broker-config</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.opendaylight.mdsal</groupId>
+      <artifactId>mdsal-dom-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-core-spi</artifactId>
index e8b19b8346cecbb4c8b9d740b434cfbee7d0be7e..02362b9371bb76c54d2168baf56109cf4ab6e1d6 100644 (file)
@@ -29,6 +29,10 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.slf4j.Logger;
@@ -42,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * @author Thomas Pantelis
  */
 @Beta
-public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker implements DOMDataTreeCommitCohortRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
     private static final String CAN_COMMIT = "CAN_COMMIT";
     private static final String PRE_COMMIT = "PRE_COMMIT";
@@ -325,4 +329,25 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
             }
         }
     }
+
+    @Override
+    public <T extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<T> registerCommitCohort(
+            DOMDataTreeIdentifier path, T cohort) {
+        DOMStore store = getTxFactories().get(toLegacy(path.getDatastoreType()));
+        if (store instanceof DOMDataTreeCommitCohortRegistry) {
+            return ((DOMDataTreeCommitCohortRegistry) store).registerCommitCohort(path, cohort);
+        }
+        throw new UnsupportedOperationException("Commit cohort is not supported for " + path);
+    }
+
+    private LogicalDatastoreType toLegacy(org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType) {
+        switch (datastoreType) {
+            case CONFIGURATION:
+                return LogicalDatastoreType.CONFIGURATION;
+            case OPERATIONAL:
+                return LogicalDatastoreType.OPERATIONAL;
+            default:
+                throw new IllegalArgumentException("Unsupported data store type: " + datastoreType);
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java
new file mode 100644 (file)
index 0000000..07ff936
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Status;
+import akka.actor.Status.Failure;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.dispatch.Recover;
+import akka.japi.Function;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ * Composite cohort, which coordinates multiple user-provided cohorts as if it were a single
+ * cohort.
+ *
+ * It tracks the current operation and the list of cohorts which successfully finished the
+ * previous phase, so that an abort needs to be invoked only on the cohorts which are still active.
+ */
+class CompositeDataTreeCohort {
+
+    private enum State {
+        /**
+         * Cohorts are idle, no messages were sent.
+         */
+        IDLE,
+        /**
+         * CanCommit message was sent to all participating cohorts.
+         */
+        CAN_COMMIT_SENT,
+        /**
+         * Successful canCommit responses were received from every participating cohort.
+         */
+        CAN_COMMIT_SUCCESSFUL,
+        /**
+         * PreCommit message was sent to all participating cohorts.
+         */
+        PRE_COMMIT_SENT,
+        /**
+         * Successful preCommit responses were received from every participating cohort.
+         */
+        PRE_COMMIT_SUCCESSFUL,
+        /**
+         * Commit message was sent to all participating cohorts.
+         */
+        COMMIT_SENT,
+        /**
+         * Successful commit responses were received from all participating cohorts.
+         */
+        COMMITED,
+        /**
+         * At least one cohort responded with an unsuccessful message.
+         */
+        FAILED,
+        /**
+         * Abort message was sent to all cohorts which previously responded with success.
+         */
+        ABORTED
+    }
+
+    protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+        @Override
+        public Failure recover(Throwable error) throws Throwable {
+            return new Failure(error);
+        }
+    };
+
+
+    private final DataTreeCohortActorRegistry registry;
+    private final String txId;
+    private final SchemaContext schema;
+    private final Timeout timeout;
+    private Iterable<Success> successfulFromPrevious;
+    private State state = State.IDLE;
+
+    CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, String txId, SchemaContext schema, Timeout timeout) {
+        this.registry = Preconditions.checkNotNull(registry);
+        this.txId = Preconditions.checkNotNull(txId);
+        this.schema = Preconditions.checkNotNull(schema);
+        this.timeout = Preconditions.checkNotNull(timeout);
+    }
+
+    void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
+        Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+        // FIXME: Optimize the empty-collection case with a pre-created future containing success.
+        Future<Iterable<Object>> canCommitsFuture =
+                Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
+                    @Override
+                    public Future<Object> apply(CanCommit input) {
+                        return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
+                                ExecutionContexts.global());
+                    }
+                }, ExecutionContexts.global());
+        changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
+        processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+    }
+
+    void preCommit() throws ExecutionException, TimeoutException {
+        Preconditions.checkState(successfulFromPrevious != null);
+        Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
+        changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
+        processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+    }
+
+    void commit() throws ExecutionException, TimeoutException {
+        Preconditions.checkState(successfulFromPrevious != null);
+        Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
+        changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
+        processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
+    }
+
+    void abort() throws TimeoutException {
+        if (successfulFromPrevious != null) {
+            sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
+        }
+    }
+
+    private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+        return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
+
+            @Override
+            public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
+                return Patterns.ask(cohortResponse.getCohort(), message, timeout);
+            }
+
+        }, ExecutionContexts.global());
+    }
+
+    private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
+            throws TimeoutException, ExecutionException {
+        final Iterable<Object> results;
+        try {
+            results = Await.result(resultsFuture, timeout.duration());
+        } catch (Exception e) {
+            successfulFromPrevious = null;
+            Throwables.propagateIfInstanceOf(e, TimeoutException.class);
+            throw Throwables.propagate(e);
+        }
+        Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
+        Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
+        successfulFromPrevious = successful;
+        if (!Iterables.isEmpty(failed)) {
+            changeStateFrom(currentState, State.FAILED);
+            Iterator<Failure> it = failed.iterator();
+            Throwable firstEx = it.next().cause();
+            while (it.hasNext()) {
+                firstEx.addSuppressed(it.next().cause());
+            }
+            Throwables.propagateIfPossible(firstEx, ExecutionException.class);
+            Throwables.propagateIfPossible(firstEx, TimeoutException.class);
+            throw Throwables.propagate(firstEx);
+        }
+        changeStateFrom(currentState, afterState);
+    }
+
+    void changeStateFrom(State expected, State followup) {
+        Preconditions.checkState(state == expected);
+        state = followup;
+    }
+}
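
For illustration, the expected call order of the composite cohort, compressed into a single method
(in this change the calls are actually split across CohortEntry.preCommit(), commit() and abort()
in ShardCommitCoordinator further below). The class and method names here are illustrative, and the
sketch assumes it lives in the same package as the package-private CompositeDataTreeCohort.

package org.opendaylight.controller.cluster.datastore;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;

final class CompositeCohortUsageSketch {
    private CompositeCohortUsageSketch() {
    }

    // Illustrative only: drive a CompositeDataTreeCohort through its phases, aborting on failure.
    static void runUserCohorts(final CompositeDataTreeCohort userCohorts,
            final DataTreeCandidateTip candidate) throws ExecutionException, TimeoutException {
        try {
            userCohorts.canCommit(candidate); // IDLE -> CAN_COMMIT_SENT -> CAN_COMMIT_SUCCESSFUL
            userCohorts.preCommit();          // -> PRE_COMMIT_SENT -> PRE_COMMIT_SUCCESSFUL
            userCohorts.commit();             // -> COMMIT_SENT -> COMMITED
        } catch (ExecutionException | TimeoutException e) {
            // Abort is only sent to the cohorts which succeeded in the previous phase.
            userCohorts.abort();
            throw e;
        }
    }
}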
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMDataTreeCandidateTO.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMDataTreeCandidateTO.java
new file mode 100644 (file)
index 0000000..7e79c3e
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+
+final class DOMDataTreeCandidateTO implements DOMDataTreeCandidate {
+
+    private final DOMDataTreeIdentifier rootPath;
+    private final DataTreeCandidateNode rootNode;
+
+    private DOMDataTreeCandidateTO(DOMDataTreeIdentifier rootPath, DataTreeCandidateNode rootNode) {
+        this.rootPath = Preconditions.checkNotNull(rootPath);
+        this.rootNode = Preconditions.checkNotNull(rootNode);
+    }
+
+    @Override
+    public DOMDataTreeIdentifier getRootPath() {
+        return rootPath;
+    }
+
+    @Override
+    public DataTreeCandidateNode getRootNode() {
+        return rootNode;
+    }
+
+    static DOMDataTreeCandidate create(DOMDataTreeIdentifier path, DataTreeCandidateNode node) {
+        return new DOMDataTreeCandidateTO(path, node);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString();
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java
new file mode 100644 (file)
index 0000000..485e8b5
--- /dev/null
@@ -0,0 +1,274 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.mdsal.common.api.PostCanCommitStep;
+import org.opendaylight.mdsal.common.api.PostPreCommitStep;
+import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy actor which acts as a facade to the user-provided commit cohort. It translates the
+ * commit protocol messages (CanCommit, PreCommit, Commit, Abort) into invocations of the
+ * cohort's three-phase commit steps and reports the outcome back to the sender.
+ */
+final class DataTreeCohortActor extends AbstractUntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
+    private final CohortBehaviour<?> idleState = new Idle();
+    private final DOMDataTreeCommitCohort cohort;
+    private CohortBehaviour<?> currentState = idleState;
+
+    private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
+        this.cohort = Preconditions.checkNotNull(cohort);
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        currentState = currentState.handle(message);
+    }
+
+
+    /**
+     * Abstract message base for messages handled by {@link DataTreeCohortActor}.
+     *
+     * @param <R> Reply message type
+     */
+    static abstract class CommitProtocolCommand<R extends CommitReply> {
+
+        private final String txId;
+
+        final String getTxId() {
+            return txId;
+        }
+
+        protected CommitProtocolCommand(String txId) {
+            this.txId = Preconditions.checkNotNull(txId);
+        }
+    }
+
+    static final class CanCommit extends CommitProtocolCommand<Success> {
+
+        private final DOMDataTreeCandidate candidate;
+        private final ActorRef cohort;
+        private final SchemaContext schema;
+
+        CanCommit(String txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
+            super(txId);
+            this.cohort = Preconditions.checkNotNull(cohort);
+            this.candidate = Preconditions.checkNotNull(candidate);
+            this.schema = Preconditions.checkNotNull(schema);
+        }
+
+        DOMDataTreeCandidate getCandidate() {
+            return candidate;
+        }
+
+        SchemaContext getSchema() {
+            return schema;
+        }
+
+        ActorRef getCohort() {
+            return cohort;
+        }
+
+    }
+
+    static abstract class CommitReply {
+
+        private final ActorRef cohortRef;
+        private final String txId;
+
+        protected CommitReply(ActorRef cohortRef, String txId) {
+            this.cohortRef = Preconditions.checkNotNull(cohortRef);
+            this.txId = Preconditions.checkNotNull(txId);
+        }
+
+        ActorRef getCohort() {
+            return cohortRef;
+        }
+
+        final String getTxId() {
+            return txId;
+        }
+
+    }
+
+    static final class Success extends CommitReply {
+
+        public Success(ActorRef cohortRef, String txId) {
+            super(cohortRef, txId);
+        }
+
+    }
+
+    static final class PreCommit extends CommitProtocolCommand<Success> {
+
+        public PreCommit(String txId) {
+            super(txId);
+        }
+    }
+
+    static final class Abort extends CommitProtocolCommand<Success> {
+
+        public Abort(String txId) {
+            super(txId);
+        }
+    }
+
+    static final class Commit extends CommitProtocolCommand<Success> {
+
+        public Commit(String txId) {
+            super(txId);
+        }
+    }
+
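+    /**
+     * Behaviour (state) of this actor. Each behaviour accepts exactly one commit protocol message
+     * type plus {@link Abort}; processing a message invokes the corresponding cohort step and
+     * yields the follow-up behaviour.
+     */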
+    private static abstract class CohortBehaviour<E> {
+
+        abstract Class<E> getHandledMessageType();
+
+        CohortBehaviour<?> handle(Object message) {
+            if (getHandledMessageType().isInstance(message)) {
+                return process(getHandledMessageType().cast(message));
+            } else if (message instanceof Abort) {
+                return abort();
+            }
+            throw new UnsupportedOperationException();
+        }
+
+        abstract CohortBehaviour<?> abort();
+
+        abstract CohortBehaviour<?> process(E message);
+
+    }
+
+    private class Idle extends CohortBehaviour<CanCommit> {
+
+        @Override
+        Class<CanCommit> getHandledMessageType() {
+            return CanCommit.class;
+        }
+
+        @Override
+        CohortBehaviour<?> process(CanCommit message) {
+            final PostCanCommitStep nextStep;
+            try {
+                nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
+            } catch (final Exception e) {
+                getSender().tell(new Status.Failure(e), getSelf());
+                return this;
+            }
+            getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
+            return new PostCanCommit(message.getTxId(), nextStep);
+        }
+
+        @Override
+        CohortBehaviour<?> abort() {
+            return this;
+        }
+
+    }
+
+
+    private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
+            extends CohortBehaviour<M> {
+
+        private final S step;
+        private final String txId;
+
+        CohortStateWithStep(String txId, S step) {
+            this.txId = Preconditions.checkNotNull(txId);
+            this.step = Preconditions.checkNotNull(step);
+        }
+
+        final S getStep() {
+            return step;
+        }
+
+        final String getTxId() {
+            return txId;
+        }
+
+        @Override
+        final CohortBehaviour<?> abort() {
+            try {
+                getStep().abort().get();
+            } catch (final Exception e) {
+                LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
+                getSender().tell(new Status.Failure(e), getSelf());
+                return idleState;
+            }
+            getSender().tell(new Success(getSelf(), txId), getSelf());
+            return idleState;
+        }
+
+    }
+
+    private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
+
+        PostCanCommit(String txId, PostCanCommitStep nextStep) {
+            super(txId, nextStep);
+        }
+
+        @Override
+        Class<PreCommit> getHandledMessageType() {
+            return PreCommit.class;
+        }
+
+        @Override
+        CohortBehaviour<?> process(PreCommit message) {
+            final PostPreCommitStep nextStep;
+            try {
+                nextStep = getStep().preCommit().get();
+            } catch (final Exception e) {
+                getSender().tell(new Status.Failure(e), getSelf());
+                return idleState;
+            }
+            getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
+            return new PostPreCommit(getTxId(), nextStep);
+        }
+
+    }
+
+    private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
+
+        PostPreCommit(String txId, PostPreCommitStep step) {
+            super(txId, step);
+        }
+
+        @Override
+        CohortBehaviour<?> process(Commit message) {
+            try {
+                getStep().commit().get();
+            } catch (final Exception e) {
+                getSender().tell(new Status.Failure(e), getSelf());
+                return idleState;
+            }
+            getSender().tell(new Success(getSelf(), getTxId()), getSelf());
+            return idleState;
+        }
+
+        @Override
+        Class<Commit> getHandledMessageType() {
+            return Commit.class;
+        }
+
+    }
+
+    static Props props(final DOMDataTreeCommitCohort cohort) {
+        return Props.create(DataTreeCohortActor.class, cohort);
+    }
+}
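
To see what the actor drives end to end, a cohort whose canCommit always succeeds can be wired
Mockito-style, mirroring the imports used by DataTreeCohortIntegrationTest further below. This is a
sketch under the assumption that DOMDataTreeCommitCohort.canCommit takes a transaction id, a
DOMDataTreeCandidate and a SchemaContext and returns a future yielding a PostCanCommitStep, exactly
as the actor above consumes it; the fragment is meant to sit inside a test method with the test's
static imports (mock, any) and Guava's Futures, and all variable names are illustrative.

    // Illustrative test-style wiring; the step object is mocked, so its concrete future type
    // does not have to be spelled out here.
    DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
    PostCanCommitStep canCommitStep = mock(PostCanCommitStep.class);
    Mockito.doReturn(Futures.immediateCheckedFuture(canCommitStep))
            .when(cohort).canCommit(any(), any(), any());
    // On a CanCommit message the actor calls cohort.canCommit(...), replies with Success to the
    // sender and then waits in the PostCanCommit behaviour for the PreCommit message.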
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
new file mode 100644 (file)
index 0000000..ec7e2ee
--- /dev/null
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
+import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree;
+import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode;
+import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeSnapshot;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry of user commit cohorts, responsible for handling their registration and for computing
+ * the set of cohorts affected by a particular {@link DataTreeCandidate}.
+ */
+@NotThreadSafe
+class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActorRegistry.class);
+
+    private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
+
+
+    void registerCohort(ActorRef sender, RegisterCohort cohort) {
+        takeLock();
+        try {
+            final ActorRef cohortRef = cohort.getCohort();
+            final RegistrationTreeNode<ActorRef> node =
+                    findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments());
+            addRegistration(node, cohort.getCohort());
+            cohortToNode.put(cohortRef, node);
+        } catch (final Exception e) {
+            sender.tell(new Status.Failure(e), ActorRef.noSender());
+            return;
+        } finally {
+            releaseLock();
+        }
+        sender.tell(new Status.Success(null), ActorRef.noSender());
+    }
+
+    void removeCommitCohort(ActorRef sender, RemoveCohort message) {
+        final ActorRef cohort = message.getCohort();
+        final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
+        if (node != null) {
+            removeRegistration(node, cohort);
+            cohortToNode.remove(cohort);
+        }
+        sender.tell(new Status.Success(null), ActorRef.noSender());
+        cohort.tell(PoisonPill.getInstance(), cohort);
+    }
+
+    Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(String txId, DataTreeCandidate candidate,
+            SchemaContext schema) {
+        try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
+            return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
+        }
+    }
+
+    void process(ActorRef sender, CohortRegistryCommand message) {
+        if (message instanceof RegisterCohort) {
+            registerCohort(sender, (RegisterCohort) message);
+        } else if (message instanceof RemoveCohort) {
+            removeCommitCohort(sender, (RemoveCohort) message);
+        }
+    }
+
+    static abstract class CohortRegistryCommand {
+
+        private final ActorRef cohort;
+
+        CohortRegistryCommand(ActorRef cohort) {
+            this.cohort = Preconditions.checkNotNull(cohort);
+        }
+
+        ActorRef getCohort() {
+            return cohort;
+        }
+    }
+
+    static class RegisterCohort extends CohortRegistryCommand {
+
+        private final DOMDataTreeIdentifier path;
+
+        RegisterCohort(DOMDataTreeIdentifier path, ActorRef cohort) {
+            super(cohort);
+            this.path = path;
+
+        }
+
+        public DOMDataTreeIdentifier getPath() {
+            return path;
+        }
+
+    }
+
+    static class RemoveCohort extends CohortRegistryCommand {
+
+        RemoveCohort(ActorRef cohort) {
+            super(cohort);
+        }
+
+    }
+
+    private static class CanCommitMessageBuilder {
+
+        private final String txId;
+        private final DataTreeCandidate candidate;
+        private final SchemaContext schema;
+        private final Collection<DataTreeCohortActor.CanCommit> messages =
+                new ArrayList<>();
+
+        CanCommitMessageBuilder(String txId, DataTreeCandidate candidate, SchemaContext schema) {
+            this.txId = Preconditions.checkNotNull(txId);
+            this.candidate = Preconditions.checkNotNull(candidate);
+            this.schema = schema;
+        }
+
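+        // Walk the registration tree along the candidate's root path first; once the whole path
+        // has been consumed, descend into the candidate itself, matching modified nodes against
+        // registered subtrees.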
+        private void lookupAndCreateCanCommits(List<PathArgument> args, int offset,
+                RegistrationTreeNode<ActorRef> node) {
+
+            if (args.size() != offset) {
+                final PathArgument arg = args.get(offset);
+                final RegistrationTreeNode<ActorRef> exactChild = node.getExactChild(arg);
+                if (exactChild != null) {
+                    lookupAndCreateCanCommits(args, offset + 1, exactChild);
+                }
+                for (final RegistrationTreeNode<ActorRef> c : node.getInexactChildren(arg)) {
+                    lookupAndCreateCanCommits(args, offset + 1, c);
+                }
+            } else {
+                lookupAndCreateCanCommits(candidate.getRootPath(), node, candidate.getRootNode());
+            }
+        }
+
+        private void lookupAndCreateCanCommits(YangInstanceIdentifier path, RegistrationTreeNode<ActorRef> regNode,
+                DataTreeCandidateNode candNode) {
+            if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
+                LOG.debug("Skipping unmodified candidate {}", path);
+                return;
+            }
+            final Collection<ActorRef> regs = regNode.getRegistrations();
+            if (!regs.isEmpty()) {
+                createCanCommits(regs, path, candNode);
+            }
+
+            for (final DataTreeCandidateNode candChild : candNode.getChildNodes()) {
+                if (candChild.getModificationType() != ModificationType.UNMODIFIED) {
+                    final RegistrationTreeNode<ActorRef> regChild =
+                            regNode.getExactChild(candChild.getIdentifier());
+                    if (regChild != null) {
+                        lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), regChild, candChild);
+                    }
+
+                    for (final RegistrationTreeNode<ActorRef> rc : regNode
+                            .getInexactChildren(candChild.getIdentifier())) {
+                        lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), rc, candChild);
+                    }
+                }
+            }
+        }
+
+        private void createCanCommits(Collection<ActorRef> regs, YangInstanceIdentifier path,
+                DataTreeCandidateNode node) {
+            final DOMDataTreeCandidate candidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
+            for (final ActorRef reg : regs) {
+                final CanCommit message = new DataTreeCohortActor.CanCommit(txId, candidate, schema, reg);
+                messages.add(message);
+            }
+        }
+
+        private static DOMDataTreeIdentifier treeIdentifier(YangInstanceIdentifier path) {
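+            // FIXME: the datastore type is hard-coded to CONFIGURATION, regardless of which
+            // datastore the cohort was registered against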
+            return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
+        }
+
+        private Collection<DataTreeCohortActor.CanCommit> perform(RegistrationTreeNode<ActorRef> rootNode) {
+            final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
+            lookupAndCreateCanCommits(toLookup, 0, rootNode);
+            return messages;
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java
new file mode 100644 (file)
index 0000000..c269312
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort> extends AbstractObjectRegistration<C>
+        implements DOMDataTreeCommitCohortRegistration<C> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortRegistrationProxy.class);
+    private static final Timeout TIMEOUT = new Timeout(new FiniteDuration(5, TimeUnit.SECONDS));
+    private final DOMDataTreeIdentifier subtree;
+    private final ActorRef actor;
+    private final ActorContext actorContext;
+    @GuardedBy("this")
+    private ActorRef cohortRegistry;
+
+
+    DataTreeCohortRegistrationProxy(ActorContext actorContext, DOMDataTreeIdentifier subtree, C cohort) {
+        super(cohort);
+        this.subtree = Preconditions.checkNotNull(subtree);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.actor = actorContext.getActorSystem().actorOf(
+                DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+    }
+
+
+    public void init(String shardName) {
+        // FIXME: Add late binding to shard.
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorRef shard) {
+                if (failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - commit cohort {} at path {} "
+                            + "cannot be registered", shardName, getInstance(), subtree);
+                } else if (failure != null) {
+                    LOG.error("Failed to find local shard {} - commit cohort {} at path {} "
+                            + "cannot be registered: {}", shardName, getInstance(), subtree, failure);
+                } else {
+                    performRegistration(shard);
+                }
+            }
+        }, actorContext.getClientDispatcher());
+    }
+
+    private synchronized void performRegistration(ActorRef shard) {
+        if (isClosed()) {
+            return;
+        }
+        cohortRegistry = shard;
+        Future<Object> future =
+                Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT);
+        future.onComplete(new OnComplete<Object>() {
+
+            @Override
+            public void onComplete(Throwable e, Object val) throws Throwable {
+                if (e != null) {
+                    LOG.error("Unable to register {} as commit cohort", getInstance(), e);
+                }
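+                // The proxy may have been closed while the registration was in flight; if so,
+                // undo the registration right away.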
+                if (isClosed()) {
+                    removeRegistration();
+                }
+            }
+
+        }, actorContext.getClientDispatcher());
+    }
+
+    @Override
+    protected synchronized void removeRegistration() {
+        if (cohortRegistry != null) {
+            cohortRegistry.tell(new DataTreeCohortActorRegistry.RemoveCohort(actor), ActorRef.noSender());
+        }
+    }
+}
index 579c89747ab9847192c836a0c2cd383d5f98e391..ea7330ae23316a36637081142480a8acc781ba0c 100644 (file)
@@ -32,6 +32,10 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -44,7 +48,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener,
-        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
+        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     private static final String UNKNOWN_TYPE = "unknown";
@@ -156,6 +160,23 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         return listenerRegistrationProxy;
     }
 
+
+    @Override
+    public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
+            DOMDataTreeIdentifier subtree, C cohort) {
+        YangInstanceIdentifier treeId =
+                Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
+        Preconditions.checkNotNull(cohort, "cohort should not be null");
+
+
+        final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+        LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
+
+        DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<C>(actorContext, subtree, cohort);
+        cohortProxy.init(shardName);
+        return cohortProxy;
+    }
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return txContextFactory.createTransactionChain();
index 9cb015cfaf35796bdf98692d8d98b9b67dd86a13..23ad747d4cf3d4c7247271c09f5c64fa9fc44c2e 100644 (file)
@@ -82,6 +82,7 @@ import scala.concurrent.duration.FiniteDuration;
  * </p>
  */
 public class Shard extends RaftActor {
+
     @VisibleForTesting
     static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
         @Override
@@ -269,6 +270,9 @@ public class Shard extends RaftActor {
                 context().parent().forward(message, context());
             } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
                 messageRetrySupport.onTimerMessage(message);
+            } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+                commitCoordinator.processCohortRegistryCommand(getSender(),
+                        (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
             } else {
                 super.handleNonRaftCommand(message);
             }
@@ -334,9 +338,9 @@ public class Shard extends RaftActor {
 
     private void handleCommitTransaction(final CommitTransaction commit) {
         if (isLeader()) {
-            if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-                shardMBean.incrementFailedTransactionsCount();
-            }
+        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+            shardMBean.incrementFailedTransactionsCount();
+        }
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -354,7 +358,7 @@ public class Shard extends RaftActor {
 
         try {
             try {
-                cohortEntry.commit();
+            cohortEntry.commit();
             } catch(ExecutionException e) {
                 // We may get a "store tree and candidate base differ" IllegalStateException from commit under
                 // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@ -432,7 +436,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
         if (isLeader()) {
-            commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -447,7 +451,7 @@ public class Shard extends RaftActor {
 
     protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
         try {
-            commitCoordinator.handleBatchedModifications(batched, sender, this);
+            commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
         } catch (Exception e) {
             LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                     batched.getTransactionID(), e);
@@ -516,7 +520,7 @@ public class Shard extends RaftActor {
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
             try {
-                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
             } catch (Exception e) {
                 LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionID(), e);
@@ -540,7 +544,8 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
-            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+                    store.getSchemaContext());
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
index 739321b06876ca6331bd785acb6074944ac86daf..45fa7727e4f125e11d73658c699ecbc779715ceb 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -24,6 +25,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -39,7 +42,9 @@ import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModifi
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -59,6 +64,8 @@ class ShardCommitCoordinator {
 
     private final ShardDataTree dataTree;
 
+    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+
     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
     // since this should only be accessed on the shard's dispatcher.
     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
@@ -78,8 +85,11 @@ class ShardCommitCoordinator {
 
     private Runnable runOnPendingTransactionsComplete;
 
-    ShardCommitCoordinator(ShardDataTree dataTree,
-            long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
+
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+    ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
+            String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
@@ -119,7 +129,7 @@ class ShardCommitCoordinator {
         } else {
             cohortCache.remove(cohortEntry.getTransactionID());
 
-            RuntimeException ex = new RuntimeException(
+            final RuntimeException ex = new RuntimeException(
                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
                                   " capacity %d has been reached.",
                                   name, cohortEntry.getTransactionID(), queueCapacity));
@@ -136,13 +146,15 @@ class ShardCommitCoordinator {
      * @param ready the ForwardedReadyTransaction message to process
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
+     * @param schema the current SchemaContext, passed on to user commit cohorts
      */
-    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
+            SchemaContext schema) {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
-        ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -171,12 +183,12 @@ class ShardCommitCoordinator {
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
      */
-    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
+    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
-                    dataTree.newReadWriteTransaction(batched.getTransactionID(),
-                        batched.getTransactionChainID()), batched.getVersion());
+                    dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+                    cohortRegistry, schema, batched.getVersion());
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
 
@@ -225,16 +237,18 @@ class ShardCommitCoordinator {
 
     /**
      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
-     * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+     * been prepared beforehand by the sender and we just need to drive them through into the
+     * dataTree.
      *
      * @param message the ReadyLocalTransaction message to process
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
      */
-    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
+            SchemaContext schema) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
-        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
                 DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
@@ -503,6 +517,7 @@ class ShardCommitCoordinator {
 
     private List<CohortEntry> getAndClearPendingCohortEntries() {
         List<CohortEntry> cohortEntries = new ArrayList<>();
+
         if(currentCohortEntry != null) {
             cohortEntries.add(currentCohortEntry);
             cohortCache.remove(currentCohortEntry.getTransactionID());
@@ -538,7 +553,7 @@ class ShardCommitCoordinator {
                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
                                 cohortEntry.getClientVersion(), ""));
-                    }
+        }
 
                     return newModifications.getLast();
                 }
@@ -621,9 +636,9 @@ class ShardCommitCoordinator {
     private void maybeProcessNextCohortEntry() {
         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
         // clean out expired entries.
-        Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+        final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
         while(iter.hasNext()) {
-            CohortEntry next = iter.next();
+            final CohortEntry next = iter.next();
             if(next.isReadyToCommit()) {
                 if(currentCohortEntry == null) {
                     if(log.isDebugEnabled()) {
@@ -674,6 +689,10 @@ class ShardCommitCoordinator {
         this.cohortDecorator = cohortDecorator;
     }
 
+    void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
+        cohortRegistry.process(sender, message);
+    }
+
     static class CohortEntry {
         enum State {
             PENDING,
@@ -681,7 +700,7 @@ class ShardCommitCoordinator {
             PRE_COMMITTED,
             COMMITTED,
             ABORTED
-        }
+    }
 
         private final String transactionID;
         private ShardDataTreeCohort cohort;
@@ -694,18 +713,23 @@ class ShardCommitCoordinator {
         private int totalBatchedModificationsReceived;
         private State state = State.PENDING;
         private final short clientVersion;
+        private final CompositeDataTreeCohort userCohorts;
 
-        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
+        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
+                DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
             this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
             this.clientVersion = clientVersion;
+            this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
         }
 
-        CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+                SchemaContext schema, short clientVersion) {
             this.transactionID = transactionID;
             this.cohort = cohort;
             this.transaction = null;
             this.clientVersion = clientVersion;
+            this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
         }
 
         void updateLastAccessTime() {
@@ -770,19 +794,25 @@ class ShardCommitCoordinator {
             return cohort.canCommit().get();
         }
 
-        void preCommit() throws InterruptedException, ExecutionException {
+        void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
             state = State.PRE_COMMITTED;
             cohort.preCommit().get();
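+            // The user cohorts run their full canCommit and preCommit steps nested inside the
+            // shard cohort's preCommit phase.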
+            userCohorts.canCommit(cohort.getCandidate());
+            userCohorts.preCommit();
         }
 
-        void commit() throws InterruptedException, ExecutionException {
+        void commit() throws InterruptedException, ExecutionException, TimeoutException {
             state = State.COMMITTED;
             cohort.commit().get();
+            userCohorts.commit();
         }
 
-        void abort() throws InterruptedException, ExecutionException {
+        void abort() throws InterruptedException, ExecutionException, TimeoutException {
             state = State.ABORTED;
             cohort.abort().get();
+            userCohorts.abort();
         }
 
         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
@@ -837,7 +867,7 @@ class ShardCommitCoordinator {
 
         @Override
         public String toString() {
-            StringBuilder builder = new StringBuilder();
+            final StringBuilder builder = new StringBuilder();
             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
                     .append(doImmediateCommit).append("]");
             return builder.toString();
index ad5aab345395c0d655a48c3fd59aeeab68db9669..618ea90e265c33b13c3df266b6ab2f38bb43c52b 100644 (file)
@@ -214,6 +214,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return dataTree.takeSnapshot().newModification();
     }
 
+    // FIXME: This should be removed, as it violates encapsulation
     public DataTreeCandidate commit(DataTreeModification modification) throws DataValidationFailedException {
         modification.ready();
         dataTree.validate(modification);
index f7cdd4e8dc3e8c13730e668627e03782703edffc..47876123cfa7b716edae5b72e74ee50c7faacd12 100644 (file)
@@ -17,9 +17,12 @@ public abstract class ShardDataTreeCohort {
         // Prevent foreign instantiation
     }
 
+    // FIXME: This leaks internal state generated in preCommit;
+    // it should be the result of canCommit
     abstract DataTreeCandidateTip getCandidate();
     abstract DataTreeModification getDataTreeModification();
 
+    // FIXME: Should return rebased DataTreeCandidateTip
     @VisibleForTesting
     public abstract ListenableFuture<Boolean> canCommit();
     @VisibleForTesting
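
For reference, a user-supplied commit cohort implements the mdsal DOMDataTreeCommitCohort contract that the new CompositeDataTreeCohort drives from preCommit above. A minimal sketch follows (illustration only, not part of this patch); the class name NoopCommitCohort is made up, and the three-argument canCommit signature and its return type are assumed from the mocks and constants in the integration test below:

    import com.google.common.util.concurrent.CheckedFuture;
    import org.opendaylight.mdsal.common.api.DataValidationFailedException;
    import org.opendaylight.mdsal.common.api.PostCanCommitStep;
    import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
    import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
    import org.opendaylight.yangtools.yang.model.api.SchemaContext;

    // Illustrative no-op cohort: accepts every change under the registered subtree.
    final class NoopCommitCohort implements DOMDataTreeCommitCohort {
        @Override
        public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(
                final Object txId, final DOMDataTreeCandidate candidate, final SchemaContext ctx) {
            // A real cohort would inspect candidate.getRootNode() here and return a
            // failed future to veto the transaction.
            return PostCanCommitStep.NOOP_SUCCESS_FUTURE;
        }
    }

Registration is a single call against the datastore, as registerNoopCohortTest below does with dataStore.registerCommitCohort(TEST_ID, cohort); closing the returned registration stops further canCommit callbacks.
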
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java
new file mode 100644 (file)
index 0000000..84521e5
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.common.api.DataValidationFailedException;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.PostCanCommitStep;
+import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+
+public class DataTreeCohortIntegrationTest {
+
+    private static final DataValidationFailedException FAILED_CAN_COMMIT =
+            new DataValidationFailedException(YangInstanceIdentifier.class, TestModel.TEST_PATH, "Test failure.");
+    private static final CheckedFuture<PostCanCommitStep, DataValidationFailedException> FAILED_CAN_COMMIT_FUTURE =
+            Futures.immediateFailedCheckedFuture(FAILED_CAN_COMMIT);
+
+    private static final DOMDataTreeIdentifier TEST_ID =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+    private static ActorSystem system;
+
+    private final DatastoreContext.Builder datastoreContextBuilder =
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        final Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+        Cluster.get(system).join(member1Address);
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws IOException {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    protected ActorSystem getSystem() {
+        return system;
+    }
+
+    @Test
+    public void registerNoopCohortTest() throws Exception {
+        final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
+        Mockito.doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+                any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+        ArgumentCaptor<DOMDataTreeCandidate> candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class);
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+            {
+                final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1");
+                final ObjectRegistration<DOMDataTreeCommitCohort> cohortReg = dataStore.registerCommitCohort(TEST_ID, cohort);
+                Thread.sleep(1000); // Registration is asynchronous
+                assertNotNull(cohortReg);
+                testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+                Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(), any(SchemaContext.class));
+                DOMDataTreeCandidate candidate = candidateCapt.getValue();
+                assertNotNull(candidate);
+                assertEquals(TEST_ID, candidate.getRootPath());
+                testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+                        ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+                Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class), any(DOMDataTreeCandidate.class),
+                        any(SchemaContext.class));
+                cohortReg.close();
+                testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+                Mockito.verifyNoMoreInteractions(cohort);
+                cleanup(dataStore);
+            }
+        };
+    }
+
+    @Test
+    public void failCanCommitTest() throws Exception {
+        final DOMDataTreeCommitCohort failedCohort = mock(DOMDataTreeCommitCohort.class);
+
+        Mockito.doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class),
+                any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+            {
+                final DistributedDataStore dataStore =
+                        setupDistributedDataStore("transactionIntegrationTest", "test-1");
+                dataStore.registerCommitCohort(TEST_ID, failedCohort);
+                Thread.sleep(1000); // Registration is asynchronous
+
+                DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+                writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+                DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+                try {
+                    // FIXME: Invoking canCommit on the front-end currently also triggers
+                    // preCommit on the back-end.
+                    dsCohort.canCommit().get();
+                    fail("Exception should be raised.");
+                } catch (Exception e) {
+                    assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e));
+                }
+                cleanup(dataStore);
+            }
+        };
+    }
+
+    /**
+     *
+     * FIXME: Invoking canCommit on the front-end currently also triggers preCommit on the
+     * back-end, so abort after a successful canCommit cannot be tested yet.
+     *
+     */
+    @Test
+    @Ignore
+    public void canCommitSuccessExternallyAborted() throws Exception {
+        final DOMDataTreeCommitCohort cohortToAbort = mock(DOMDataTreeCommitCohort.class);
+        final PostCanCommitStep stepToAbort = mock(PostCanCommitStep.class);
+        Mockito.doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class),
+                any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+        Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+            {
+                final DistributedDataStore dataStore =
+                        setupDistributedDataStore("transactionIntegrationTest", "test-1");
+                dataStore.registerCommitCohort(TEST_ID, cohortToAbort);
+                Thread.sleep(1000); // Registration is asynchronous
+
+                DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+                writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+                DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+
+                dsCohort.canCommit().get();
+                dsCohort.abort().get();
+                Mockito.verify(stepToAbort, Mockito.times(1)).abort();
+                cleanup(dataStore);
+            }
+        };
+    }
+}
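
The failure path in failCanCommitTest above hinges on canCommit returning a failed CheckedFuture, whose DataValidationFailedException then surfaces as the root cause of the front-end canCommit failure. A validating cohort built on the same types might veto a write as in this sketch (illustration only, not part of this patch; the class name RejectingCohort and the message text are made up):

    import com.google.common.util.concurrent.CheckedFuture;
    import com.google.common.util.concurrent.Futures;
    import org.opendaylight.mdsal.common.api.DataValidationFailedException;
    import org.opendaylight.mdsal.common.api.PostCanCommitStep;
    import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
    import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
    import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
    import org.opendaylight.yangtools.yang.model.api.SchemaContext;

    // Illustrative cohort that rejects every change under the registered subtree.
    final class RejectingCohort implements DOMDataTreeCommitCohort {
        @Override
        public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(
                final Object txId, final DOMDataTreeCandidate candidate, final SchemaContext ctx) {
            // The failed future is what failCanCommitTest observes via Throwables.getRootCause().
            return Futures.immediateFailedCheckedFuture(new DataValidationFailedException(
                    YangInstanceIdentifier.class, candidate.getRootPath().getRootIdentifier(),
                    "Rejected by policy"));
        }
    }

Note that because the current front-end canCommit also runs the back-end preCommit (the FIXME noted in the tests), such a rejection is observed from the front-end canCommit call rather than from a later phase.
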
index 3e2313dc2780eab81c8a6c355751f8def2589905..2846397160abbf11f78d69a6eec7d5577624c0b0 100644 (file)
@@ -19,6 +19,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
@@ -1552,6 +1553,7 @@ public class ShardTest extends AbstractShardTest {
             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
             doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
             doReturn(candidate).when(cohort).getCandidate();
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
@@ -1597,6 +1599,7 @@ public class ShardTest extends AbstractShardTest {
             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
             doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
             doReturn(candidate).when(cohort).getCandidate();
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());