BUG-2138: DistributedShardListeners support for nested shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.stream.Collectors;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
29 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
30 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
31 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
32 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
45 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
46 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class DistributedShardChangePublisher
51         extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
52         implements DOMStoreTreeChangePublisher {
53
54     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
55
56     private final DistributedDataStore distributedDataStore;
57     private final YangInstanceIdentifier shardPath;
58
59     // This will be useful for signaling back pressure
60     private final DataStoreClient client;
61
62     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
63
64     @GuardedBy("this")
65     private final DataTree dataTree;
66
67     public DistributedShardChangePublisher(final DataStoreClient client,
68                                            final DistributedDataStore distributedDataStore,
69                                            final DOMDataTreeIdentifier prefix,
70                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
71         this.client = client;
72         this.distributedDataStore = distributedDataStore;
73         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
74         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
75         // dataTree and wont have to cache it redundantly.
76         this.dataTree = InMemoryDataTreeFactory.getInstance().create(
77                 TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
78
79         dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
80
81         this.shardPath = prefix.getRootIdentifier();
82         this.childShards = childShards;
83     }
84
85     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
86         LOG.debug("Closing registration {}", registration);
87     }
88
89     @Override
90     public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
91             registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
92         takeLock();
93         try {
94             return setupListenerContext(path, listener);
95         } finally {
96             releaseLock();
97         }
98     }
99
100     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
101             setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
102         // we need to register the listener registration path based on the shards root
103         // we have to strip the shard path from the listener path and then register
104         YangInstanceIdentifier strippedIdentifier = listenerPath;
105         if (!shardPath.isEmpty()) {
106             strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
107         }
108
109         final DOMDataTreeListenerWithSubshards subshardListener =
110                 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
111         final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
112                 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
113
114         for (final ChildShardContext maybeAffected : childShards.values()) {
115             if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
116                 // consumer has initialDataChangeEvent subshard somewhere on lower level
117                 // register to the notification manager with snapshot and forward child notifications to parent
118                 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
119                 subshardListener.addSubshard(maybeAffected);
120             } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
121                 // bind path is inside subshard
122                 // TODO can this happen? seems like in ShardedDOMDataTree we are
123                 // already registering to the lowest shard possible
124                 throw new UnsupportedOperationException("Listener should be registered directly "
125                         + "into initialDataChangeEvent subshard");
126             }
127         }
128
129         return reg;
130     }
131
132     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
133             setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
134                                          final YangInstanceIdentifier listenerPath,
135                                          final DOMDataTreeListenerWithSubshards listener) {
136
137         LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
138
139         // register in the shard tree
140         final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
141                 findNodeFor(listenerPath.getPathArguments());
142
143         // register listener in CDS
144         final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
145                 .registerProxyListener(shardLookup, listenerPath, listener), listener);
146
147         @SuppressWarnings("unchecked")
148         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
149             new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
150                 @Override
151                 protected void removeRegistration() {
152                     listener.close();
153                     DistributedShardChangePublisher.this.removeRegistration(node, this);
154                     registrationRemoved(this);
155                     proxyReg.close();
156                 }
157             };
158         addRegistration(node, registration);
159
160         return registration;
161     }
162
163     private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
164                                                          final YangInstanceIdentifier listenerPath) {
165         if (shardPath.isEmpty()) {
166             return listenerPath.getPathArguments();
167         }
168
169         final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
170         final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
171         final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
172
173         while (shardIter.hasNext()) {
174             if (shardIter.next().equals(listenerIter.next())) {
175                 listenerIter.remove();
176             } else {
177                 break;
178             }
179         }
180
181         return listenerPathArgs;
182     }
183
184     private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
185
186         private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
187         private final DOMDataTreeChangeListener listener;
188
189         private ProxyRegistration(
190                 final ListenerRegistration<
191                         org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
192                 final DOMDataTreeChangeListener listener) {
193             this.proxy = proxy;
194             this.listener = listener;
195         }
196
197         @Override
198         public DOMDataTreeChangeListener getInstance() {
199             return listener;
200         }
201
202         @Override
203         public void close() {
204             proxy.close();
205         }
206     }
207
208     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
209             final Collection<DataTreeCandidate> changes) {
210         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
211         for (final DataTreeCandidate change : changes) {
212             try {
213                 DataTreeCandidates.applyToModification(modification, change);
214             } catch (SchemaValidationFailedException e) {
215                 LOG.error("Validation failed {}", e);
216             }
217         }
218
219         modification.ready();
220
221         final DataTreeCandidate candidate;
222
223         try {
224             dataTree.validate(modification);
225         } catch (final DataValidationFailedException e) {
226             LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
227                     modification, dataTree, e);
228             throw new RuntimeException("Notification validation failed", e);
229         }
230
231         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
232         candidate = dataTree.prepare(modification);
233         dataTree.commit(candidate);
234
235
236         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
237
238         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
239             modifiedChild = modifiedChild.getModifiedChild(pathArgument);
240         }
241
242         if (modifiedChild == null) {
243             modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
244         }
245
246         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
247     }
248
249
250     private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
251
252         private final YangInstanceIdentifier listenerPath;
253         private final DOMDataTreeChangeListener delegate;
254
255         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
256                 new ConcurrentHashMap<>();
257
258         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
259                                          final DOMDataTreeChangeListener delegate) {
260             this.listenerPath = Preconditions.checkNotNull(listenerPath);
261             this.delegate = Preconditions.checkNotNull(delegate);
262         }
263
264         @Override
265         public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
266             LOG.debug("Received data changed {}", changes);
267             applyChanges(listenerPath, changes);
268             delegate.onDataTreeChanged(changes);
269         }
270
271         synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
272                                             final Collection<DataTreeCandidate> changes) {
273             final YangInstanceIdentifier changeId =
274                     YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
275
276             final List<DataTreeCandidate> newCandidates = changes.stream()
277                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
278                     .collect(Collectors.toList());
279             delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
280         }
281
282         void addSubshard(final ChildShardContext context) {
283             Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
284                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
285
286             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
287             // since this is going into subshard we want to listen for ALL changes in the subshard
288             registrations.put(context.getPrefix().getRootIdentifier(),
289                     listenableShard.registerTreeChangeListener(
290                             context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
291                                     context.getPrefix().getRootIdentifier(), changes)));
292         }
293
294         void close() {
295             for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
296                 registration.close();
297             }
298             registrations.clear();
299         }
300     }
301
302     private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
303
304         private final PathArgument identifier;
305
306         EmptyDataTreeCandidateNode(final PathArgument identifier) {
307             this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
308         }
309
310         @Nonnull
311         @Override
312         public PathArgument getIdentifier() {
313             return identifier;
314         }
315
316         @Nonnull
317         @Override
318         public Collection<DataTreeCandidateNode> getChildNodes() {
319             return Collections.emptySet();
320         }
321
322         @Nullable
323         @Override
324         public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
325             return null;
326         }
327
328         @Nonnull
329         @Override
330         public ModificationType getModificationType() {
331             return ModificationType.UNMODIFIED;
332         }
333
334         @Nonnull
335         @Override
336         public Optional<NormalizedNode<?, ?>> getDataAfter() {
337             return Optional.absent();
338         }
339
340         @Nonnull
341         @Override
342         public Optional<NormalizedNode<?, ?>> getDataBefore() {
343             return Optional.absent();
344         }
345     }
346 }