BUG-8733: use DataTreeCandidateNodes.empty()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.stream.Collectors;
21 import javax.annotation.Nonnull;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
24 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
28 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
29 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
30 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
31 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
43 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
44 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class DistributedShardChangePublisher
49         extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
50         implements DOMStoreTreeChangePublisher {
51
52     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
53
54     private final AbstractDataStore distributedDataStore;
55     private final YangInstanceIdentifier shardPath;
56
57     // This will be useful for signaling back pressure
58     private final DataStoreClient client;
59
60     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
61
62     @GuardedBy("this")
63     private final DataTree dataTree;
64
65     public DistributedShardChangePublisher(final DataStoreClient client,
66                                            final AbstractDataStore distributedDataStore,
67                                            final DOMDataTreeIdentifier prefix,
68                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
69         this.client = client;
70         this.distributedDataStore = distributedDataStore;
71         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
72         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
73         // dataTree and wont have to cache it redundantly.
74         this.dataTree = InMemoryDataTreeFactory.getInstance().create(
75                 TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
76
77         dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
78
79         this.shardPath = prefix.getRootIdentifier();
80         this.childShards = childShards;
81     }
82
83     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
84         LOG.debug("Closing registration {}", registration);
85     }
86
87     @Override
88     public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
89             registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
90         takeLock();
91         try {
92             return setupListenerContext(path, listener);
93         } finally {
94             releaseLock();
95         }
96     }
97
98     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
99             setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
100         // we need to register the listener registration path based on the shards root
101         // we have to strip the shard path from the listener path and then register
102         YangInstanceIdentifier strippedIdentifier = listenerPath;
103         if (!shardPath.isEmpty()) {
104             strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
105         }
106
107         final DOMDataTreeListenerWithSubshards subshardListener =
108                 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
109         final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
110                 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
111
112         for (final ChildShardContext maybeAffected : childShards.values()) {
113             if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
114                 // consumer has initialDataChangeEvent subshard somewhere on lower level
115                 // register to the notification manager with snapshot and forward child notifications to parent
116                 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
117                 subshardListener.addSubshard(maybeAffected);
118             } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
119                 // bind path is inside subshard
120                 // TODO can this happen? seems like in ShardedDOMDataTree we are
121                 // already registering to the lowest shard possible
122                 throw new UnsupportedOperationException("Listener should be registered directly "
123                         + "into initialDataChangeEvent subshard");
124             }
125         }
126
127         return reg;
128     }
129
130     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
131             setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
132                                          final YangInstanceIdentifier listenerPath,
133                                          final DOMDataTreeListenerWithSubshards listener) {
134
135         LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
136
137         // register in the shard tree
138         final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
139                 findNodeFor(listenerPath.getPathArguments());
140
141         // register listener in CDS
142         final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
143                 .registerProxyListener(shardLookup, listenerPath, listener), listener);
144
145         @SuppressWarnings("unchecked")
146         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
147             new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
148                 @Override
149                 protected void removeRegistration() {
150                     listener.close();
151                     DistributedShardChangePublisher.this.removeRegistration(node, this);
152                     registrationRemoved(this);
153                     proxyReg.close();
154                 }
155             };
156         addRegistration(node, registration);
157
158         return registration;
159     }
160
161     private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
162                                                          final YangInstanceIdentifier listenerPath) {
163         if (shardPath.isEmpty()) {
164             return listenerPath.getPathArguments();
165         }
166
167         final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
168         final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
169         final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
170
171         while (shardIter.hasNext()) {
172             if (shardIter.next().equals(listenerIter.next())) {
173                 listenerIter.remove();
174             } else {
175                 break;
176             }
177         }
178
179         return listenerPathArgs;
180     }
181
182     private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
183
184         private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
185         private final DOMDataTreeChangeListener listener;
186
187         private ProxyRegistration(
188                 final ListenerRegistration<
189                         org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
190                 final DOMDataTreeChangeListener listener) {
191             this.proxy = proxy;
192             this.listener = listener;
193         }
194
195         @Override
196         public DOMDataTreeChangeListener getInstance() {
197             return listener;
198         }
199
200         @Override
201         public void close() {
202             proxy.close();
203         }
204     }
205
206     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
207             final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
208         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
209         for (final DataTreeCandidate change : changes) {
210             try {
211                 DataTreeCandidates.applyToModification(modification, change);
212             } catch (SchemaValidationFailedException e) {
213                 LOG.error("Validation failed {}", e);
214             }
215         }
216
217         modification.ready();
218
219         final DataTreeCandidate candidate;
220
221         dataTree.validate(modification);
222
223         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
224         candidate = dataTree.prepare(modification);
225         dataTree.commit(candidate);
226
227
228         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
229
230         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
231             modifiedChild = modifiedChild.getModifiedChild(pathArgument);
232         }
233
234         if (modifiedChild == null) {
235             modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument());
236         }
237
238         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
239     }
240
241
242     private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
243
244         private final YangInstanceIdentifier listenerPath;
245         private final DOMDataTreeChangeListener delegate;
246         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
247                 new ConcurrentHashMap<>();
248
249         @GuardedBy("this")
250         private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
251
252         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
253                                          final DOMDataTreeChangeListener delegate) {
254             this.listenerPath = Preconditions.checkNotNull(listenerPath);
255             this.delegate = Preconditions.checkNotNull(delegate);
256         }
257
258         @Override
259         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
260             LOG.debug("Received data changed {}", changes);
261
262             if (!stashedDataTreeCandidates.isEmpty()) {
263                 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
264                 changes.addAll(stashedDataTreeCandidates);
265                 stashedDataTreeCandidates.clear();
266             }
267
268             try {
269                 applyChanges(listenerPath, changes);
270             } catch (final DataValidationFailedException e) {
271                 // TODO should we fail here? What if stashed changes
272                 // (changes from subshards) got ahead more than one generation
273                 // from current shard. Than we can fail to apply this changes
274                 // upon current data tree, but once we get respective changes
275                 // from current shard, we can apply also changes from
276                 // subshards.
277                 //
278                 // However, we can loose ability to notice and report some
279                 // errors then. For example, we cannot detect potential lost
280                 // changes from current shard.
281                 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
282                         changes, dataTree, e);
283                 throw new RuntimeException("Notification validation failed", e);
284             }
285
286             delegate.onDataTreeChanged(changes);
287         }
288
289         synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
290                                             final Collection<DataTreeCandidate> changes) {
291             final YangInstanceIdentifier changeId =
292                     YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
293
294             final List<DataTreeCandidate> newCandidates = changes.stream()
295                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
296                     .collect(Collectors.toList());
297
298             try {
299                 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
300             } catch (final DataValidationFailedException e) {
301                 // We cannot apply changes from subshard to current data tree.
302                 // Maybe changes from current shard haven't been applied to
303                 // data tree yet. Postpone processing of these changes till we
304                 // receive changes from current shard.
305                 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
306                         pathFromRoot, changes, dataTree);
307                 stashedDataTreeCandidates.addAll(newCandidates);
308             }
309         }
310
311         void addSubshard(final ChildShardContext context) {
312             Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
313                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
314
315             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
316             // since this is going into subshard we want to listen for ALL changes in the subshard
317             registrations.put(context.getPrefix().getRootIdentifier(),
318                     listenableShard.registerTreeChangeListener(
319                             context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
320                                     context.getPrefix().getRootIdentifier(), changes)));
321         }
322
323         void close() {
324             for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
325                 registration.close();
326             }
327             registrations.clear();
328         }
329     }
330 }