8981988196ce5786edf0e40e2c74defc590235a2
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.sharding;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.stream.Collectors;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
24 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
28 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
29 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
30 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
31 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
43 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
44 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class DistributedShardChangePublisher
49         extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
50         implements DOMStoreTreeChangePublisher {
51
52     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
53
54     private final AbstractDataStore distributedDataStore;
55     private final YangInstanceIdentifier shardPath;
56
57     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
58
59     @GuardedBy("this")
60     private final DataTree dataTree;
61
62     public DistributedShardChangePublisher(final DataStoreClient client,
63                                            final AbstractDataStore distributedDataStore,
64                                            final DOMDataTreeIdentifier prefix,
65                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
66         this.distributedDataStore = distributedDataStore;
67         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
68         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
69         // dataTree and wont have to cache it redundantly.
70
71         final DataTreeConfiguration baseConfig;
72         switch (prefix.getDatastoreType()) {
73             case CONFIGURATION:
74                 baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
75                 break;
76             case OPERATIONAL:
77                 baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
78                 break;
79             default:
80                 throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
81         }
82
83         this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
84                 .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
85                 .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
86                 .setRootPath(prefix.getRootIdentifier())
87                 .build());
88
89         // XXX: can we guarantee that the root is present in the schemacontext?
90         this.dataTree.setEffectiveModelContext(distributedDataStore.getActorUtils().getSchemaContext());
91         this.shardPath = prefix.getRootIdentifier();
92         this.childShards = childShards;
93     }
94
95     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
96         LOG.debug("Closing registration {}", registration);
97     }
98
99     @Override
100     public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
101             registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
102         takeLock();
103         try {
104             return setupListenerContext(path, listener);
105         } finally {
106             releaseLock();
107         }
108     }
109
110     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
111             setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
112         // we need to register the listener registration path based on the shards root
113         // we have to strip the shard path from the listener path and then register
114         YangInstanceIdentifier strippedIdentifier = listenerPath;
115         if (!shardPath.isEmpty()) {
116             strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
117         }
118
119         final DOMDataTreeListenerWithSubshards subshardListener =
120                 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
121         final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
122                 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
123
124         for (final ChildShardContext maybeAffected : childShards.values()) {
125             if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
126                 // consumer has initialDataChangeEvent subshard somewhere on lower level
127                 // register to the notification manager with snapshot and forward child notifications to parent
128                 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
129                 subshardListener.addSubshard(maybeAffected);
130             } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
131                 // bind path is inside subshard
132                 // TODO can this happen? seems like in ShardedDOMDataTree we are
133                 // already registering to the lowest shard possible
134                 throw new UnsupportedOperationException("Listener should be registered directly "
135                         + "into initialDataChangeEvent subshard");
136             }
137         }
138
139         return reg;
140     }
141
142     private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
143             setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
144                                          final YangInstanceIdentifier listenerPath,
145                                          final DOMDataTreeListenerWithSubshards listener) {
146
147         LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
148
149         // register in the shard tree
150         final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
151                 findNodeFor(listenerPath.getPathArguments());
152
153         // register listener in CDS
154         ListenerRegistration<DOMDataTreeChangeListener> listenerReg = distributedDataStore
155                 .registerProxyListener(shardLookup, listenerPath, listener);
156
157         @SuppressWarnings("unchecked")
158         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
159             new AbstractDOMDataTreeChangeListenerRegistration<>((L) listener) {
160                 @Override
161                 protected void removeRegistration() {
162                     listener.close();
163                     DistributedShardChangePublisher.this.removeRegistration(node, this);
164                     registrationRemoved(this);
165                     listenerReg.close();
166                 }
167             };
168         addRegistration(node, registration);
169
170         return registration;
171     }
172
173     private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
174                                                          final YangInstanceIdentifier listenerPath) {
175         if (shardPath.isEmpty()) {
176             return listenerPath.getPathArguments();
177         }
178
179         final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
180         final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
181         final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
182
183         while (shardIter.hasNext()) {
184             if (shardIter.next().equals(listenerIter.next())) {
185                 listenerIter.remove();
186             } else {
187                 break;
188             }
189         }
190
191         return listenerPathArgs;
192     }
193
194     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
195             final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
196         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
197         for (final DataTreeCandidate change : changes) {
198             try {
199                 DataTreeCandidates.applyToModification(modification, change);
200             } catch (SchemaValidationFailedException e) {
201                 LOG.error("Validation failed", e);
202             }
203         }
204
205         modification.ready();
206
207         final DataTreeCandidate candidate;
208
209         dataTree.validate(modification);
210
211         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
212         candidate = dataTree.prepare(modification);
213         dataTree.commit(candidate);
214
215
216         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
217
218         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
219             modifiedChild = modifiedChild.getModifiedChild(pathArgument).orElse(null);
220         }
221
222
223         if (modifiedChild == null) {
224             modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument());
225         }
226
227         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
228     }
229
230
231     private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
232
233         private final YangInstanceIdentifier listenerPath;
234         private final DOMDataTreeChangeListener delegate;
235         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
236                 new ConcurrentHashMap<>();
237
238         @GuardedBy("this")
239         private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
240
241         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
242                                          final DOMDataTreeChangeListener delegate) {
243             this.listenerPath = requireNonNull(listenerPath);
244             this.delegate = requireNonNull(delegate);
245         }
246
247         @Override
248         public synchronized void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
249             LOG.debug("Received data changed {}", changes);
250
251             if (!stashedDataTreeCandidates.isEmpty()) {
252                 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
253                 changes.addAll(stashedDataTreeCandidates);
254                 stashedDataTreeCandidates.clear();
255             }
256
257             try {
258                 applyChanges(listenerPath, changes);
259             } catch (final DataValidationFailedException e) {
260                 // TODO should we fail here? What if stashed changes
261                 // (changes from subshards) got ahead more than one generation
262                 // from current shard. Than we can fail to apply this changes
263                 // upon current data tree, but once we get respective changes
264                 // from current shard, we can apply also changes from
265                 // subshards.
266                 //
267                 // However, we can loose ability to notice and report some
268                 // errors then. For example, we cannot detect potential lost
269                 // changes from current shard.
270                 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
271                         changes, dataTree, e);
272                 throw new RuntimeException("Notification validation failed", e);
273             }
274
275             delegate.onDataTreeChanged(changes);
276         }
277
278         synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
279                                             final Collection<DataTreeCandidate> changes) {
280             final YangInstanceIdentifier changeId =
281                     YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
282
283             final List<DataTreeCandidate> newCandidates = changes.stream()
284                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
285                     .collect(Collectors.toList());
286
287             try {
288                 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
289             } catch (final DataValidationFailedException e) {
290                 // We cannot apply changes from subshard to current data tree.
291                 // Maybe changes from current shard haven't been applied to
292                 // data tree yet. Postpone processing of these changes till we
293                 // receive changes from current shard.
294                 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
295                         pathFromRoot, changes, dataTree, e);
296                 stashedDataTreeCandidates.addAll(newCandidates);
297             }
298         }
299
300         void addSubshard(final ChildShardContext context) {
301             checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
302                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
303
304             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
305             // since this is going into subshard we want to listen for ALL changes in the subshard
306             registrations.put(context.getPrefix().getRootIdentifier(),
307                     listenableShard.registerTreeChangeListener(
308                             context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
309                                     context.getPrefix().getRootIdentifier(), changes)));
310         }
311
312         void close() {
313             for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
314                 registration.close();
315             }
316             registrations.clear();
317         }
318     }
319 }