Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActorRegistry.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import akka.actor.Status;
15 import akka.util.Timeout;
16 import com.google.common.collect.ArrayListMultimap;
17 import com.google.common.collect.Multimap;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.Executor;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
29 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
30 import org.opendaylight.mdsal.dom.spi.RegistrationTreeSnapshot;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Registry of user commit cohorts, which is responsible for handling registration and calculation
42  * of affected cohorts based on {@link DataTreeCandidate}. This class is NOT thread-safe.
43  *
44  */
45 class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
46
47     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActorRegistry.class);
48
49     private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
50
51     Collection<ActorRef> getCohortActors() {
52         return new ArrayList<>(cohortToNode.keySet());
53     }
54
55     @SuppressWarnings("checkstyle:IllegalCatch")
56     void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
57         takeLock();
58         try {
59             final ActorRef cohortRef = cohort.getCohort();
60             final RegistrationTreeNode<ActorRef> node =
61                     findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments());
62             addRegistration(node, cohort.getCohort());
63             cohortToNode.put(cohortRef, node);
64         } catch (final Exception e) {
65             sender.tell(new Status.Failure(e), ActorRef.noSender());
66             return;
67         } finally {
68             releaseLock();
69         }
70         sender.tell(new Status.Success(null), ActorRef.noSender());
71     }
72
73     void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
74         final ActorRef cohort = message.getCohort();
75         final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
76         if (node != null) {
77             removeRegistration(node, cohort);
78             cohortToNode.remove(cohort);
79         }
80         sender.tell(new Status.Success(null), ActorRef.noSender());
81         cohort.tell(PoisonPill.getInstance(), cohort);
82     }
83
84     List<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
85             final DataTreeCandidate candidate, final SchemaContext schema) {
86         try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
87             return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
88         }
89     }
90
91     void process(final ActorRef sender, final CohortRegistryCommand message) {
92         if (message instanceof RegisterCohort) {
93             registerCohort(sender, (RegisterCohort) message);
94         } else if (message instanceof RemoveCohort) {
95             removeCommitCohort(sender, (RemoveCohort) message);
96         }
97     }
98
99     abstract static class CohortRegistryCommand {
100         private final ActorRef cohort;
101
102         CohortRegistryCommand(final ActorRef cohort) {
103             this.cohort = requireNonNull(cohort);
104         }
105
106         ActorRef getCohort() {
107             return cohort;
108         }
109     }
110
111     static class RegisterCohort extends CohortRegistryCommand {
112         private final DOMDataTreeIdentifier path;
113
114         RegisterCohort(final DOMDataTreeIdentifier path, final ActorRef cohort) {
115             super(cohort);
116             this.path = path;
117         }
118
119         public DOMDataTreeIdentifier getPath() {
120             return path;
121         }
122     }
123
124     static class RemoveCohort extends CohortRegistryCommand {
125         RemoveCohort(final ActorRef cohort) {
126             super(cohort);
127         }
128     }
129
130     private static class CanCommitMessageBuilder {
131         private final Multimap<ActorRef, DOMDataTreeCandidate> actorToCandidates = ArrayListMultimap.create();
132         private final TransactionIdentifier txId;
133         private final DataTreeCandidate candidate;
134         private final SchemaContext schema;
135
136         CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate,
137                 final SchemaContext schema) {
138             this.txId = requireNonNull(txId);
139             this.candidate = requireNonNull(candidate);
140             this.schema = schema;
141         }
142
143         private void lookupAndCreateCanCommits(final List<PathArgument> args, final int offset,
144                 final RegistrationTreeNode<ActorRef> node) {
145
146             if (args.size() != offset) {
147                 final PathArgument arg = args.get(offset);
148                 final RegistrationTreeNode<ActorRef> exactChild = node.getExactChild(arg);
149                 if (exactChild != null) {
150                     lookupAndCreateCanCommits(args, offset + 1, exactChild);
151                 }
152                 for (final RegistrationTreeNode<ActorRef> c : node.getInexactChildren(arg)) {
153                     lookupAndCreateCanCommits(args, offset + 1, c);
154                 }
155             } else {
156                 lookupAndCreateCanCommits(candidate.getRootPath(), node, candidate.getRootNode());
157             }
158         }
159
160         private void lookupAndCreateCanCommits(final YangInstanceIdentifier path,
161                 final RegistrationTreeNode<ActorRef> regNode, final DataTreeCandidateNode candNode) {
162             if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
163                 LOG.debug("Skipping unmodified candidate {}", path);
164                 return;
165             }
166             final Collection<ActorRef> regs = regNode.getRegistrations();
167             if (!regs.isEmpty()) {
168                 createCanCommits(regs, path, candNode);
169             }
170
171             for (final DataTreeCandidateNode candChild : candNode.getChildNodes()) {
172                 if (candChild.getModificationType() != ModificationType.UNMODIFIED) {
173                     final RegistrationTreeNode<ActorRef> regChild =
174                             regNode.getExactChild(candChild.getIdentifier());
175                     if (regChild != null) {
176                         lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), regChild, candChild);
177                     }
178
179                     for (final RegistrationTreeNode<ActorRef> rc : regNode
180                             .getInexactChildren(candChild.getIdentifier())) {
181                         lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), rc, candChild);
182                     }
183                 }
184             }
185         }
186
187         private void createCanCommits(final Collection<ActorRef> regs, final YangInstanceIdentifier path,
188                 final DataTreeCandidateNode node) {
189             final DOMDataTreeCandidate domCandidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
190             for (final ActorRef reg : regs) {
191                 actorToCandidates.put(reg, domCandidate);
192             }
193         }
194
195         private static DOMDataTreeIdentifier treeIdentifier(final YangInstanceIdentifier path) {
196             return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
197         }
198
199         List<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
200             final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
201             lookupAndCreateCanCommits(toLookup, 0, rootNode);
202
203             final Map<ActorRef, Collection<DOMDataTreeCandidate>> mapView = actorToCandidates.asMap();
204             final List<DataTreeCohortActor.CanCommit> messages = new ArrayList<>(mapView.size());
205             for (Map.Entry<ActorRef, Collection<DOMDataTreeCandidate>> entry: mapView.entrySet()) {
206                 messages.add(new DataTreeCohortActor.CanCommit(txId, entry.getValue(), schema, entry.getKey()));
207             }
208
209             return messages;
210         }
211     }
212
213     CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
214             final Executor callbackExecutor, final Timeout commitStepTimeout) {
215         return new CompositeDataTreeCohort(this, txId, schemaContext, callbackExecutor, commitStepTimeout);
216     }
217 }