From: Tony Tkacik Date: Thu, 14 Jan 2016 17:42:33 +0000 (+0100) Subject: Bug 1435: CDS: Added support for custom commit cohort. X-Git-Tag: release/boron~202 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1d3c54640b9fff649fe8d0f57e20d56f8f936cc1 Bug 1435: CDS: Added support for custom commit cohort. Implemented support for user provided commit cohorts, which implements DOMDataTreeCommitCohort interface contract. Messages are only in-JVM so cohort needs to be colocated with replica. Change-Id: I04e592c0232383e70fa8944b966b1aa341730a98 Signed-off-by: Tony Tkacik --- diff --git a/features/mdsal/src/main/features/features.xml b/features/mdsal/src/main/features/features.xml index ca5593d993..88a18f7543 100644 --- a/features/mdsal/src/main/features/features.xml +++ b/features/mdsal/src/main/features/features.xml @@ -32,7 +32,7 @@ odl-yangtools-common - odl-mdsal-binding-runtime + odl-mdsal-binding-dom-adapter odl-mdsal-models odl-mdsal-common odl-config-startup diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index d545cf49ec..f2719bccc8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -129,6 +129,10 @@ sal-dom-broker-config + + org.opendaylight.mdsal + mdsal-dom-api + org.opendaylight.controller sal-core-spi diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java index e8b19b8346..02362b9371 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java @@ -29,6 +29,10 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.util.DurationStatisticsTracker; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.slf4j.Logger; @@ -42,7 +46,7 @@ import org.slf4j.LoggerFactory; * @author Thomas Pantelis */ @Beta -public class ConcurrentDOMDataBroker extends AbstractDOMBroker { +public class ConcurrentDOMDataBroker extends AbstractDOMBroker implements DOMDataTreeCommitCohortRegistry { private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class); private static final String CAN_COMMIT = "CAN_COMMIT"; private static final String PRE_COMMIT = "PRE_COMMIT"; @@ -325,4 +329,25 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker { } } } + + @Override + public DOMDataTreeCommitCohortRegistration registerCommitCohort( + DOMDataTreeIdentifier path, T cohort) { + DOMStore store = getTxFactories().get(toLegacy(path.getDatastoreType())); + if (store instanceof DOMDataTreeCommitCohortRegistry) { + return ((DOMDataTreeCommitCohortRegistry) store).registerCommitCohort(path, cohort); + } + throw new UnsupportedOperationException("Commit cohort is not supported for " + path); + } + + private LogicalDatastoreType toLegacy(org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType) { + switch (datastoreType) { + case CONFIGURATION: + return LogicalDatastoreType.CONFIGURATION; + case OPERATIONAL: + return LogicalDatastoreType.OPERATIONAL; + default: + throw new IllegalArgumentException("Unsupported data store type: " + datastoreType); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java new file mode 100644 index 0000000000..07ff936b2f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Status; +import akka.actor.Status.Failure; +import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; +import akka.dispatch.Recover; +import akka.japi.Function; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; +import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; + +/** + * + * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort. + * + * It tracks current operation and list of cohorts which successfuly finished previous phase in + * case, if abort is necessary to invoke it only on cohort steps which are still active. + * + */ +class CompositeDataTreeCohort { + + private enum State { + /** + * Cohorts are idle, no messages were sent. + */ + IDLE, + /** + * CanCommit message was sent to all participating cohorts. + */ + CAN_COMMIT_SENT, + /** + * Successful canCommit responses were received from every participating cohort. + */ + CAN_COMMIT_SUCCESSFUL, + /** + * PreCommit message was sent to all participating cohorts. + */ + PRE_COMMIT_SENT, + /** + * Successful preCommit responses were received from every participating cohort. + */ + PRE_COMMIT_SUCCESSFUL, + /** + * Commit message was send to all participating cohorts. + */ + COMMIT_SENT, + /** + * Successful commit responses were received from all participating cohorts. + */ + COMMITED, + /** + * Some of cohorts responsed back with unsuccessful message. + * + */ + FAILED, + /** + * + * Abort message was send to all cohorts which responded with success previously. + * + */ + ABORTED + } + + protected static final Recover EXCEPTION_TO_MESSAGE = new Recover() { + @Override + public Failure recover(Throwable error) throws Throwable { + return new Failure(error); + } + }; + + + private final DataTreeCohortActorRegistry registry; + private final String txId; + private final SchemaContext schema; + private final Timeout timeout; + private Iterable successfulFromPrevious; + private State state = State.IDLE; + + CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, String txId, SchemaContext schema, Timeout timeout) { + this.registry = Preconditions.checkNotNull(registry); + this.txId = Preconditions.checkNotNull(txId); + this.schema = Preconditions.checkNotNull(schema); + this.timeout = Preconditions.checkNotNull(timeout); + } + + void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException { + Collection messages = registry.createCanCommitMessages(txId, tip, schema); + // FIXME: Optimize empty collection list with pre-created futures, containing success. + Future> canCommitsFuture = + Futures.traverse(messages, new Function>() { + @Override + public Future apply(CanCommit input) { + return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, + ExecutionContexts.global()); + } + }, ExecutionContexts.global()); + changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); + processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); + } + + void preCommit() throws ExecutionException, TimeoutException { + Preconditions.checkState(successfulFromPrevious != null); + Future> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId)); + changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); + processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); + } + + void commit() throws ExecutionException, TimeoutException { + Preconditions.checkState(successfulFromPrevious != null); + Future> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId)); + changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); + processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED); + } + + void abort() throws TimeoutException { + if (successfulFromPrevious != null) { + sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)); + } + } + + private Future> sendMesageToSuccessful(final Object message) { + return Futures.traverse(successfulFromPrevious, new Function>() { + + @Override + public Future apply(DataTreeCohortActor.Success cohortResponse) throws Exception { + return Patterns.ask(cohortResponse.getCohort(), message, timeout); + } + + }, ExecutionContexts.global()); + } + + private void processResponses(Future> resultsFuture, State currentState, State afterState) + throws TimeoutException, ExecutionException { + final Iterable results; + try { + results = Await.result(resultsFuture, timeout.duration()); + } catch (Exception e) { + successfulFromPrevious = null; + Throwables.propagateIfInstanceOf(e, TimeoutException.class); + throw Throwables.propagate(e); + } + Iterable failed = Iterables.filter(results, Status.Failure.class); + Iterable successful = Iterables.filter(results, DataTreeCohortActor.Success.class); + successfulFromPrevious = successful; + if (!Iterables.isEmpty(failed)) { + changeStateFrom(currentState, State.FAILED); + Iterator it = failed.iterator(); + Throwable firstEx = it.next().cause(); + while (it.hasNext()) { + firstEx.addSuppressed(it.next().cause()); + } + Throwables.propagateIfPossible(firstEx, ExecutionException.class); + Throwables.propagateIfPossible(firstEx, TimeoutException.class); + throw Throwables.propagate(firstEx); + } + changeStateFrom(currentState, afterState); + } + + void changeStateFrom(State expected, State followup) { + Preconditions.checkState(state == expected); + state = followup; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMDataTreeCandidateTO.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMDataTreeCandidateTO.java new file mode 100644 index 0000000000..7e79c3e88a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMDataTreeCandidateTO.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; + +final class DOMDataTreeCandidateTO implements DOMDataTreeCandidate { + + private final DOMDataTreeIdentifier rootPath; + private final DataTreeCandidateNode rootNode; + + private DOMDataTreeCandidateTO(DOMDataTreeIdentifier rootPath, DataTreeCandidateNode rootNode) { + this.rootPath = Preconditions.checkNotNull(rootPath); + this.rootNode = Preconditions.checkNotNull(rootNode); + } + + @Override + public DOMDataTreeIdentifier getRootPath() { + return rootPath; + } + + @Override + public DataTreeCandidateNode getRootNode() { + return rootNode; + } + + static DOMDataTreeCandidate create(DOMDataTreeIdentifier path, DataTreeCandidateNode node) { + return new DOMDataTreeCandidateTO(path, node); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString(); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java new file mode 100644 index 0000000000..485e8b5c24 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java @@ -0,0 +1,274 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.mdsal.common.api.PostCanCommitStep; +import org.opendaylight.mdsal.common.api.PostPreCommitStep; +import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for + * decapsulating DataTreeChanged messages and dispatching their context to the user. + */ +final class DataTreeCohortActor extends AbstractUntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class); + private final CohortBehaviour idleState = new Idle(); + private final DOMDataTreeCommitCohort cohort; + private CohortBehaviour currentState = idleState; + + private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) { + this.cohort = Preconditions.checkNotNull(cohort); + } + + @Override + protected void handleReceive(final Object message) { + currentState = currentState.handle(message); + } + + + /** + * Abstract message base for messages handled by {@link DataTreeCohortActor}. + * + * @param Reply message type + */ + static abstract class CommitProtocolCommand { + + private final String txId; + + final String getTxId() { + return txId; + } + + protected CommitProtocolCommand(String txId) { + this.txId = Preconditions.checkNotNull(txId); + } + } + + static final class CanCommit extends CommitProtocolCommand { + + private final DOMDataTreeCandidate candidate; + private final ActorRef cohort; + private final SchemaContext schema; + + CanCommit(String txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) { + super(txId); + this.cohort = Preconditions.checkNotNull(cohort); + this.candidate = Preconditions.checkNotNull(candidate); + this.schema = Preconditions.checkNotNull(schema); + } + + DOMDataTreeCandidate getCandidate() { + return candidate; + } + + SchemaContext getSchema() { + return schema; + } + + ActorRef getCohort() { + return cohort; + } + + } + + static abstract class CommitReply { + + private final ActorRef cohortRef; + private final String txId; + + protected CommitReply(ActorRef cohortRef, String txId) { + this.cohortRef = Preconditions.checkNotNull(cohortRef); + this.txId = Preconditions.checkNotNull(txId); + } + + ActorRef getCohort() { + return cohortRef; + } + + final String getTxId() { + return txId; + } + + } + + static final class Success extends CommitReply { + + public Success(ActorRef cohortRef, String txId) { + super(cohortRef, txId); + } + + } + + static final class PreCommit extends CommitProtocolCommand { + + public PreCommit(String txId) { + super(txId); + } + } + + static final class Abort extends CommitProtocolCommand { + + public Abort(String txId) { + super(txId); + } + } + + static final class Commit extends CommitProtocolCommand { + + public Commit(String txId) { + super(txId); + } + } + + private static abstract class CohortBehaviour { + + abstract Class getHandledMessageType(); + + CohortBehaviour handle(Object message) { + if (getHandledMessageType().isInstance(message)) { + return process(getHandledMessageType().cast(message)); + } else if (message instanceof Abort) { + return abort(); + } + throw new UnsupportedOperationException(); + } + + abstract CohortBehaviour abort(); + + abstract CohortBehaviour process(E message); + + } + + private class Idle extends CohortBehaviour { + + @Override + Class getHandledMessageType() { + return CanCommit.class; + } + + @Override + CohortBehaviour process(CanCommit message) { + final PostCanCommitStep nextStep; + try { + nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get(); + } catch (final Exception e) { + getSender().tell(new Status.Failure(e), getSelf()); + return this; + } + getSender().tell(new Success(getSelf(), message.getTxId()), getSelf()); + return new PostCanCommit(message.getTxId(), nextStep); + } + + @Override + CohortBehaviour abort() { + return this; + } + + } + + + private abstract class CohortStateWithStep, S extends ThreePhaseCommitStep> + extends CohortBehaviour { + + private final S step; + private final String txId; + + CohortStateWithStep(String txId, S step) { + this.txId = Preconditions.checkNotNull(txId); + this.step = Preconditions.checkNotNull(step); + } + + final S getStep() { + return step; + } + + final String getTxId() { + return txId; + } + + @Override + final CohortBehaviour abort() { + try { + getStep().abort().get(); + } catch (final Exception e) { + LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e); + getSender().tell(new Status.Failure(e), getSelf()); + return idleState; + } + getSender().tell(new Success(getSelf(), txId), getSelf()); + return idleState; + } + + } + + private class PostCanCommit extends CohortStateWithStep { + + PostCanCommit(String txId, PostCanCommitStep nextStep) { + super(txId, nextStep); + } + + @Override + Class getHandledMessageType() { + return PreCommit.class; + } + + @Override + CohortBehaviour process(PreCommit message) { + final PostPreCommitStep nextStep; + try { + nextStep = getStep().preCommit().get(); + } catch (final Exception e) { + getSender().tell(new Status.Failure(e), getSelf()); + return idleState; + } + getSender().tell(new Success(getSelf(), message.getTxId()), getSelf()); + return new PostPreCommit(getTxId(), nextStep); + } + + } + + private class PostPreCommit extends CohortStateWithStep { + + PostPreCommit(String txId, PostPreCommitStep step) { + super(txId, step); + } + + @Override + CohortBehaviour process(Commit message) { + try { + getStep().commit().get(); + } catch (final Exception e) { + getSender().tell(new Status.Failure(e), getSender()); + return idleState; + } + getSender().tell(new Success(getSelf(), getTxId()), getSelf()); + return idleState; + } + + @Override + Class getHandledMessageType() { + return Commit.class; + } + + } + + static Props props(final DOMDataTreeCommitCohort cohort) { + return Props.create(DataTreeCohortActor.class, cohort); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java new file mode 100644 index 0000000000..ec7e2ee61b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Status; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; +import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree; +import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode; +import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeSnapshot; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry of user commit cohorts, which is responsible for handling registration and calculation + * of affected cohorts based on {@link DataTreeCandidate}. + * + */ +@NotThreadSafe +class DataTreeCohortActorRegistry extends AbstractRegistrationTree { + + private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActorRegistry.class); + + private final Map> cohortToNode = new HashMap<>(); + + + void registerCohort(ActorRef sender, RegisterCohort cohort) { + takeLock(); + try { + final ActorRef cohortRef = cohort.getCohort(); + final RegistrationTreeNode node = + findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments()); + addRegistration(node, cohort.getCohort()); + cohortToNode.put(cohortRef, node); + } catch (final Exception e) { + sender.tell(new Status.Failure(e), ActorRef.noSender()); + return; + } finally { + releaseLock(); + } + sender.tell(new Status.Success(null), ActorRef.noSender()); + } + + void removeCommitCohort(ActorRef sender, RemoveCohort message) { + final ActorRef cohort = message.getCohort(); + final RegistrationTreeNode node = cohortToNode.get(cohort); + if (node != null) { + removeRegistration(node, cohort); + cohortToNode.remove(cohort); + } + sender.tell(new Status.Success(null), ActorRef.noSender()); + cohort.tell(PoisonPill.getInstance(), cohort); + } + + Collection createCanCommitMessages(String txId, DataTreeCandidate candidate, + SchemaContext schema) { + try (RegistrationTreeSnapshot cohorts = takeSnapshot()) { + return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode()); + } + } + + void process(ActorRef sender, CohortRegistryCommand message) { + if (message instanceof RegisterCohort) { + registerCohort(sender, (RegisterCohort) message); + } else if (message instanceof RemoveCohort) { + removeCommitCohort(sender, (RemoveCohort) message); + } + } + + static abstract class CohortRegistryCommand { + + private final ActorRef cohort; + + CohortRegistryCommand(ActorRef cohort) { + this.cohort = Preconditions.checkNotNull(cohort); + } + + ActorRef getCohort() { + return cohort; + } + } + + static class RegisterCohort extends CohortRegistryCommand { + + private final DOMDataTreeIdentifier path; + + RegisterCohort(DOMDataTreeIdentifier path, ActorRef cohort) { + super(cohort); + this.path = path; + + } + + public DOMDataTreeIdentifier getPath() { + return path; + } + + } + + static class RemoveCohort extends CohortRegistryCommand { + + RemoveCohort(ActorRef cohort) { + super(cohort); + } + + } + + private static class CanCommitMessageBuilder { + + private final String txId; + private final DataTreeCandidate candidate; + private final SchemaContext schema; + private final Collection messages = + new ArrayList<>(); + + CanCommitMessageBuilder(String txId, DataTreeCandidate candidate, SchemaContext schema) { + this.txId = Preconditions.checkNotNull(txId); + this.candidate = Preconditions.checkNotNull(candidate); + this.schema = schema; + } + + private void lookupAndCreateCanCommits(List args, int offset, + RegistrationTreeNode node) { + + if (args.size() != offset) { + final PathArgument arg = args.get(offset); + final RegistrationTreeNode exactChild = node.getExactChild(arg); + if (exactChild != null) { + lookupAndCreateCanCommits(args, offset + 1, exactChild); + } + for (final RegistrationTreeNode c : node.getInexactChildren(arg)) { + lookupAndCreateCanCommits(args, offset + 1, c); + } + } else { + lookupAndCreateCanCommits(candidate.getRootPath(), node, candidate.getRootNode()); + } + } + + private void lookupAndCreateCanCommits(YangInstanceIdentifier path, RegistrationTreeNode regNode, + DataTreeCandidateNode candNode) { + if (candNode.getModificationType() == ModificationType.UNMODIFIED) { + LOG.debug("Skipping unmodified candidate {}", path); + return; + } + final Collection regs = regNode.getRegistrations(); + if (!regs.isEmpty()) { + createCanCommits(regs, path, candNode); + } + + for (final DataTreeCandidateNode candChild : candNode.getChildNodes()) { + if (candChild.getModificationType() != ModificationType.UNMODIFIED) { + final RegistrationTreeNode regChild = + regNode.getExactChild(candChild.getIdentifier()); + if (regChild != null) { + lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), regChild, candChild); + } + + for (final RegistrationTreeNode rc : regNode + .getInexactChildren(candChild.getIdentifier())) { + lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), rc, candChild); + } + } + } + } + + private void createCanCommits(Collection regs, YangInstanceIdentifier path, + DataTreeCandidateNode node) { + final DOMDataTreeCandidate candidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node); + for (final ActorRef reg : regs) { + final CanCommit message = new DataTreeCohortActor.CanCommit(txId, candidate, schema, reg); + messages.add(message); + } + } + + private static DOMDataTreeIdentifier treeIdentifier(YangInstanceIdentifier path) { + return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path); + } + + private Collection perform(RegistrationTreeNode rootNode) { + final List toLookup = candidate.getRootPath().getPathArguments(); + lookupAndCreateCanCommits(toLookup, 0, rootNode); + return messages; + } + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java new file mode 100644 index 0000000000..c269312c8d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Preconditions; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +public class DataTreeCohortRegistrationProxy extends AbstractObjectRegistration + implements DOMDataTreeCommitCohortRegistration { + + private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortRegistrationProxy.class); + private static final Timeout TIMEOUT = new Timeout(new FiniteDuration(5, TimeUnit.SECONDS)); + private final DOMDataTreeIdentifier subtree; + private final ActorRef actor; + private final ActorContext actorContext; + @GuardedBy("this") + private ActorRef cohortRegistry; + + + DataTreeCohortRegistrationProxy(ActorContext actorContext, DOMDataTreeIdentifier subtree, C cohort) { + super(cohort); + this.subtree = Preconditions.checkNotNull(subtree); + this.actorContext = Preconditions.checkNotNull(actorContext); + this.actor = actorContext.getActorSystem().actorOf( + DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath())); + } + + + public void init(String shardName) { + // FIXME: Add late binding to shard. + Future findFuture = actorContext.findLocalShardAsync(shardName); + findFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef shard) { + if (failure instanceof LocalShardNotFoundException) { + LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " + + "cannot be registered", shardName, getInstance(), subtree); + } else if (failure != null) { + LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " + + "cannot be registered: {}", shardName, getInstance(), subtree, failure); + } else { + performRegistration(shard); + } + } + }, actorContext.getClientDispatcher()); + } + + private synchronized void performRegistration(ActorRef shard) { + if (isClosed()) { + return; + } + cohortRegistry = shard; + Future future = + Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT); + future.onComplete(new OnComplete() { + + @Override + public void onComplete(Throwable e, Object val) throws Throwable { + if (e != null) { + LOG.error("Unable to register {} as commit cohort", getInstance(), e); + } + if (isClosed()) { + removeRegistration(); + } + } + + }, actorContext.getClientDispatcher()); + } + + @Override + protected synchronized void removeRegistration() { + if (cohortRegistry != null) { + cohortRegistry.tell(new DataTreeCohortActorRegistry.RemoveCohort(actor), ActorRef.noSender()); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 579c89747a..ea7330ae23 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -32,6 +32,10 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -44,7 +48,7 @@ import org.slf4j.LoggerFactory; * */ public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable { + DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private static final String UNKNOWN_TYPE = "unknown"; @@ -156,6 +160,23 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche return listenerRegistrationProxy; } + + @Override + public DOMDataTreeCommitCohortRegistration registerCommitCohort( + DOMDataTreeIdentifier subtree, C cohort) { + YangInstanceIdentifier treeId = + Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier(); + Preconditions.checkNotNull(cohort, "listener should not be null"); + + + final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); + LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName); + + DataTreeCohortRegistrationProxy cohortProxy = new DataTreeCohortRegistrationProxy(actorContext, subtree, cohort); + cohortProxy.init(shardName); + return cohortProxy; + } + @Override public DOMStoreTransactionChain createTransactionChain() { return txContextFactory.createTransactionChain(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 9cb015cfaf..23ad747d4c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -82,6 +82,7 @@ import scala.concurrent.duration.FiniteDuration; *

*/ public class Shard extends RaftActor { + @VisibleForTesting static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() { @Override @@ -269,6 +270,9 @@ public class Shard extends RaftActor { context().parent().forward(message, context()); } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { messageRetrySupport.onTimerMessage(message); + } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { + commitCoordinator.processCohortRegistryCommand(getSender(), + (DataTreeCohortActorRegistry.CohortRegistryCommand) message); } else { super.handleNonRaftCommand(message); } @@ -334,9 +338,9 @@ public class Shard extends RaftActor { private void handleCommitTransaction(final CommitTransaction commit) { if (isLeader()) { - if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { - shardMBean.incrementFailedTransactionsCount(); - } + if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { + shardMBean.incrementFailedTransactionsCount(); + } } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -354,7 +358,7 @@ public class Shard extends RaftActor { try { try { - cohortEntry.commit(); + cohortEntry.commit(); } catch(ExecutionException e) { // We may get a "store tree and candidate base differ" IllegalStateException from commit under // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last @@ -432,7 +436,7 @@ public class Shard extends RaftActor { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); + commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -447,7 +451,7 @@ public class Shard extends RaftActor { protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { try { - commitCoordinator.handleBatchedModifications(batched, sender, this); + commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext()); } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionID(), e); @@ -516,7 +520,7 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { try { - commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); + commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext()); } catch (Exception e) { LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionID(), e); @@ -540,7 +544,8 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { - commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); + commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this, + store.getSchemaContext()); } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 739321b068..45fa7727e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Status.Failure; import akka.serialization.Serialization; +import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -24,6 +25,8 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -39,7 +42,9 @@ import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModifi import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; +import scala.concurrent.duration.Duration; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -59,6 +64,8 @@ class ShardCommitCoordinator { private final ShardDataTree dataTree; + private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); + // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. private final Queue queuedCohortEntries = new LinkedList<>(); @@ -78,8 +85,11 @@ class ShardCommitCoordinator { private Runnable runOnPendingTransactionsComplete; - ShardCommitCoordinator(ShardDataTree dataTree, - long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) { + + private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + + ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, + String name) { this.queueCapacity = queueCapacity; this.log = log; @@ -119,7 +129,7 @@ class ShardCommitCoordinator { } else { cohortCache.remove(cohortEntry.getTransactionID()); - RuntimeException ex = new RuntimeException( + final RuntimeException ex = new RuntimeException( String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ " capacity %d has been reached.", name, cohortEntry.getTransactionID(), queueCapacity)); @@ -136,13 +146,15 @@ class ShardCommitCoordinator { * @param ready the ForwardedReadyTransaction message to process * @param sender the sender of the message * @param shard the transaction's shard actor + * @param schema */ - void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) { + void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard, + SchemaContext schema) { log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionID(), ready.getTxnClientVersion()); - ShardDataTreeCohort cohort = ready.getTransaction().ready(); - CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion()); + final ShardDataTreeCohort cohort = ready.getTransaction().ready(); + final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion()); cohortCache.put(ready.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { @@ -171,12 +183,12 @@ class ShardCommitCoordinator { * @param sender the sender of the message * @param shard the transaction's shard actor */ - void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) { + void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), - dataTree.newReadWriteTransaction(batched.getTransactionID(), - batched.getTransactionChainID()), batched.getVersion()); + dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()), + cohortRegistry, schema, batched.getVersion()); cohortCache.put(batched.getTransactionID(), cohortEntry); } @@ -225,16 +237,18 @@ class ShardCommitCoordinator { /** * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have - * been prepared beforehand by the sender and we just need to drive them through into the dataTree. + * been prepared beforehand by the sender and we just need to drive them through into the + * dataTree. * * @param message the ReadyLocalTransaction message to process * @param sender the sender of the message * @param shard the transaction's shard actor */ - void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) { + void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard, + SchemaContext schema) { final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(), message.getTransactionID()); - final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, + final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema, DataStoreVersions.CURRENT_VERSION); cohortCache.put(message.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -503,6 +517,7 @@ class ShardCommitCoordinator { private List getAndClearPendingCohortEntries() { List cohortEntries = new ArrayList<>(); + if(currentCohortEntry != null) { cohortEntries.add(currentCohortEntry); cohortCache.remove(currentCohortEntry.getTransactionID()); @@ -538,7 +553,7 @@ class ShardCommitCoordinator { newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(), cohortEntry.getClientVersion(), "")); - } + } return newModifications.getLast(); } @@ -621,9 +636,9 @@ class ShardCommitCoordinator { private void maybeProcessNextCohortEntry() { // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also // clean out expired entries. - Iterator iter = queuedCohortEntries.iterator(); + final Iterator iter = queuedCohortEntries.iterator(); while(iter.hasNext()) { - CohortEntry next = iter.next(); + final CohortEntry next = iter.next(); if(next.isReadyToCommit()) { if(currentCohortEntry == null) { if(log.isDebugEnabled()) { @@ -674,6 +689,10 @@ class ShardCommitCoordinator { this.cohortDecorator = cohortDecorator; } + void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) { + cohortRegistry.process(sender, message); + } + static class CohortEntry { enum State { PENDING, @@ -681,7 +700,7 @@ class ShardCommitCoordinator { PRE_COMMITTED, COMMITTED, ABORTED - } + } private final String transactionID; private ShardDataTreeCohort cohort; @@ -694,18 +713,23 @@ class ShardCommitCoordinator { private int totalBatchedModificationsReceived; private State state = State.PENDING; private final short clientVersion; + private final CompositeDataTreeCohort userCohorts; - CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) { + CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, + DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) { this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; this.clientVersion = clientVersion; + this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT); } - CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) { + CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry, + SchemaContext schema, short clientVersion) { this.transactionID = transactionID; this.cohort = cohort; this.transaction = null; this.clientVersion = clientVersion; + this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT); } void updateLastAccessTime() { @@ -770,19 +794,25 @@ class ShardCommitCoordinator { return cohort.canCommit().get(); } - void preCommit() throws InterruptedException, ExecutionException { + + + void preCommit() throws InterruptedException, ExecutionException, TimeoutException { state = State.PRE_COMMITTED; cohort.preCommit().get(); + userCohorts.canCommit(cohort.getCandidate()); + userCohorts.preCommit(); } - void commit() throws InterruptedException, ExecutionException { + void commit() throws InterruptedException, ExecutionException, TimeoutException { state = State.COMMITTED; cohort.commit().get(); + userCohorts.commit(); } - void abort() throws InterruptedException, ExecutionException { + void abort() throws InterruptedException, ExecutionException, TimeoutException { state = State.ABORTED; cohort.abort().get(); + userCohorts.abort(); } void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) { @@ -837,7 +867,7 @@ class ShardCommitCoordinator { @Override public String toString() { - StringBuilder builder = new StringBuilder(); + final StringBuilder builder = new StringBuilder(); builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=") .append(doImmediateCommit).append("]"); return builder.toString(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index ad5aab3453..618ea90e26 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -214,6 +214,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return dataTree.takeSnapshot().newModification(); } + // FIXME: This should be removed, it violates encapsulation public DataTreeCandidate commit(DataTreeModification modification) throws DataValidationFailedException { modification.ready(); dataTree.validate(modification); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index f7cdd4e8dc..47876123cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -17,9 +17,12 @@ public abstract class ShardDataTreeCohort { // Prevent foreign instantiation } + // FIXME: This leaks internal state generated in preCommit, + // should be result of canCommit abstract DataTreeCandidateTip getCandidate(); abstract DataTreeModification getDataTreeModification(); + // FIXME: Should return rebased DataTreeCandidateTip @VisibleForTesting public abstract ListenableFuture canCommit(); @VisibleForTesting diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java new file mode 100644 index 0000000000..84521e5ce6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; + +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.cluster.Cluster; +import akka.testkit.JavaTestKit; +import akka.util.Timeout; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.common.api.DataValidationFailedException; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.PostCanCommitStep; +import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.duration.Duration; + +public class DataTreeCohortIntegrationTest { + + private static final DataValidationFailedException FAILED_CAN_COMMIT = + new DataValidationFailedException(YangInstanceIdentifier.class, TestModel.TEST_PATH, "Test failure."); + private static final CheckedFuture FAILED_CAN_COMMIT_FUTURE = + Futures.immediateFailedCheckedFuture(FAILED_CAN_COMMIT); + + private static final DOMDataTreeIdentifier TEST_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + + private static final Timeout TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + + private static ActorSystem system; + + private final DatastoreContext.Builder datastoreContextBuilder = + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100); + + @BeforeClass + public static void setUpClass() throws IOException { + system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + Cluster.get(system).join(member1Address); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected ActorSystem getSystem() { + return system; + } + + @Test + public void registerNoopCohortTest() throws Exception { + final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class); + Mockito.doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class), + any(DOMDataTreeCandidate.class), any(SchemaContext.class)); + ArgumentCaptor candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class); + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1"); + final ObjectRegistration cohortReg = dataStore.registerCommitCohort(TEST_ID, cohort); + Thread.sleep(1000); // Registration is asynchronous + assertNotNull(cohortReg); + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(), any(SchemaContext.class)); + DOMDataTreeCandidate candidate = candidateCapt.getValue(); + assertNotNull(candidate); + assertEquals(TEST_ID, candidate.getRootPath()); + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class), any(DOMDataTreeCandidate.class), + any(SchemaContext.class)); + cohortReg.close(); + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + Mockito.verifyNoMoreInteractions(cohort); + cleanup(dataStore); + } + }; + } + + @Test + public void failCanCommitTest() throws Exception { + final DOMDataTreeCommitCohort failedCohort = mock(DOMDataTreeCommitCohort.class); + + Mockito.doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class), + any(DOMDataTreeCandidate.class), any(SchemaContext.class)); + + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + final DistributedDataStore dataStore = + setupDistributedDataStore("transactionIntegrationTest", "test-1"); + dataStore.registerCommitCohort(TEST_ID, failedCohort); + Thread.sleep(1000); // Registration is asynchronous + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); + try { + // FIXME: Weird thing is that invoking canCommit on front-end invokes also + // preCommit on backend. + dsCohort.canCommit().get(); + fail("Exception should be raised."); + } catch (Exception e) { + assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e)); + } + cleanup(dataStore); + } + }; + } + + /** + * + * FIXME: Weird thing is that invoking canCommit on front-end invokes also preCommit on backend + * so we can not test abort after can commit. + * + */ + @Test + @Ignore + public void canCommitSuccessExternallyAborted() throws Exception { + final DOMDataTreeCommitCohort cohortToAbort = mock(DOMDataTreeCommitCohort.class); + final PostCanCommitStep stepToAbort = mock(PostCanCommitStep.class); + Mockito.doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class), + any(DOMDataTreeCandidate.class), any(SchemaContext.class)); + Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort(); + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + final DistributedDataStore dataStore = + setupDistributedDataStore("transactionIntegrationTest", "test-1"); + dataStore.registerCommitCohort(TEST_ID, cohortToAbort); + Thread.sleep(1000); // Registration is asynchronous + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); + + dsCohort.canCommit().get(); + dsCohort.abort().get(); + Mockito.verify(stepToAbort, Mockito.times(1)).abort(); + cleanup(dataStore); + } + }; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 3e2313dc27..2846397160 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; @@ -1552,6 +1553,7 @@ public class ShardTest extends AbstractShardTest { final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath(); doReturn(candidate).when(cohort).getCandidate(); shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); @@ -1597,6 +1599,7 @@ public class ShardTest extends AbstractShardTest { final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath(); doReturn(candidate).when(cohort).getCandidate(); shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());