Adjust to yangtools-2.0.0 changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Optional;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.stream.Collectors;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
26 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
30 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
31 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
32 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
33 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
45 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
46 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
47 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public class DistributedShardChangePublisher
52         extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
53         implements DOMStoreTreeChangePublisher {
54
55     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
56
57     private final AbstractDataStore distributedDataStore;
58     private final YangInstanceIdentifier shardPath;
59
60     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
61
62     @GuardedBy("this")
63     private final DataTree dataTree;
64
65     public DistributedShardChangePublisher(final DataStoreClient client,
66                                            final AbstractDataStore distributedDataStore,
67                                            final DOMDataTreeIdentifier prefix,
68                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
69         this.distributedDataStore = distributedDataStore;
70         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
71         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
72         // dataTree and wont have to cache it redundantly.
73
74         final DataTreeConfiguration baseConfig;
75         switch (prefix.getDatastoreType()) {
76             case CONFIGURATION:
77                 baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
78                 break;
79             case OPERATIONAL:
80                 baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
81                 break;
82             default:
83                 throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
84         }
85
86         this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
87                 .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
88                 .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
89                 .setRootPath(prefix.getRootIdentifier())
90                 .build());
91
92         // XXX: can we guarantee that the root is present in the schemacontext?
93         this.dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
94         this.shardPath = prefix.getRootIdentifier();
95         this.childShards = childShards;
96     }
97
98     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
99         LOG.debug("Closing registration {}", registration);
100     }
101
102     @Override
103     public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
104             registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
105         takeLock();
106         try {
107             return setupListenerContext(path, listener);
108         } finally {
109             releaseLock();
110         }
111     }
112
113     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
114             setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
115         // we need to register the listener registration path based on the shards root
116         // we have to strip the shard path from the listener path and then register
117         YangInstanceIdentifier strippedIdentifier = listenerPath;
118         if (!shardPath.isEmpty()) {
119             strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
120         }
121
122         final DOMDataTreeListenerWithSubshards subshardListener =
123                 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
124         final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
125                 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
126
127         for (final ChildShardContext maybeAffected : childShards.values()) {
128             if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
129                 // consumer has initialDataChangeEvent subshard somewhere on lower level
130                 // register to the notification manager with snapshot and forward child notifications to parent
131                 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
132                 subshardListener.addSubshard(maybeAffected);
133             } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
134                 // bind path is inside subshard
135                 // TODO can this happen? seems like in ShardedDOMDataTree we are
136                 // already registering to the lowest shard possible
137                 throw new UnsupportedOperationException("Listener should be registered directly "
138                         + "into initialDataChangeEvent subshard");
139             }
140         }
141
142         return reg;
143     }
144
145     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
146             setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
147                                          final YangInstanceIdentifier listenerPath,
148                                          final DOMDataTreeListenerWithSubshards listener) {
149
150         LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
151
152         // register in the shard tree
153         final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
154                 findNodeFor(listenerPath.getPathArguments());
155
156         // register listener in CDS
157         final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
158                 .registerProxyListener(shardLookup, listenerPath, listener), listener);
159
160         @SuppressWarnings("unchecked")
161         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
162             new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
163                 @Override
164                 protected void removeRegistration() {
165                     listener.close();
166                     DistributedShardChangePublisher.this.removeRegistration(node, this);
167                     registrationRemoved(this);
168                     proxyReg.close();
169                 }
170             };
171         addRegistration(node, registration);
172
173         return registration;
174     }
175
176     private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
177                                                          final YangInstanceIdentifier listenerPath) {
178         if (shardPath.isEmpty()) {
179             return listenerPath.getPathArguments();
180         }
181
182         final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
183         final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
184         final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
185
186         while (shardIter.hasNext()) {
187             if (shardIter.next().equals(listenerIter.next())) {
188                 listenerIter.remove();
189             } else {
190                 break;
191             }
192         }
193
194         return listenerPathArgs;
195     }
196
197     private static final class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
198
199         private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
200         private final DOMDataTreeChangeListener listener;
201
202         private ProxyRegistration(
203                 final ListenerRegistration<
204                         org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
205                 final DOMDataTreeChangeListener listener) {
206             this.proxy = proxy;
207             this.listener = listener;
208         }
209
210         @Override
211         public DOMDataTreeChangeListener getInstance() {
212             return listener;
213         }
214
215         @Override
216         public void close() {
217             proxy.close();
218         }
219     }
220
221     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
222             final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
223         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
224         for (final DataTreeCandidate change : changes) {
225             try {
226                 DataTreeCandidates.applyToModification(modification, change);
227             } catch (SchemaValidationFailedException e) {
228                 LOG.error("Validation failed {}", e);
229             }
230         }
231
232         modification.ready();
233
234         final DataTreeCandidate candidate;
235
236         dataTree.validate(modification);
237
238         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
239         candidate = dataTree.prepare(modification);
240         dataTree.commit(candidate);
241
242
243         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
244
245         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
246             modifiedChild = modifiedChild.getModifiedChild(pathArgument);
247         }
248
249         if (modifiedChild == null) {
250             modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
251         }
252
253         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
254     }
255
256
257     private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
258
259         private final YangInstanceIdentifier listenerPath;
260         private final DOMDataTreeChangeListener delegate;
261         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
262                 new ConcurrentHashMap<>();
263
264         @GuardedBy("this")
265         private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
266
267         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
268                                          final DOMDataTreeChangeListener delegate) {
269             this.listenerPath = Preconditions.checkNotNull(listenerPath);
270             this.delegate = Preconditions.checkNotNull(delegate);
271         }
272
273         @Override
274         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
275             LOG.debug("Received data changed {}", changes);
276
277             if (!stashedDataTreeCandidates.isEmpty()) {
278                 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
279                 changes.addAll(stashedDataTreeCandidates);
280                 stashedDataTreeCandidates.clear();
281             }
282
283             try {
284                 applyChanges(listenerPath, changes);
285             } catch (final DataValidationFailedException e) {
286                 // TODO should we fail here? What if stashed changes
287                 // (changes from subshards) got ahead more than one generation
288                 // from current shard. Than we can fail to apply this changes
289                 // upon current data tree, but once we get respective changes
290                 // from current shard, we can apply also changes from
291                 // subshards.
292                 //
293                 // However, we can loose ability to notice and report some
294                 // errors then. For example, we cannot detect potential lost
295                 // changes from current shard.
296                 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
297                         changes, dataTree, e);
298                 throw new RuntimeException("Notification validation failed", e);
299             }
300
301             delegate.onDataTreeChanged(changes);
302         }
303
304         synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
305                                             final Collection<DataTreeCandidate> changes) {
306             final YangInstanceIdentifier changeId =
307                     YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
308
309             final List<DataTreeCandidate> newCandidates = changes.stream()
310                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
311                     .collect(Collectors.toList());
312
313             try {
314                 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
315             } catch (final DataValidationFailedException e) {
316                 // We cannot apply changes from subshard to current data tree.
317                 // Maybe changes from current shard haven't been applied to
318                 // data tree yet. Postpone processing of these changes till we
319                 // receive changes from current shard.
320                 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
321                         pathFromRoot, changes, dataTree, e);
322                 stashedDataTreeCandidates.addAll(newCandidates);
323             }
324         }
325
326         void addSubshard(final ChildShardContext context) {
327             Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
328                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
329
330             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
331             // since this is going into subshard we want to listen for ALL changes in the subshard
332             registrations.put(context.getPrefix().getRootIdentifier(),
333                     listenableShard.registerTreeChangeListener(
334                             context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
335                                     context.getPrefix().getRootIdentifier(), changes)));
336         }
337
338         void close() {
339             for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
340                 registration.close();
341             }
342             registrations.clear();
343         }
344     }
345
346     private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
347
348         private final PathArgument identifier;
349
350         EmptyDataTreeCandidateNode(final PathArgument identifier) {
351             this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
352         }
353
354         @Nonnull
355         @Override
356         public PathArgument getIdentifier() {
357             return identifier;
358         }
359
360         @Nonnull
361         @Override
362         public Collection<DataTreeCandidateNode> getChildNodes() {
363             return Collections.emptySet();
364         }
365
366         @Nullable
367         @Override
368         @SuppressWarnings("checkstyle:hiddenField")
369         public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
370             return null;
371         }
372
373         @Nonnull
374         @Override
375         public ModificationType getModificationType() {
376             return ModificationType.UNMODIFIED;
377         }
378
379         @Nonnull
380         @Override
381         public Optional<NormalizedNode<?, ?>> getDataAfter() {
382             return Optional.empty();
383         }
384
385         @Nonnull
386         @Override
387         public Optional<NormalizedNode<?, ?>> getDataBefore() {
388             return Optional.empty();
389         }
390     }
391 }