2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Status;
14 import akka.util.Timeout;
15 import com.google.common.base.Preconditions;
16 import com.google.common.collect.ArrayListMultimap;
17 import com.google.common.collect.Multimap;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.List;
23 import java.util.concurrent.Executor;
24 import javax.annotation.concurrent.NotThreadSafe;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
30 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
31 import org.opendaylight.mdsal.dom.spi.RegistrationTreeSnapshot;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * Registry of user commit cohorts, which is responsible for handling registration and calculation
43 * of affected cohorts based on {@link DataTreeCandidate}.
47 class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
49 private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActorRegistry.class);
51 private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
53 Collection<ActorRef> getCohortActors() {
54 return new ArrayList<>(cohortToNode.keySet());
57 @SuppressWarnings("checkstyle:IllegalCatch")
58 void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
61 final ActorRef cohortRef = cohort.getCohort();
62 final RegistrationTreeNode<ActorRef> node =
63 findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments());
64 addRegistration(node, cohort.getCohort());
65 cohortToNode.put(cohortRef, node);
66 } catch (final Exception e) {
67 sender.tell(new Status.Failure(e), ActorRef.noSender());
72 sender.tell(new Status.Success(null), ActorRef.noSender());
75 void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
76 final ActorRef cohort = message.getCohort();
77 final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
79 removeRegistration(node, cohort);
80 cohortToNode.remove(cohort);
82 sender.tell(new Status.Success(null), ActorRef.noSender());
83 cohort.tell(PoisonPill.getInstance(), cohort);
86 List<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
87 final DataTreeCandidate candidate, final SchemaContext schema) {
88 try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
89 return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
93 void process(final ActorRef sender, final CohortRegistryCommand message) {
94 if (message instanceof RegisterCohort) {
95 registerCohort(sender, (RegisterCohort) message);
96 } else if (message instanceof RemoveCohort) {
97 removeCommitCohort(sender, (RemoveCohort) message);
101 abstract static class CohortRegistryCommand {
103 private final ActorRef cohort;
105 CohortRegistryCommand(final ActorRef cohort) {
106 this.cohort = Preconditions.checkNotNull(cohort);
109 ActorRef getCohort() {
114 static class RegisterCohort extends CohortRegistryCommand {
116 private final DOMDataTreeIdentifier path;
118 RegisterCohort(final DOMDataTreeIdentifier path, final ActorRef cohort) {
124 public DOMDataTreeIdentifier getPath() {
130 static class RemoveCohort extends CohortRegistryCommand {
132 RemoveCohort(final ActorRef cohort) {
138 private static class CanCommitMessageBuilder {
140 private final TransactionIdentifier txId;
141 private final DataTreeCandidate candidate;
142 private final SchemaContext schema;
143 private final Multimap<ActorRef, DOMDataTreeCandidate> actorToCandidates = ArrayListMultimap.create();
145 CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate,
146 final SchemaContext schema) {
147 this.txId = Preconditions.checkNotNull(txId);
148 this.candidate = Preconditions.checkNotNull(candidate);
149 this.schema = schema;
152 private void lookupAndCreateCanCommits(final List<PathArgument> args, final int offset,
153 final RegistrationTreeNode<ActorRef> node) {
155 if (args.size() != offset) {
156 final PathArgument arg = args.get(offset);
157 final RegistrationTreeNode<ActorRef> exactChild = node.getExactChild(arg);
158 if (exactChild != null) {
159 lookupAndCreateCanCommits(args, offset + 1, exactChild);
161 for (final RegistrationTreeNode<ActorRef> c : node.getInexactChildren(arg)) {
162 lookupAndCreateCanCommits(args, offset + 1, c);
165 lookupAndCreateCanCommits(candidate.getRootPath(), node, candidate.getRootNode());
169 private void lookupAndCreateCanCommits(final YangInstanceIdentifier path,
170 final RegistrationTreeNode<ActorRef> regNode, final DataTreeCandidateNode candNode) {
171 if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
172 LOG.debug("Skipping unmodified candidate {}", path);
175 final Collection<ActorRef> regs = regNode.getRegistrations();
176 if (!regs.isEmpty()) {
177 createCanCommits(regs, path, candNode);
180 for (final DataTreeCandidateNode candChild : candNode.getChildNodes()) {
181 if (candChild.getModificationType() != ModificationType.UNMODIFIED) {
182 final RegistrationTreeNode<ActorRef> regChild =
183 regNode.getExactChild(candChild.getIdentifier());
184 if (regChild != null) {
185 lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), regChild, candChild);
188 for (final RegistrationTreeNode<ActorRef> rc : regNode
189 .getInexactChildren(candChild.getIdentifier())) {
190 lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), rc, candChild);
196 private void createCanCommits(final Collection<ActorRef> regs, final YangInstanceIdentifier path,
197 final DataTreeCandidateNode node) {
198 final DOMDataTreeCandidate domCandidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
199 for (final ActorRef reg : regs) {
200 actorToCandidates.put(reg, domCandidate);
204 private static DOMDataTreeIdentifier treeIdentifier(final YangInstanceIdentifier path) {
205 return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
208 List<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
209 final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
210 lookupAndCreateCanCommits(toLookup, 0, rootNode);
212 final Map<ActorRef, Collection<DOMDataTreeCandidate>> mapView = actorToCandidates.asMap();
213 final List<DataTreeCohortActor.CanCommit> messages = new ArrayList<>(mapView.size());
214 for (Map.Entry<ActorRef, Collection<DOMDataTreeCandidate>> entry: mapView.entrySet()) {
215 messages.add(new DataTreeCohortActor.CanCommit(txId, entry.getValue(), schema, entry.getKey()));
222 CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
223 final Executor callbackExecutor, final Timeout commitStepTimeout) {
224 return new CompositeDataTreeCohort(this, txId, schemaContext, callbackExecutor, commitStepTimeout);