<feature name='odl-mdsal-broker-local' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
<feature version='${yangtools.version}'>odl-yangtools-common</feature>
- <feature version='${mdsal.version}'>odl-mdsal-binding-runtime</feature>
+ <feature version='${mdsal.version}'>odl-mdsal-binding-dom-adapter</feature>
<feature version='${mdsal.model.version}'>odl-mdsal-models</feature>
<feature version='${project.version}'>odl-mdsal-common</feature>
<feature version='${config.version}'>odl-config-startup</feature>
<artifactId>sal-dom-broker-config</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-dom-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-core-spi</artifactId>
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
* @author Thomas Pantelis
*/
@Beta
-public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker implements DOMDataTreeCommitCohortRegistry {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
}
}
}
+
+ @Override
+ public <T extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<T> registerCommitCohort(
+ DOMDataTreeIdentifier path, T cohort) {
+ DOMStore store = getTxFactories().get(toLegacy(path.getDatastoreType()));
+ if (store instanceof DOMDataTreeCommitCohortRegistry) {
+ return ((DOMDataTreeCommitCohortRegistry) store).registerCommitCohort(path, cohort);
+ }
+ throw new UnsupportedOperationException("Commit cohort is not supported for " + path);
+ }
+
+ private LogicalDatastoreType toLegacy(org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType) {
+ switch (datastoreType) {
+ case CONFIGURATION:
+ return LogicalDatastoreType.CONFIGURATION;
+ case OPERATIONAL:
+ return LogicalDatastoreType.OPERATIONAL;
+ default:
+ throw new IllegalArgumentException("Unsupported data store type: " + datastoreType);
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Status;
+import akka.actor.Status.Failure;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.dispatch.Recover;
+import akka.japi.Function;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ *
+ * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
+ *
+ * It tracks current operation and list of cohorts which successfuly finished previous phase in
+ * case, if abort is necessary to invoke it only on cohort steps which are still active.
+ *
+ */
+class CompositeDataTreeCohort {
+
+ private enum State {
+ /**
+ * Cohorts are idle, no messages were sent.
+ */
+ IDLE,
+ /**
+ * CanCommit message was sent to all participating cohorts.
+ */
+ CAN_COMMIT_SENT,
+ /**
+ * Successful canCommit responses were received from every participating cohort.
+ */
+ CAN_COMMIT_SUCCESSFUL,
+ /**
+ * PreCommit message was sent to all participating cohorts.
+ */
+ PRE_COMMIT_SENT,
+ /**
+ * Successful preCommit responses were received from every participating cohort.
+ */
+ PRE_COMMIT_SUCCESSFUL,
+ /**
+ * Commit message was send to all participating cohorts.
+ */
+ COMMIT_SENT,
+ /**
+ * Successful commit responses were received from all participating cohorts.
+ */
+ COMMITED,
+ /**
+ * Some of cohorts responsed back with unsuccessful message.
+ *
+ */
+ FAILED,
+ /**
+ *
+ * Abort message was send to all cohorts which responded with success previously.
+ *
+ */
+ ABORTED
+ }
+
+ protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+ @Override
+ public Failure recover(Throwable error) throws Throwable {
+ return new Failure(error);
+ }
+ };
+
+
+ private final DataTreeCohortActorRegistry registry;
+ private final String txId;
+ private final SchemaContext schema;
+ private final Timeout timeout;
+ private Iterable<Success> successfulFromPrevious;
+ private State state = State.IDLE;
+
+ CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, String txId, SchemaContext schema, Timeout timeout) {
+ this.registry = Preconditions.checkNotNull(registry);
+ this.txId = Preconditions.checkNotNull(txId);
+ this.schema = Preconditions.checkNotNull(schema);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
+ void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
+ Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+ // FIXME: Optimize empty collection list with pre-created futures, containing success.
+ Future<Iterable<Object>> canCommitsFuture =
+ Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
+ @Override
+ public Future<Object> apply(CanCommit input) {
+ return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
+ ExecutionContexts.global());
+ }
+ }, ExecutionContexts.global());
+ changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
+ processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+ }
+
+ void preCommit() throws ExecutionException, TimeoutException {
+ Preconditions.checkState(successfulFromPrevious != null);
+ Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
+ changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
+ processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+ }
+
+ void commit() throws ExecutionException, TimeoutException {
+ Preconditions.checkState(successfulFromPrevious != null);
+ Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
+ changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
+ processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
+ }
+
+ void abort() throws TimeoutException {
+ if (successfulFromPrevious != null) {
+ sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
+ }
+ }
+
+ private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+ return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
+
+ @Override
+ public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
+ return Patterns.ask(cohortResponse.getCohort(), message, timeout);
+ }
+
+ }, ExecutionContexts.global());
+ }
+
+ private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
+ throws TimeoutException, ExecutionException {
+ final Iterable<Object> results;
+ try {
+ results = Await.result(resultsFuture, timeout.duration());
+ } catch (Exception e) {
+ successfulFromPrevious = null;
+ Throwables.propagateIfInstanceOf(e, TimeoutException.class);
+ throw Throwables.propagate(e);
+ }
+ Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
+ Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
+ successfulFromPrevious = successful;
+ if (!Iterables.isEmpty(failed)) {
+ changeStateFrom(currentState, State.FAILED);
+ Iterator<Failure> it = failed.iterator();
+ Throwable firstEx = it.next().cause();
+ while (it.hasNext()) {
+ firstEx.addSuppressed(it.next().cause());
+ }
+ Throwables.propagateIfPossible(firstEx, ExecutionException.class);
+ Throwables.propagateIfPossible(firstEx, TimeoutException.class);
+ throw Throwables.propagate(firstEx);
+ }
+ changeStateFrom(currentState, afterState);
+ }
+
+ void changeStateFrom(State expected, State followup) {
+ Preconditions.checkState(state == expected);
+ state = followup;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+
+final class DOMDataTreeCandidateTO implements DOMDataTreeCandidate {
+
+ private final DOMDataTreeIdentifier rootPath;
+ private final DataTreeCandidateNode rootNode;
+
+ private DOMDataTreeCandidateTO(DOMDataTreeIdentifier rootPath, DataTreeCandidateNode rootNode) {
+ this.rootPath = Preconditions.checkNotNull(rootPath);
+ this.rootNode = Preconditions.checkNotNull(rootNode);
+ }
+
+ @Override
+ public DOMDataTreeIdentifier getRootPath() {
+ return rootPath;
+ }
+
+ @Override
+ public DataTreeCandidateNode getRootNode() {
+ return rootNode;
+ }
+
+ static DOMDataTreeCandidate create(DOMDataTreeIdentifier path, DataTreeCandidateNode node) {
+ return new DOMDataTreeCandidateTO(path, node);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString();
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.mdsal.common.api.PostCanCommitStep;
+import org.opendaylight.mdsal.common.api.PostPreCommitStep;
+import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
+ * decapsulating DataTreeChanged messages and dispatching their context to the user.
+ */
+final class DataTreeCohortActor extends AbstractUntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
+ private final CohortBehaviour<?> idleState = new Idle();
+ private final DOMDataTreeCommitCohort cohort;
+ private CohortBehaviour<?> currentState = idleState;
+
+ private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
+ this.cohort = Preconditions.checkNotNull(cohort);
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ currentState = currentState.handle(message);
+ }
+
+
+ /**
+ * Abstract message base for messages handled by {@link DataTreeCohortActor}.
+ *
+ * @param <R> Reply message type
+ */
+ static abstract class CommitProtocolCommand<R extends CommitReply> {
+
+ private final String txId;
+
+ final String getTxId() {
+ return txId;
+ }
+
+ protected CommitProtocolCommand(String txId) {
+ this.txId = Preconditions.checkNotNull(txId);
+ }
+ }
+
+ static final class CanCommit extends CommitProtocolCommand<Success> {
+
+ private final DOMDataTreeCandidate candidate;
+ private final ActorRef cohort;
+ private final SchemaContext schema;
+
+ CanCommit(String txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
+ super(txId);
+ this.cohort = Preconditions.checkNotNull(cohort);
+ this.candidate = Preconditions.checkNotNull(candidate);
+ this.schema = Preconditions.checkNotNull(schema);
+ }
+
+ DOMDataTreeCandidate getCandidate() {
+ return candidate;
+ }
+
+ SchemaContext getSchema() {
+ return schema;
+ }
+
+ ActorRef getCohort() {
+ return cohort;
+ }
+
+ }
+
+ static abstract class CommitReply {
+
+ private final ActorRef cohortRef;
+ private final String txId;
+
+ protected CommitReply(ActorRef cohortRef, String txId) {
+ this.cohortRef = Preconditions.checkNotNull(cohortRef);
+ this.txId = Preconditions.checkNotNull(txId);
+ }
+
+ ActorRef getCohort() {
+ return cohortRef;
+ }
+
+ final String getTxId() {
+ return txId;
+ }
+
+ }
+
+ static final class Success extends CommitReply {
+
+ public Success(ActorRef cohortRef, String txId) {
+ super(cohortRef, txId);
+ }
+
+ }
+
+ static final class PreCommit extends CommitProtocolCommand<Success> {
+
+ public PreCommit(String txId) {
+ super(txId);
+ }
+ }
+
+ static final class Abort extends CommitProtocolCommand<Success> {
+
+ public Abort(String txId) {
+ super(txId);
+ }
+ }
+
+ static final class Commit extends CommitProtocolCommand<Success> {
+
+ public Commit(String txId) {
+ super(txId);
+ }
+ }
+
+ private static abstract class CohortBehaviour<E> {
+
+ abstract Class<E> getHandledMessageType();
+
+ CohortBehaviour<?> handle(Object message) {
+ if (getHandledMessageType().isInstance(message)) {
+ return process(getHandledMessageType().cast(message));
+ } else if (message instanceof Abort) {
+ return abort();
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ abstract CohortBehaviour<?> abort();
+
+ abstract CohortBehaviour<?> process(E message);
+
+ }
+
+ private class Idle extends CohortBehaviour<CanCommit> {
+
+ @Override
+ Class<CanCommit> getHandledMessageType() {
+ return CanCommit.class;
+ }
+
+ @Override
+ CohortBehaviour<?> process(CanCommit message) {
+ final PostCanCommitStep nextStep;
+ try {
+ nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
+ } catch (final Exception e) {
+ getSender().tell(new Status.Failure(e), getSelf());
+ return this;
+ }
+ getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
+ return new PostCanCommit(message.getTxId(), nextStep);
+ }
+
+ @Override
+ CohortBehaviour<?> abort() {
+ return this;
+ }
+
+ }
+
+
+ private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
+ extends CohortBehaviour<M> {
+
+ private final S step;
+ private final String txId;
+
+ CohortStateWithStep(String txId, S step) {
+ this.txId = Preconditions.checkNotNull(txId);
+ this.step = Preconditions.checkNotNull(step);
+ }
+
+ final S getStep() {
+ return step;
+ }
+
+ final String getTxId() {
+ return txId;
+ }
+
+ @Override
+ final CohortBehaviour<?> abort() {
+ try {
+ getStep().abort().get();
+ } catch (final Exception e) {
+ LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
+ getSender().tell(new Status.Failure(e), getSelf());
+ return idleState;
+ }
+ getSender().tell(new Success(getSelf(), txId), getSelf());
+ return idleState;
+ }
+
+ }
+
+ private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
+
+ PostCanCommit(String txId, PostCanCommitStep nextStep) {
+ super(txId, nextStep);
+ }
+
+ @Override
+ Class<PreCommit> getHandledMessageType() {
+ return PreCommit.class;
+ }
+
+ @Override
+ CohortBehaviour<?> process(PreCommit message) {
+ final PostPreCommitStep nextStep;
+ try {
+ nextStep = getStep().preCommit().get();
+ } catch (final Exception e) {
+ getSender().tell(new Status.Failure(e), getSelf());
+ return idleState;
+ }
+ getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
+ return new PostPreCommit(getTxId(), nextStep);
+ }
+
+ }
+
+ private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
+
+ PostPreCommit(String txId, PostPreCommitStep step) {
+ super(txId, step);
+ }
+
+ @Override
+ CohortBehaviour<?> process(Commit message) {
+ try {
+ getStep().commit().get();
+ } catch (final Exception e) {
+ getSender().tell(new Status.Failure(e), getSender());
+ return idleState;
+ }
+ getSender().tell(new Success(getSelf(), getTxId()), getSelf());
+ return idleState;
+ }
+
+ @Override
+ Class<Commit> getHandledMessageType() {
+ return Commit.class;
+ }
+
+ }
+
+ static Props props(final DOMDataTreeCommitCohort cohort) {
+ return Props.create(DataTreeCohortActor.class, cohort);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
+import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree;
+import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode;
+import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeSnapshot;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry of user commit cohorts, which is responsible for handling registration and calculation
+ * of affected cohorts based on {@link DataTreeCandidate}.
+ *
+ */
+@NotThreadSafe
+class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActorRegistry.class);
+
+ private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
+
+
+ void registerCohort(ActorRef sender, RegisterCohort cohort) {
+ takeLock();
+ try {
+ final ActorRef cohortRef = cohort.getCohort();
+ final RegistrationTreeNode<ActorRef> node =
+ findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments());
+ addRegistration(node, cohort.getCohort());
+ cohortToNode.put(cohortRef, node);
+ } catch (final Exception e) {
+ sender.tell(new Status.Failure(e), ActorRef.noSender());
+ return;
+ } finally {
+ releaseLock();
+ }
+ sender.tell(new Status.Success(null), ActorRef.noSender());
+ }
+
+ void removeCommitCohort(ActorRef sender, RemoveCohort message) {
+ final ActorRef cohort = message.getCohort();
+ final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
+ if (node != null) {
+ removeRegistration(node, cohort);
+ cohortToNode.remove(cohort);
+ }
+ sender.tell(new Status.Success(null), ActorRef.noSender());
+ cohort.tell(PoisonPill.getInstance(), cohort);
+ }
+
+ Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(String txId, DataTreeCandidate candidate,
+ SchemaContext schema) {
+ try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
+ return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
+ }
+ }
+
+ void process(ActorRef sender, CohortRegistryCommand message) {
+ if (message instanceof RegisterCohort) {
+ registerCohort(sender, (RegisterCohort) message);
+ } else if (message instanceof RemoveCohort) {
+ removeCommitCohort(sender, (RemoveCohort) message);
+ }
+ }
+
+ static abstract class CohortRegistryCommand {
+
+ private final ActorRef cohort;
+
+ CohortRegistryCommand(ActorRef cohort) {
+ this.cohort = Preconditions.checkNotNull(cohort);
+ }
+
+ ActorRef getCohort() {
+ return cohort;
+ }
+ }
+
+ static class RegisterCohort extends CohortRegistryCommand {
+
+ private final DOMDataTreeIdentifier path;
+
+ RegisterCohort(DOMDataTreeIdentifier path, ActorRef cohort) {
+ super(cohort);
+ this.path = path;
+
+ }
+
+ public DOMDataTreeIdentifier getPath() {
+ return path;
+ }
+
+ }
+
+ static class RemoveCohort extends CohortRegistryCommand {
+
+ RemoveCohort(ActorRef cohort) {
+ super(cohort);
+ }
+
+ }
+
+ private static class CanCommitMessageBuilder {
+
+ private final String txId;
+ private final DataTreeCandidate candidate;
+ private final SchemaContext schema;
+ private final Collection<DataTreeCohortActor.CanCommit> messages =
+ new ArrayList<>();
+
+ CanCommitMessageBuilder(String txId, DataTreeCandidate candidate, SchemaContext schema) {
+ this.txId = Preconditions.checkNotNull(txId);
+ this.candidate = Preconditions.checkNotNull(candidate);
+ this.schema = schema;
+ }
+
+ private void lookupAndCreateCanCommits(List<PathArgument> args, int offset,
+ RegistrationTreeNode<ActorRef> node) {
+
+ if (args.size() != offset) {
+ final PathArgument arg = args.get(offset);
+ final RegistrationTreeNode<ActorRef> exactChild = node.getExactChild(arg);
+ if (exactChild != null) {
+ lookupAndCreateCanCommits(args, offset + 1, exactChild);
+ }
+ for (final RegistrationTreeNode<ActorRef> c : node.getInexactChildren(arg)) {
+ lookupAndCreateCanCommits(args, offset + 1, c);
+ }
+ } else {
+ lookupAndCreateCanCommits(candidate.getRootPath(), node, candidate.getRootNode());
+ }
+ }
+
+ private void lookupAndCreateCanCommits(YangInstanceIdentifier path, RegistrationTreeNode<ActorRef> regNode,
+ DataTreeCandidateNode candNode) {
+ if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
+ LOG.debug("Skipping unmodified candidate {}", path);
+ return;
+ }
+ final Collection<ActorRef> regs = regNode.getRegistrations();
+ if (!regs.isEmpty()) {
+ createCanCommits(regs, path, candNode);
+ }
+
+ for (final DataTreeCandidateNode candChild : candNode.getChildNodes()) {
+ if (candChild.getModificationType() != ModificationType.UNMODIFIED) {
+ final RegistrationTreeNode<ActorRef> regChild =
+ regNode.getExactChild(candChild.getIdentifier());
+ if (regChild != null) {
+ lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), regChild, candChild);
+ }
+
+ for (final RegistrationTreeNode<ActorRef> rc : regNode
+ .getInexactChildren(candChild.getIdentifier())) {
+ lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), rc, candChild);
+ }
+ }
+ }
+ }
+
+ private void createCanCommits(Collection<ActorRef> regs, YangInstanceIdentifier path,
+ DataTreeCandidateNode node) {
+ final DOMDataTreeCandidate candidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
+ for (final ActorRef reg : regs) {
+ final CanCommit message = new DataTreeCohortActor.CanCommit(txId, candidate, schema, reg);
+ messages.add(message);
+ }
+ }
+
+ private static DOMDataTreeIdentifier treeIdentifier(YangInstanceIdentifier path) {
+ return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
+ }
+
+ private Collection<DataTreeCohortActor.CanCommit> perform(RegistrationTreeNode<ActorRef> rootNode) {
+ final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
+ lookupAndCreateCanCommits(toLookup, 0, rootNode);
+ return messages;
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort> extends AbstractObjectRegistration<C>
+ implements DOMDataTreeCommitCohortRegistration<C> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortRegistrationProxy.class);
+ private static final Timeout TIMEOUT = new Timeout(new FiniteDuration(5, TimeUnit.SECONDS));
+ private final DOMDataTreeIdentifier subtree;
+ private final ActorRef actor;
+ private final ActorContext actorContext;
+ @GuardedBy("this")
+ private ActorRef cohortRegistry;
+
+
+ DataTreeCohortRegistrationProxy(ActorContext actorContext, DOMDataTreeIdentifier subtree, C cohort) {
+ super(cohort);
+ this.subtree = Preconditions.checkNotNull(subtree);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.actor = actorContext.getActorSystem().actorOf(
+ DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ }
+
+
+ public void init(String shardName) {
+ // FIXME: Add late binding to shard.
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+ findFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable failure, final ActorRef shard) {
+ if (failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered", shardName, getInstance(), subtree);
+ } else if (failure != null) {
+ LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered: {}", shardName, getInstance(), subtree, failure);
+ } else {
+ performRegistration(shard);
+ }
+ }
+ }, actorContext.getClientDispatcher());
+ }
+
+ private synchronized void performRegistration(ActorRef shard) {
+ if (isClosed()) {
+ return;
+ }
+ cohortRegistry = shard;
+ Future<Object> future =
+ Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT);
+ future.onComplete(new OnComplete<Object>() {
+
+ @Override
+ public void onComplete(Throwable e, Object val) throws Throwable {
+ if (e != null) {
+ LOG.error("Unable to register {} as commit cohort", getInstance(), e);
+ }
+ if (isClosed()) {
+ removeRegistration();
+ }
+ }
+
+ }, actorContext.getClientDispatcher());
+ }
+
+ @Override
+ protected synchronized void removeRegistration() {
+ if (cohortRegistry != null) {
+ cohortRegistry.tell(new DataTreeCohortActorRegistry.RemoveCohort(actor), ActorRef.noSender());
+ }
+ }
+}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
*
*/
public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener,
- DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
+ DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private static final String UNKNOWN_TYPE = "unknown";
return listenerRegistrationProxy;
}
+
+ @Override
+ public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
+ DOMDataTreeIdentifier subtree, C cohort) {
+ YangInstanceIdentifier treeId =
+ Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
+ Preconditions.checkNotNull(cohort, "listener should not be null");
+
+
+ final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+ LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
+
+ DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<C>(actorContext, subtree, cohort);
+ cohortProxy.init(shardName);
+ return cohortProxy;
+ }
+
@Override
public DOMStoreTransactionChain createTransactionChain() {
return txContextFactory.createTransactionChain();
* </p>
*/
public class Shard extends RaftActor {
+
@VisibleForTesting
static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
@Override
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ commitCoordinator.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
super.handleNonRaftCommand(message);
}
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
- }
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
} else {
ActorSelection leader = getLeader();
if (leader == null) {
try {
try {
- cohortEntry.commit();
+ cohortEntry.commit();
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
- commitCoordinator.handleBatchedModifications(batched, sender, this);
+ commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+ store.getSchemaContext());
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
private final ShardDataTree dataTree;
+ private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
private Runnable runOnPendingTransactionsComplete;
- ShardCommitCoordinator(ShardDataTree dataTree,
- long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
+
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+ ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
+ String name) {
this.queueCapacity = queueCapacity;
this.log = log;
} else {
cohortCache.remove(cohortEntry.getTransactionID());
- RuntimeException ex = new RuntimeException(
+ final RuntimeException ex = new RuntimeException(
String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
name, cohortEntry.getTransactionID(), queueCapacity));
* @param ready the ForwardedReadyTransaction message to process
* @param sender the sender of the message
* @param shard the transaction's shard actor
+ * @param schema
*/
- void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+ void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
+ SchemaContext schema) {
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionID(), ready.getTxnClientVersion());
- ShardDataTreeCohort cohort = ready.getTransaction().ready();
- CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
+ final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+ final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
+ void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
- dataTree.newReadWriteTransaction(batched.getTransactionID(),
- batched.getTransactionChainID()), batched.getVersion());
+ dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+ cohortRegistry, schema, batched.getVersion());
cohortCache.put(batched.getTransactionID(), cohortEntry);
}
/**
* This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
- * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+ * been prepared beforehand by the sender and we just need to drive them through into the
+ * dataTree.
*
* @param message the ReadyLocalTransaction message to process
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
+ SchemaContext schema) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
DataStoreVersions.CURRENT_VERSION);
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
private List<CohortEntry> getAndClearPendingCohortEntries() {
List<CohortEntry> cohortEntries = new ArrayList<>();
+
if(currentCohortEntry != null) {
cohortEntries.add(currentCohortEntry);
cohortCache.remove(currentCohortEntry.getTransactionID());
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
cohortEntry.getClientVersion(), ""));
- }
+ }
return newModifications.getLast();
}
private void maybeProcessNextCohortEntry() {
// Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
// clean out expired entries.
- Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+ final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
while(iter.hasNext()) {
- CohortEntry next = iter.next();
+ final CohortEntry next = iter.next();
if(next.isReadyToCommit()) {
if(currentCohortEntry == null) {
if(log.isDebugEnabled()) {
this.cohortDecorator = cohortDecorator;
}
+ void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
+ cohortRegistry.process(sender, message);
+ }
+
static class CohortEntry {
enum State {
PENDING,
PRE_COMMITTED,
COMMITTED,
ABORTED
- }
+ }
private final String transactionID;
private ShardDataTreeCohort cohort;
private int totalBatchedModificationsReceived;
private State state = State.PENDING;
private final short clientVersion;
+ private final CompositeDataTreeCohort userCohorts;
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
+ CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
+ DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
this.clientVersion = clientVersion;
+ this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+ SchemaContext schema, short clientVersion) {
this.transactionID = transactionID;
this.cohort = cohort;
this.transaction = null;
this.clientVersion = clientVersion;
+ this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
void updateLastAccessTime() {
return cohort.canCommit().get();
}
- void preCommit() throws InterruptedException, ExecutionException {
+
+
+ void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
state = State.PRE_COMMITTED;
cohort.preCommit().get();
+ userCohorts.canCommit(cohort.getCandidate());
+ userCohorts.preCommit();
}
- void commit() throws InterruptedException, ExecutionException {
+ void commit() throws InterruptedException, ExecutionException, TimeoutException {
state = State.COMMITTED;
cohort.commit().get();
+ userCohorts.commit();
}
- void abort() throws InterruptedException, ExecutionException {
+ void abort() throws InterruptedException, ExecutionException, TimeoutException {
state = State.ABORTED;
cohort.abort().get();
+ userCohorts.abort();
}
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
.append(doImmediateCommit).append("]");
return builder.toString();
return dataTree.takeSnapshot().newModification();
}
+ // FIXME: This should be removed, it violates encapsulation
public DataTreeCandidate commit(DataTreeModification modification) throws DataValidationFailedException {
modification.ready();
dataTree.validate(modification);
// Prevent foreign instantiation
}
+ // FIXME: This leaks internal state generated in preCommit,
+ // should be result of canCommit
abstract DataTreeCandidateTip getCandidate();
abstract DataTreeModification getDataTreeModification();
+ // FIXME: Should return rebased DataTreeCandidateTip
@VisibleForTesting
public abstract ListenableFuture<Boolean> canCommit();
@VisibleForTesting
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.common.api.DataValidationFailedException;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.PostCanCommitStep;
+import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+
+public class DataTreeCohortIntegrationTest {
+
+ private static final DataValidationFailedException FAILED_CAN_COMMIT =
+ new DataValidationFailedException(YangInstanceIdentifier.class, TestModel.TEST_PATH, "Test failure.");
+ private static final CheckedFuture<PostCanCommitStep, DataValidationFailedException> FAILED_CAN_COMMIT_FUTURE =
+ Futures.immediateFailedCheckedFuture(FAILED_CAN_COMMIT);
+
+ private static final DOMDataTreeIdentifier TEST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private static final Timeout TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+ private static ActorSystem system;
+
+ private final DatastoreContext.Builder datastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ final Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ Cluster.get(system).join(member1Address);
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ protected ActorSystem getSystem() {
+ return system;
+ }
+
+ @Test
+ public void registerNoopCohortTest() throws Exception {
+ final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
+ Mockito.doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+ ArgumentCaptor<DOMDataTreeCandidate> candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class);
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1");
+ final ObjectRegistration<DOMDataTreeCommitCohort> cohortReg = dataStore.registerCommitCohort(TEST_ID, cohort);
+ Thread.sleep(1000); // Registration is asynchronous
+ assertNotNull(cohortReg);
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(), any(SchemaContext.class));
+ DOMDataTreeCandidate candidate = candidateCapt.getValue();
+ assertNotNull(candidate);
+ assertEquals(TEST_ID, candidate.getRootPath());
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+ Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class), any(DOMDataTreeCandidate.class),
+ any(SchemaContext.class));
+ cohortReg.close();
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ Mockito.verifyNoMoreInteractions(cohort);
+ cleanup(dataStore);
+ }
+ };
+ }
+
+ @Test
+ public void failCanCommitTest() throws Exception {
+ final DOMDataTreeCommitCohort failedCohort = mock(DOMDataTreeCommitCohort.class);
+
+ Mockito.doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class),
+ any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ final DistributedDataStore dataStore =
+ setupDistributedDataStore("transactionIntegrationTest", "test-1");
+ dataStore.registerCommitCohort(TEST_ID, failedCohort);
+ Thread.sleep(1000); // Registration is asynchronous
+
+ DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+ try {
+ // FIXME: Weird thing is that invoking canCommit on front-end invokes also
+ // preCommit on backend.
+ dsCohort.canCommit().get();
+ fail("Exception should be raised.");
+ } catch (Exception e) {
+ assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e));
+ }
+ cleanup(dataStore);
+ }
+ };
+ }
+
+ /**
+ *
+ * FIXME: Weird thing is that invoking canCommit on front-end invokes also preCommit on backend
+ * so we can not test abort after can commit.
+ *
+ */
+ @Test
+ @Ignore
+ public void canCommitSuccessExternallyAborted() throws Exception {
+ final DOMDataTreeCommitCohort cohortToAbort = mock(DOMDataTreeCommitCohort.class);
+ final PostCanCommitStep stepToAbort = mock(PostCanCommitStep.class);
+ Mockito.doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class),
+ any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+ Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ final DistributedDataStore dataStore =
+ setupDistributedDataStore("transactionIntegrationTest", "test-1");
+ dataStore.registerCommitCohort(TEST_ID, cohortToAbort);
+ Thread.sleep(1000); // Registration is asynchronous
+
+ DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+
+ dsCohort.canCommit().get();
+ dsCohort.abort().get();
+ Mockito.verify(stepToAbort, Mockito.times(1)).abort();
+ cleanup(dataStore);
+ }
+ };
+ }
+}
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
doReturn(candidate).when(cohort).getCandidate();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
doReturn(candidate).when(cohort).getCandidate();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());