Bug 8116 - Make DistributedShardChangePublisher agnostic to data tree change events...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.stream.Collectors;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
26 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
30 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
31 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
32 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
33 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
45 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
46 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
47 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public class DistributedShardChangePublisher
52         extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
53         implements DOMStoreTreeChangePublisher {
54
55     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
56
57     private final AbstractDataStore distributedDataStore;
58     private final YangInstanceIdentifier shardPath;
59
60     // This will be useful for signaling back pressure
61     private final DataStoreClient client;
62
63     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
64
65     @GuardedBy("this")
66     private final DataTree dataTree;
67
68     public DistributedShardChangePublisher(final DataStoreClient client,
69                                            final AbstractDataStore distributedDataStore,
70                                            final DOMDataTreeIdentifier prefix,
71                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
72         this.client = client;
73         this.distributedDataStore = distributedDataStore;
74         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
75         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
76         // dataTree and wont have to cache it redundantly.
77         this.dataTree = InMemoryDataTreeFactory.getInstance().create(
78                 TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
79
80         dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
81
82         this.shardPath = prefix.getRootIdentifier();
83         this.childShards = childShards;
84     }
85
86     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
87         LOG.debug("Closing registration {}", registration);
88     }
89
90     @Override
91     public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
92             registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
93         takeLock();
94         try {
95             return setupListenerContext(path, listener);
96         } finally {
97             releaseLock();
98         }
99     }
100
101     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
102             setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
103         // we need to register the listener registration path based on the shards root
104         // we have to strip the shard path from the listener path and then register
105         YangInstanceIdentifier strippedIdentifier = listenerPath;
106         if (!shardPath.isEmpty()) {
107             strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
108         }
109
110         final DOMDataTreeListenerWithSubshards subshardListener =
111                 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
112         final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
113                 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
114
115         for (final ChildShardContext maybeAffected : childShards.values()) {
116             if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
117                 // consumer has initialDataChangeEvent subshard somewhere on lower level
118                 // register to the notification manager with snapshot and forward child notifications to parent
119                 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
120                 subshardListener.addSubshard(maybeAffected);
121             } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
122                 // bind path is inside subshard
123                 // TODO can this happen? seems like in ShardedDOMDataTree we are
124                 // already registering to the lowest shard possible
125                 throw new UnsupportedOperationException("Listener should be registered directly "
126                         + "into initialDataChangeEvent subshard");
127             }
128         }
129
130         return reg;
131     }
132
133     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
134             setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
135                                          final YangInstanceIdentifier listenerPath,
136                                          final DOMDataTreeListenerWithSubshards listener) {
137
138         LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
139
140         // register in the shard tree
141         final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
142                 findNodeFor(listenerPath.getPathArguments());
143
144         // register listener in CDS
145         final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
146                 .registerProxyListener(shardLookup, listenerPath, listener), listener);
147
148         @SuppressWarnings("unchecked")
149         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
150             new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
151                 @Override
152                 protected void removeRegistration() {
153                     listener.close();
154                     DistributedShardChangePublisher.this.removeRegistration(node, this);
155                     registrationRemoved(this);
156                     proxyReg.close();
157                 }
158             };
159         addRegistration(node, registration);
160
161         return registration;
162     }
163
164     private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
165                                                          final YangInstanceIdentifier listenerPath) {
166         if (shardPath.isEmpty()) {
167             return listenerPath.getPathArguments();
168         }
169
170         final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
171         final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
172         final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
173
174         while (shardIter.hasNext()) {
175             if (shardIter.next().equals(listenerIter.next())) {
176                 listenerIter.remove();
177             } else {
178                 break;
179             }
180         }
181
182         return listenerPathArgs;
183     }
184
185     private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
186
187         private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
188         private final DOMDataTreeChangeListener listener;
189
190         private ProxyRegistration(
191                 final ListenerRegistration<
192                         org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
193                 final DOMDataTreeChangeListener listener) {
194             this.proxy = proxy;
195             this.listener = listener;
196         }
197
198         @Override
199         public DOMDataTreeChangeListener getInstance() {
200             return listener;
201         }
202
203         @Override
204         public void close() {
205             proxy.close();
206         }
207     }
208
209     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
210             final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
211         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
212         for (final DataTreeCandidate change : changes) {
213             try {
214                 DataTreeCandidates.applyToModification(modification, change);
215             } catch (SchemaValidationFailedException e) {
216                 LOG.error("Validation failed {}", e);
217             }
218         }
219
220         modification.ready();
221
222         final DataTreeCandidate candidate;
223
224         dataTree.validate(modification);
225
226         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
227         candidate = dataTree.prepare(modification);
228         dataTree.commit(candidate);
229
230
231         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
232
233         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
234             modifiedChild = modifiedChild.getModifiedChild(pathArgument);
235         }
236
237         if (modifiedChild == null) {
238             modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
239         }
240
241         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
242     }
243
244
245     private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
246
247         private final YangInstanceIdentifier listenerPath;
248         private final DOMDataTreeChangeListener delegate;
249         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
250                 new ConcurrentHashMap<>();
251
252         @GuardedBy("this")
253         private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
254
255         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
256                                          final DOMDataTreeChangeListener delegate) {
257             this.listenerPath = Preconditions.checkNotNull(listenerPath);
258             this.delegate = Preconditions.checkNotNull(delegate);
259         }
260
261         @Override
262         public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
263             LOG.debug("Received data changed {}", changes);
264
265             if (!stashedDataTreeCandidates.isEmpty()) {
266                 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
267                 changes.addAll(stashedDataTreeCandidates);
268                 stashedDataTreeCandidates.clear();
269             }
270
271             try {
272                 applyChanges(listenerPath, changes);
273             } catch (final DataValidationFailedException e) {
274                 // TODO should we fail here? What if stashed changes
275                 // (changes from subshards) got ahead more than one generation
276                 // from current shard. Than we can fail to apply this changes
277                 // upon current data tree, but once we get respective changes
278                 // from current shard, we can apply also changes from
279                 // subshards.
280                 //
281                 // However, we can loose ability to notice and report some
282                 // errors then. For example, we cannot detect potential lost
283                 // changes from current shard.
284                 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
285                         changes, dataTree, e);
286                 throw new RuntimeException("Notification validation failed", e);
287             }
288
289             delegate.onDataTreeChanged(changes);
290         }
291
292         synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
293                                             final Collection<DataTreeCandidate> changes) {
294             final YangInstanceIdentifier changeId =
295                     YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
296
297             final List<DataTreeCandidate> newCandidates = changes.stream()
298                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
299                     .collect(Collectors.toList());
300
301             try {
302                 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
303             } catch (final DataValidationFailedException e) {
304                 // We cannot apply changes from subshard to current data tree.
305                 // Maybe changes from current shard haven't been applied to
306                 // data tree yet. Postpone processing of these changes till we
307                 // receive changes from current shard.
308                 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
309                         pathFromRoot, changes, dataTree);
310                 stashedDataTreeCandidates.addAll(newCandidates);
311             }
312         }
313
314         void addSubshard(final ChildShardContext context) {
315             Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
316                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
317
318             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
319             // since this is going into subshard we want to listen for ALL changes in the subshard
320             registrations.put(context.getPrefix().getRootIdentifier(),
321                     listenableShard.registerTreeChangeListener(
322                             context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
323                                     context.getPrefix().getRootIdentifier(), changes)));
324         }
325
326         void close() {
327             for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
328                 registration.close();
329             }
330             registrations.clear();
331         }
332     }
333
334     private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
335
336         private final PathArgument identifier;
337
338         EmptyDataTreeCandidateNode(final PathArgument identifier) {
339             this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
340         }
341
342         @Nonnull
343         @Override
344         public PathArgument getIdentifier() {
345             return identifier;
346         }
347
348         @Nonnull
349         @Override
350         public Collection<DataTreeCandidateNode> getChildNodes() {
351             return Collections.emptySet();
352         }
353
354         @Nullable
355         @Override
356         public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
357             return null;
358         }
359
360         @Nonnull
361         @Override
362         public ModificationType getModificationType() {
363             return ModificationType.UNMODIFIED;
364         }
365
366         @Nonnull
367         @Override
368         public Optional<NormalizedNode<?, ?>> getDataAfter() {
369             return Optional.absent();
370         }
371
372         @Nonnull
373         @Override
374         public Optional<NormalizedNode<?, ?>> getDataBefore() {
375             return Optional.absent();
376         }
377     }
378 }