Add OnDemandShardState to report additional Shard state
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActorRegistry.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Status;
14 import akka.util.Timeout;
15 import com.google.common.base.Preconditions;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
25 import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree;
26 import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode;
27 import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeSnapshot;
28 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Registry of user commit cohorts, which is responsible for handling registration and calculation
42  * of affected cohorts based on {@link DataTreeCandidate}.
43  *
44  */
45 @NotThreadSafe
46 class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
47
48     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActorRegistry.class);
49
50     private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
51
52     Collection<ActorRef> getCohortActors() {
53         return Collections.unmodifiableCollection(cohortToNode.keySet());
54     }
55
56     @SuppressWarnings("checkstyle:IllegalCatch")
57     void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
58         takeLock();
59         try {
60             final ActorRef cohortRef = cohort.getCohort();
61             final RegistrationTreeNode<ActorRef> node =
62                     findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments());
63             addRegistration(node, cohort.getCohort());
64             cohortToNode.put(cohortRef, node);
65         } catch (final Exception e) {
66             sender.tell(new Status.Failure(e), ActorRef.noSender());
67             return;
68         } finally {
69             releaseLock();
70         }
71         sender.tell(new Status.Success(null), ActorRef.noSender());
72     }
73
74     void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
75         final ActorRef cohort = message.getCohort();
76         final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
77         if (node != null) {
78             removeRegistration(node, cohort);
79             cohortToNode.remove(cohort);
80         }
81         sender.tell(new Status.Success(null), ActorRef.noSender());
82         cohort.tell(PoisonPill.getInstance(), cohort);
83     }
84
85     Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
86             final DataTreeCandidate candidate, final SchemaContext schema) {
87         try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
88             return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
89         }
90     }
91
92     void process(final ActorRef sender, final CohortRegistryCommand message) {
93         if (message instanceof RegisterCohort) {
94             registerCohort(sender, (RegisterCohort) message);
95         } else if (message instanceof RemoveCohort) {
96             removeCommitCohort(sender, (RemoveCohort) message);
97         }
98     }
99
100     abstract static class CohortRegistryCommand {
101
102         private final ActorRef cohort;
103
104         CohortRegistryCommand(final ActorRef cohort) {
105             this.cohort = Preconditions.checkNotNull(cohort);
106         }
107
108         ActorRef getCohort() {
109             return cohort;
110         }
111     }
112
113     static class RegisterCohort extends CohortRegistryCommand {
114
115         private final DOMDataTreeIdentifier path;
116
117         RegisterCohort(final DOMDataTreeIdentifier path, final ActorRef cohort) {
118             super(cohort);
119             this.path = path;
120
121         }
122
123         public DOMDataTreeIdentifier getPath() {
124             return path;
125         }
126
127     }
128
129     static class RemoveCohort extends CohortRegistryCommand {
130
131         RemoveCohort(final ActorRef cohort) {
132             super(cohort);
133         }
134
135     }
136
137     private static class CanCommitMessageBuilder {
138
139         private final TransactionIdentifier txId;
140         private final DataTreeCandidate candidate;
141         private final SchemaContext schema;
142         private final Collection<DataTreeCohortActor.CanCommit> messages =
143                 new ArrayList<>();
144
145         CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate,
146                 final SchemaContext schema) {
147             this.txId = Preconditions.checkNotNull(txId);
148             this.candidate = Preconditions.checkNotNull(candidate);
149             this.schema = schema;
150         }
151
152         private void lookupAndCreateCanCommits(final List<PathArgument> args, final int offset,
153                 final RegistrationTreeNode<ActorRef> node) {
154
155             if (args.size() != offset) {
156                 final PathArgument arg = args.get(offset);
157                 final RegistrationTreeNode<ActorRef> exactChild = node.getExactChild(arg);
158                 if (exactChild != null) {
159                     lookupAndCreateCanCommits(args, offset + 1, exactChild);
160                 }
161                 for (final RegistrationTreeNode<ActorRef> c : node.getInexactChildren(arg)) {
162                     lookupAndCreateCanCommits(args, offset + 1, c);
163                 }
164             } else {
165                 lookupAndCreateCanCommits(candidate.getRootPath(), node, candidate.getRootNode());
166             }
167         }
168
169         private void lookupAndCreateCanCommits(final YangInstanceIdentifier path,
170                 final RegistrationTreeNode<ActorRef> regNode, final DataTreeCandidateNode candNode) {
171             if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
172                 LOG.debug("Skipping unmodified candidate {}", path);
173                 return;
174             }
175             final Collection<ActorRef> regs = regNode.getRegistrations();
176             if (!regs.isEmpty()) {
177                 createCanCommits(regs, path, candNode);
178             }
179
180             for (final DataTreeCandidateNode candChild : candNode.getChildNodes()) {
181                 if (candChild.getModificationType() != ModificationType.UNMODIFIED) {
182                     final RegistrationTreeNode<ActorRef> regChild =
183                             regNode.getExactChild(candChild.getIdentifier());
184                     if (regChild != null) {
185                         lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), regChild, candChild);
186                     }
187
188                     for (final RegistrationTreeNode<ActorRef> rc : regNode
189                             .getInexactChildren(candChild.getIdentifier())) {
190                         lookupAndCreateCanCommits(path.node(candChild.getIdentifier()), rc, candChild);
191                     }
192                 }
193             }
194         }
195
196         private void createCanCommits(final Collection<ActorRef> regs, final YangInstanceIdentifier path,
197                 final DataTreeCandidateNode node) {
198             final DOMDataTreeCandidate domCandidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
199             for (final ActorRef reg : regs) {
200                 final CanCommit message = new DataTreeCohortActor.CanCommit(txId, domCandidate, schema, reg);
201                 messages.add(message);
202             }
203         }
204
205         private static DOMDataTreeIdentifier treeIdentifier(final YangInstanceIdentifier path) {
206             return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
207         }
208
209         private Collection<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
210             final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
211             lookupAndCreateCanCommits(toLookup, 0, rootNode);
212             return messages;
213         }
214     }
215
216     CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
217             final Timeout commitStepTimeout) {
218         return new CompositeDataTreeCohort(this, txId, schemaContext, commitStepTimeout);
219     }
220 }