CONTROLLER-1641: Handle commit cohorts async
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActorRegistry.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Status;
14 import akka.util.Timeout;
15 import com.google.common.base.Preconditions;
16 import com.google.common.collect.ArrayListMultimap;
17 import com.google.common.collect.Multimap;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.Executor;
24 import javax.annotation.concurrent.NotThreadSafe;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
30 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
31 import org.opendaylight.mdsal.dom.spi.RegistrationTreeSnapshot;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Registry of user commit cohorts, which is responsible for handling registration and calculation
43  * of affected cohorts based on {@link DataTreeCandidate}.
44  *
45  */
46 @NotThreadSafe
47 class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
48
49     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActorRegistry.class);
50
51     private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
52
53     Collection<ActorRef> getCohortActors() {
54         return new ArrayList<>(cohortToNode.keySet());
55     }
56
57     @SuppressWarnings("checkstyle:IllegalCatch")
58     void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
59         takeLock();
60         try {
61             final ActorRef cohortRef = cohort.getCohort();
62             final RegistrationTreeNode<ActorRef> node =
63                     findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments());
64             addRegistration(node, cohort.getCohort());
65             cohortToNode.put(cohortRef, node);
66         } catch (final Exception e) {
67             sender.tell(new Status.Failure(e), ActorRef.noSender());
68             return;
69         } finally {
70             releaseLock();
71         }
72         sender.tell(new Status.Success(null), ActorRef.noSender());
73     }
74
75     void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
76         final ActorRef cohort = message.getCohort();
77         final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
78         if (node != null) {
79             removeRegistration(node, cohort);
80             cohortToNode.remove(cohort);
81         }
82         sender.tell(new Status.Success(null), ActorRef.noSender());
83         cohort.tell(PoisonPill.getInstance(), cohort);
84     }
85
86     List<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
87             final DataTreeCandidate candidate, final SchemaContext schema) {
88         try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
89             return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
90         }
91     }
92
93     void process(final ActorRef sender, final CohortRegistryCommand message) {
94         if (message instanceof RegisterCohort) {
95             registerCohort(sender, (RegisterCohort) message);
96         } else if (message instanceof RemoveCohort) {
97             removeCommitCohort(sender, (RemoveCohort) message);
98         }
99     }
100
101     abstract static class CohortRegistryCommand {
102
103         private final ActorRef cohort;
104
105         CohortRegistryCommand(final ActorRef cohort) {
106             this.cohort = Preconditions.checkNotNull(cohort);
107         }
108
109         ActorRef getCohort() {
110             return cohort;
111         }
112     }
113
114     static class RegisterCohort extends CohortRegistryCommand {
115
116         private final DOMDataTreeIdentifier path;
117
118         RegisterCohort(final DOMDataTreeIdentifier path, final ActorRef cohort) {
119             super(cohort);
120             this.path = path;
121
122         }
123
124         public DOMDataTreeIdentifier getPath() {
125             return path;
126         }
127
128     }
129
130     static class RemoveCohort extends CohortRegistryCommand {
131
132         RemoveCohort(final ActorRef cohort) {
133             super(cohort);
134         }
135
136     }
137
138     private static class CanCommitMessageBuilder {
139
140         private final TransactionIdentifier txId;
141         private final DataTreeCandidate candidate;
142         private final SchemaContext schema;
143         private final Multimap<ActorRef, DOMDataTreeCandidate> actorToCandidates = ArrayListMultimap.create();
144
145         CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate,
146                 final SchemaContext schema) {
147             this.txId = Preconditions.checkNotNull(txId);
148             this.candidate = Preconditions.checkNotNull(candidate);
149             this.schema = schema;
150         }
151
152         private void lookupAndCreateCanCommits(final List<PathArgument> args, final int offset,
153                 final RegistrationTreeNode<ActorRef> node) {
154
155             if (args.size() != offset) {
156                 final PathArgument arg = args.get(offset);
157                 final RegistrationTreeNode<ActorRef> exactChild = node.getExactChild(arg);
158                 if (exactChild != null) {
159                     lookupAndCreateCanCommits(args, offset + 1, exactChild);
160                 }
161                 for (final RegistrationTreeNode<ActorRef> c : node.getInexactChildren(arg)) {
162                     lookupAndCreateCanCommits(args, offset + 1, c);
163                 }
164             } else {
165                 lookupAndCreateCanCommits(candidate.getRootPath(), node, candidate.getRootNode());
166             }
167         }
168
169         private void lookupAndCreateCanCommits(final YangInstanceIdentifier path,
170                 final RegistrationTreeNode<ActorRef> regNode, final DataTreeCandidateNode candNode) {
171             if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
172                 LOG.debug("Skipping unmodified candidate {}", path);
173                 return;
174             }
175             final Collection<ActorRef> regs = regNode.getRegistrations();
176             if (!regs.isEmpty()) {
177                 createCanCommits(regs, path, candNode);
178             }
179
180             for (final DataTreeCandidateNode candChild : candNode.getChildNodes()) {
181                 if (candChild.getModificationType() != ModificationType.UNMODIFIED) {
182                     final RegistrationTreeNode<ActorRef> regChild =
183                             regNode.getExactChild(candChild.getIdentifier());
184                     if (regChild != null) {
185                         lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), regChild, candChild);
186                     }
187
188                     for (final RegistrationTreeNode<ActorRef> rc : regNode
189                             .getInexactChildren(candChild.getIdentifier())) {
190                         lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), rc, candChild);
191                     }
192                 }
193             }
194         }
195
196         private void createCanCommits(final Collection<ActorRef> regs, final YangInstanceIdentifier path,
197                 final DataTreeCandidateNode node) {
198             final DOMDataTreeCandidate domCandidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
199             for (final ActorRef reg : regs) {
200                 actorToCandidates.put(reg, domCandidate);
201             }
202         }
203
204         private static DOMDataTreeIdentifier treeIdentifier(final YangInstanceIdentifier path) {
205             return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
206         }
207
208         List<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
209             final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
210             lookupAndCreateCanCommits(toLookup, 0, rootNode);
211
212             final Map<ActorRef, Collection<DOMDataTreeCandidate>> mapView = actorToCandidates.asMap();
213             final List<DataTreeCohortActor.CanCommit> messages = new ArrayList<>(mapView.size());
214             for (Map.Entry<ActorRef, Collection<DOMDataTreeCandidate>> entry: mapView.entrySet()) {
215                 messages.add(new DataTreeCohortActor.CanCommit(txId, entry.getValue(), schema, entry.getKey()));
216             }
217
218             return messages;
219         }
220     }
221
222     CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
223             final Executor callbackExecutor, final Timeout commitStepTimeout) {
224         return new CompositeDataTreeCohort(this, txId, schemaContext, callbackExecutor, commitStepTimeout);
225     }
226 }