f91dc3b2600dd09a5ed815b4d7a24f391d527404
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.stream.Collectors;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
26 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
30 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
31 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
32 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
33 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
45 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
46 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
47 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public class DistributedShardChangePublisher
52         extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
53         implements DOMStoreTreeChangePublisher {
54
55     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
56
57     private final AbstractDataStore distributedDataStore;
58     private final YangInstanceIdentifier shardPath;
59
60     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
61
62     @GuardedBy("this")
63     private final DataTree dataTree;
64
65     public DistributedShardChangePublisher(final DataStoreClient client,
66                                            final AbstractDataStore distributedDataStore,
67                                            final DOMDataTreeIdentifier prefix,
68                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
69         this.distributedDataStore = distributedDataStore;
70         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
71         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
72         // dataTree and wont have to cache it redundantly.
73         this.dataTree = InMemoryDataTreeFactory.getInstance().create(
74                 TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
75
76         dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
77
78         this.shardPath = prefix.getRootIdentifier();
79         this.childShards = childShards;
80     }
81
82     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
83         LOG.debug("Closing registration {}", registration);
84     }
85
86     @Override
87     public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
88             registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
89         takeLock();
90         try {
91             return setupListenerContext(path, listener);
92         } finally {
93             releaseLock();
94         }
95     }
96
97     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
98             setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
99         // we need to register the listener registration path based on the shards root
100         // we have to strip the shard path from the listener path and then register
101         YangInstanceIdentifier strippedIdentifier = listenerPath;
102         if (!shardPath.isEmpty()) {
103             strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
104         }
105
106         final DOMDataTreeListenerWithSubshards subshardListener =
107                 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
108         final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
109                 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
110
111         for (final ChildShardContext maybeAffected : childShards.values()) {
112             if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
113                 // consumer has initialDataChangeEvent subshard somewhere on lower level
114                 // register to the notification manager with snapshot and forward child notifications to parent
115                 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
116                 subshardListener.addSubshard(maybeAffected);
117             } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
118                 // bind path is inside subshard
119                 // TODO can this happen? seems like in ShardedDOMDataTree we are
120                 // already registering to the lowest shard possible
121                 throw new UnsupportedOperationException("Listener should be registered directly "
122                         + "into initialDataChangeEvent subshard");
123             }
124         }
125
126         return reg;
127     }
128
129     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
130             setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
131                                          final YangInstanceIdentifier listenerPath,
132                                          final DOMDataTreeListenerWithSubshards listener) {
133
134         LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
135
136         // register in the shard tree
137         final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
138                 findNodeFor(listenerPath.getPathArguments());
139
140         // register listener in CDS
141         final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
142                 .registerProxyListener(shardLookup, listenerPath, listener), listener);
143
144         @SuppressWarnings("unchecked")
145         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
146             new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
147                 @Override
148                 protected void removeRegistration() {
149                     listener.close();
150                     DistributedShardChangePublisher.this.removeRegistration(node, this);
151                     registrationRemoved(this);
152                     proxyReg.close();
153                 }
154             };
155         addRegistration(node, registration);
156
157         return registration;
158     }
159
160     private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
161                                                          final YangInstanceIdentifier listenerPath) {
162         if (shardPath.isEmpty()) {
163             return listenerPath.getPathArguments();
164         }
165
166         final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
167         final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
168         final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
169
170         while (shardIter.hasNext()) {
171             if (shardIter.next().equals(listenerIter.next())) {
172                 listenerIter.remove();
173             } else {
174                 break;
175             }
176         }
177
178         return listenerPathArgs;
179     }
180
181     private static final class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
182
183         private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
184         private final DOMDataTreeChangeListener listener;
185
186         private ProxyRegistration(
187                 final ListenerRegistration<
188                         org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
189                 final DOMDataTreeChangeListener listener) {
190             this.proxy = proxy;
191             this.listener = listener;
192         }
193
194         @Override
195         public DOMDataTreeChangeListener getInstance() {
196             return listener;
197         }
198
199         @Override
200         public void close() {
201             proxy.close();
202         }
203     }
204
205     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
206             final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
207         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
208         for (final DataTreeCandidate change : changes) {
209             try {
210                 DataTreeCandidates.applyToModification(modification, change);
211             } catch (SchemaValidationFailedException e) {
212                 LOG.error("Validation failed {}", e);
213             }
214         }
215
216         modification.ready();
217
218         final DataTreeCandidate candidate;
219
220         dataTree.validate(modification);
221
222         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
223         candidate = dataTree.prepare(modification);
224         dataTree.commit(candidate);
225
226
227         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
228
229         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
230             modifiedChild = modifiedChild.getModifiedChild(pathArgument);
231         }
232
233         if (modifiedChild == null) {
234             modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
235         }
236
237         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
238     }
239
240
241     private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
242
243         private final YangInstanceIdentifier listenerPath;
244         private final DOMDataTreeChangeListener delegate;
245         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
246                 new ConcurrentHashMap<>();
247
248         @GuardedBy("this")
249         private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
250
251         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
252                                          final DOMDataTreeChangeListener delegate) {
253             this.listenerPath = Preconditions.checkNotNull(listenerPath);
254             this.delegate = Preconditions.checkNotNull(delegate);
255         }
256
257         @Override
258         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
259             LOG.debug("Received data changed {}", changes);
260
261             if (!stashedDataTreeCandidates.isEmpty()) {
262                 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
263                 changes.addAll(stashedDataTreeCandidates);
264                 stashedDataTreeCandidates.clear();
265             }
266
267             try {
268                 applyChanges(listenerPath, changes);
269             } catch (final DataValidationFailedException e) {
270                 // TODO should we fail here? What if stashed changes
271                 // (changes from subshards) got ahead more than one generation
272                 // from current shard. Than we can fail to apply this changes
273                 // upon current data tree, but once we get respective changes
274                 // from current shard, we can apply also changes from
275                 // subshards.
276                 //
277                 // However, we can loose ability to notice and report some
278                 // errors then. For example, we cannot detect potential lost
279                 // changes from current shard.
280                 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
281                         changes, dataTree, e);
282                 throw new RuntimeException("Notification validation failed", e);
283             }
284
285             delegate.onDataTreeChanged(changes);
286         }
287
288         synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
289                                             final Collection<DataTreeCandidate> changes) {
290             final YangInstanceIdentifier changeId =
291                     YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
292
293             final List<DataTreeCandidate> newCandidates = changes.stream()
294                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
295                     .collect(Collectors.toList());
296
297             try {
298                 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
299             } catch (final DataValidationFailedException e) {
300                 // We cannot apply changes from subshard to current data tree.
301                 // Maybe changes from current shard haven't been applied to
302                 // data tree yet. Postpone processing of these changes till we
303                 // receive changes from current shard.
304                 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
305                         pathFromRoot, changes, dataTree, e);
306                 stashedDataTreeCandidates.addAll(newCandidates);
307             }
308         }
309
310         void addSubshard(final ChildShardContext context) {
311             Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
312                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
313
314             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
315             // since this is going into subshard we want to listen for ALL changes in the subshard
316             registrations.put(context.getPrefix().getRootIdentifier(),
317                     listenableShard.registerTreeChangeListener(
318                             context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
319                                     context.getPrefix().getRootIdentifier(), changes)));
320         }
321
322         void close() {
323             for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
324                 registration.close();
325             }
326             registrations.clear();
327         }
328     }
329
330     private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
331
332         private final PathArgument identifier;
333
334         EmptyDataTreeCandidateNode(final PathArgument identifier) {
335             this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
336         }
337
338         @Nonnull
339         @Override
340         public PathArgument getIdentifier() {
341             return identifier;
342         }
343
344         @Nonnull
345         @Override
346         public Collection<DataTreeCandidateNode> getChildNodes() {
347             return Collections.emptySet();
348         }
349
350         @Nullable
351         @Override
352         @SuppressWarnings("checkstyle:hiddenField")
353         public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
354             return null;
355         }
356
357         @Nonnull
358         @Override
359         public ModificationType getModificationType() {
360             return ModificationType.UNMODIFIED;
361         }
362
363         @Nonnull
364         @Override
365         public Optional<NormalizedNode<?, ?>> getDataAfter() {
366             return Optional.absent();
367         }
368
369         @Nonnull
370         @Override
371         public Optional<NormalizedNode<?, ?>> getDataBefore() {
372             return Optional.absent();
373         }
374     }
375 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.