504ac101986ef2f1660a5943d901c20c40574d0f
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.sharding;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.stream.Collectors;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
25 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
29 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
30 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
31 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
32 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
45 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
46 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class DistributedShardChangePublisher
51         extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
52         implements DOMStoreTreeChangePublisher {
53
54     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
55
56     private final AbstractDataStore distributedDataStore;
57     private final YangInstanceIdentifier shardPath;
58
59     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
60
61     @GuardedBy("this")
62     private final DataTree dataTree;
63
64     public DistributedShardChangePublisher(final DataStoreClient client,
65                                            final AbstractDataStore distributedDataStore,
66                                            final DOMDataTreeIdentifier prefix,
67                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
68         this.distributedDataStore = distributedDataStore;
69         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
70         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
71         // dataTree and wont have to cache it redundantly.
72
73         final DataTreeConfiguration baseConfig;
74         switch (prefix.getDatastoreType()) {
75             case CONFIGURATION:
76                 baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
77                 break;
78             case OPERATIONAL:
79                 baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
80                 break;
81             default:
82                 throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
83         }
84
85         this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
86                 .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
87                 .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
88                 .setRootPath(prefix.getRootIdentifier())
89                 .build());
90
91         // XXX: can we guarantee that the root is present in the schemacontext?
92         this.dataTree.setSchemaContext(distributedDataStore.getActorUtils().getSchemaContext());
93         this.shardPath = prefix.getRootIdentifier();
94         this.childShards = childShards;
95     }
96
97     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
98         LOG.debug("Closing registration {}", registration);
99     }
100
101     @Override
102     public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
103             registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
104         takeLock();
105         try {
106             return setupListenerContext(path, listener);
107         } finally {
108             releaseLock();
109         }
110     }
111
112     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
113             setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
114         // we need to register the listener registration path based on the shards root
115         // we have to strip the shard path from the listener path and then register
116         YangInstanceIdentifier strippedIdentifier = listenerPath;
117         if (!shardPath.isEmpty()) {
118             strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
119         }
120
121         final DOMDataTreeListenerWithSubshards subshardListener =
122                 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
123         final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
124                 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
125
126         for (final ChildShardContext maybeAffected : childShards.values()) {
127             if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
128                 // consumer has initialDataChangeEvent subshard somewhere on lower level
129                 // register to the notification manager with snapshot and forward child notifications to parent
130                 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
131                 subshardListener.addSubshard(maybeAffected);
132             } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
133                 // bind path is inside subshard
134                 // TODO can this happen? seems like in ShardedDOMDataTree we are
135                 // already registering to the lowest shard possible
136                 throw new UnsupportedOperationException("Listener should be registered directly "
137                         + "into initialDataChangeEvent subshard");
138             }
139         }
140
141         return reg;
142     }
143
144     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
145             setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
146                                          final YangInstanceIdentifier listenerPath,
147                                          final DOMDataTreeListenerWithSubshards listener) {
148
149         LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
150
151         // register in the shard tree
152         final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
153                 findNodeFor(listenerPath.getPathArguments());
154
155         // register listener in CDS
156         ListenerRegistration<DOMDataTreeChangeListener> listenerReg = distributedDataStore
157                 .registerProxyListener(shardLookup, listenerPath, listener);
158
159         @SuppressWarnings("unchecked")
160         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
161             new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
162                 @Override
163                 protected void removeRegistration() {
164                     listener.close();
165                     DistributedShardChangePublisher.this.removeRegistration(node, this);
166                     registrationRemoved(this);
167                     listenerReg.close();
168                 }
169             };
170         addRegistration(node, registration);
171
172         return registration;
173     }
174
175     private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
176                                                          final YangInstanceIdentifier listenerPath) {
177         if (shardPath.isEmpty()) {
178             return listenerPath.getPathArguments();
179         }
180
181         final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
182         final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
183         final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
184
185         while (shardIter.hasNext()) {
186             if (shardIter.next().equals(listenerIter.next())) {
187                 listenerIter.remove();
188             } else {
189                 break;
190             }
191         }
192
193         return listenerPathArgs;
194     }
195
196     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
197             final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
198         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
199         for (final DataTreeCandidate change : changes) {
200             try {
201                 DataTreeCandidates.applyToModification(modification, change);
202             } catch (SchemaValidationFailedException e) {
203                 LOG.error("Validation failed", e);
204             }
205         }
206
207         modification.ready();
208
209         final DataTreeCandidate candidate;
210
211         dataTree.validate(modification);
212
213         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
214         candidate = dataTree.prepare(modification);
215         dataTree.commit(candidate);
216
217
218         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
219
220         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
221             modifiedChild = modifiedChild.getModifiedChild(pathArgument);
222         }
223
224         if (modifiedChild == null) {
225             modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
226         }
227
228         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
229     }
230
231
232     private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
233
234         private final YangInstanceIdentifier listenerPath;
235         private final DOMDataTreeChangeListener delegate;
236         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
237                 new ConcurrentHashMap<>();
238
239         @GuardedBy("this")
240         private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
241
242         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
243                                          final DOMDataTreeChangeListener delegate) {
244             this.listenerPath = requireNonNull(listenerPath);
245             this.delegate = requireNonNull(delegate);
246         }
247
248         @Override
249         public synchronized void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
250             LOG.debug("Received data changed {}", changes);
251
252             if (!stashedDataTreeCandidates.isEmpty()) {
253                 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
254                 changes.addAll(stashedDataTreeCandidates);
255                 stashedDataTreeCandidates.clear();
256             }
257
258             try {
259                 applyChanges(listenerPath, changes);
260             } catch (final DataValidationFailedException e) {
261                 // TODO should we fail here? What if stashed changes
262                 // (changes from subshards) got ahead more than one generation
263                 // from current shard. Than we can fail to apply this changes
264                 // upon current data tree, but once we get respective changes
265                 // from current shard, we can apply also changes from
266                 // subshards.
267                 //
268                 // However, we can loose ability to notice and report some
269                 // errors then. For example, we cannot detect potential lost
270                 // changes from current shard.
271                 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
272                         changes, dataTree, e);
273                 throw new RuntimeException("Notification validation failed", e);
274             }
275
276             delegate.onDataTreeChanged(changes);
277         }
278
279         synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
280                                             final Collection<DataTreeCandidate> changes) {
281             final YangInstanceIdentifier changeId =
282                     YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
283
284             final List<DataTreeCandidate> newCandidates = changes.stream()
285                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
286                     .collect(Collectors.toList());
287
288             try {
289                 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
290             } catch (final DataValidationFailedException e) {
291                 // We cannot apply changes from subshard to current data tree.
292                 // Maybe changes from current shard haven't been applied to
293                 // data tree yet. Postpone processing of these changes till we
294                 // receive changes from current shard.
295                 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
296                         pathFromRoot, changes, dataTree, e);
297                 stashedDataTreeCandidates.addAll(newCandidates);
298             }
299         }
300
301         void addSubshard(final ChildShardContext context) {
302             checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
303                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
304
305             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
306             // since this is going into subshard we want to listen for ALL changes in the subshard
307             registrations.put(context.getPrefix().getRootIdentifier(),
308                     listenableShard.registerTreeChangeListener(
309                             context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
310                                     context.getPrefix().getRootIdentifier(), changes)));
311         }
312
313         void close() {
314             for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
315                 registration.close();
316             }
317             registrations.clear();
318         }
319     }
320
321     private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
322
323         private final PathArgument identifier;
324
325         EmptyDataTreeCandidateNode(final PathArgument identifier) {
326             this.identifier = requireNonNull(identifier, "Identifier should not be null");
327         }
328
329         @Override
330         public PathArgument getIdentifier() {
331             return identifier;
332         }
333
334         @Override
335         public Collection<DataTreeCandidateNode> getChildNodes() {
336             return Collections.emptySet();
337         }
338
339         @Override
340         @SuppressWarnings("checkstyle:hiddenField")
341         public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
342             return null;
343         }
344
345         @Override
346         public ModificationType getModificationType() {
347             return ModificationType.UNMODIFIED;
348         }
349
350         @Override
351         public Optional<NormalizedNode<?, ?>> getDataAfter() {
352             return Optional.empty();
353         }
354
355         @Override
356         public Optional<NormalizedNode<?, ?>> getDataBefore() {
357             return Optional.empty();
358         }
359     }
360 }