2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.sharding;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.stream.Collectors;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
24 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
28 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
29 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
30 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
31 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
43 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
44 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 @Deprecated(forRemoval = true)
49 public class DistributedShardChangePublisher
50 extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
51 implements DOMStoreTreeChangePublisher {
// Class-wide SLF4J logger.
53 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
// Datastore used to register proxy listeners and to obtain the schema context in the constructor.
55 private final DistributedDataStoreInterface distributedDataStore;
// Root identifier of this shard's prefix; stripped from listener paths before they are registered.
56 private final YangInstanceIdentifier shardPath;
// Child shard contexts as handed to the constructor (stored by reference, no defensive copy);
// iterated whenever a listener is wired up.
58 private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
// Local in-memory data tree that applyChanges() validates against and commits into.
// NOTE(review): original line 60 is absent from this listing; the otherwise unused GuardedBy import
// suggests an annotation belonged here - confirm against the upstream file.
61 private final DataTree dataTree;
/**
 * Creates a publisher for the shard rooted at {@code prefix}, backed by a freshly created in-memory data tree.
 *
 * NOTE(review): this listing skips original lines 74, 76-77, 79-80, 82-83 and 88-89 (presumably the case
 * labels, breaks, closing brace and the end of the builder chain) - confirm against the upstream file.
 *
 * @param client not referenced by any line of the constructor visible here
 * @param distributedDataStore datastore supplying the schema context and, later, proxy listener registration
 * @param prefix shard prefix; its datastore type selects the base tree configuration and its root identifier
 *               becomes both the tree root path and {@code shardPath}
 * @param childShards child shard contexts, stored by reference
 * @throws UnsupportedOperationException if the datastore type of {@code prefix} is not handled by the switch
 */
63 public DistributedShardChangePublisher(final DataStoreClient client,
64 final DistributedDataStoreInterface distributedDataStore,
65 final DOMDataTreeIdentifier prefix,
66 final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
67 this.distributedDataStore = distributedDataStore;
68 // TODO keeping the whole dataTree that's contained in subshards doesn't seem like a good idea
69 // maybe the whole listener logic would be better in the backend shards where we have direct access to the
70 // dataTree and won't have to cache it redundantly.
// Pick the stock configuration matching the datastore type; it is only used as a template below.
72 final DataTreeConfiguration baseConfig;
73 switch (prefix.getDatastoreType()) {
75 baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
78 baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
81 throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
// Copy tree type, mandatory-node validation and unique-index settings from the template, but root the
// tree at the shard prefix instead of the template's root.
84 this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
85 .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
86 .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
87 .setRootPath(prefix.getRootIdentifier())
90 // XXX: can we guarantee that the root is present in the schemacontext?
91 this.dataTree.setEffectiveModelContext(distributedDataStore.getActorUtils().getSchemaContext());
92 this.shardPath = prefix.getRootIdentifier();
93 this.childShards = childShards;
96 protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
97 LOG.debug("Closing registration {}", registration);
/**
 * Registers {@code listener} for changes at {@code path} by delegating to setupListenerContext().
 *
 * NOTE(review): original lines 103-104 and 106-109 are absent from this listing, so statements surrounding
 * the delegation (for example lock handling required by the AbstractRegistrationTree base class) cannot be
 * seen here - confirm against the upstream file before relying on this description.
 *
 * @param path full path the listener is interested in
 * @param listener listener to register
 * @return the registration produced by setupListenerContext()
 */
101 public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
102 registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
105 return setupListenerContext(path, listener);
111 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
112 setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
113 // we need to register the listener registration path based on the shards root
114 // we have to strip the shard path from the listener path and then register
115 YangInstanceIdentifier strippedIdentifier = listenerPath;
116 if (!shardPath.isEmpty()) {
117 strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
120 final DOMDataTreeListenerWithSubshards subshardListener =
121 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
122 final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
123 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
125 for (final ChildShardContext maybeAffected : childShards.values()) {
126 if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
127 // consumer has initialDataChangeEvent subshard somewhere on lower level
128 // register to the notification manager with snapshot and forward child notifications to parent
129 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
130 subshardListener.addSubshard(maybeAffected);
131 } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
132 // bind path is inside subshard
133 // TODO can this happen? seems like in ShardedDOMDataTree we are
134 // already registering to the lowest shard possible
135 throw new UnsupportedOperationException("Listener should be registered directly "
136 + "into initialDataChangeEvent subshard");
/**
 * Registers {@code listener} both in this publisher's registration tree and as a proxy listener in the
 * distributed datastore, and returns the registration object tying the two together.
 *
 * NOTE(review): original lines 161, 163, 166-168 and 170-173 are absent from this listing. In the visible
 * lines {@code listenerReg} is never used after assignment and the method has no return statement, so the
 * missing lines presumably close the proxy registration and return {@code registration} - confirm upstream.
 *
 * @param shardLookup full listener path, passed to the datastore as the first registerProxyListener argument
 * @param listenerPath listener path relative to the shard root; selects the registration tree node
 * @param listener subshard-aware wrapper that receives the notifications
 */
143 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
144 setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
145 final YangInstanceIdentifier listenerPath,
146 final DOMDataTreeListenerWithSubshards listener) {
148 LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
150 // register in the shard tree
151 final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
152 findNodeFor(listenerPath.getPathArguments());
154 // register listener in CDS
155 ListenerRegistration<DOMDataTreeChangeListener> listenerReg = distributedDataStore
156 .registerProxyListener(shardLookup, listenerPath, listener);
// The wrapper, not the user's listener, is stored in the registration; the cast to L is unchecked.
158 @SuppressWarnings("unchecked")
159 final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
160 new AbstractDOMDataTreeChangeListenerRegistration<>((L) listener) {
162 protected void removeRegistration() {
// Detach from the registration tree node captured above, then fire the removal hook.
164 DistributedShardChangePublisher.this.removeRegistration(node, this);
165 registrationRemoved(this);
169 addRegistration(node, registration);
174 private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
175 final YangInstanceIdentifier listenerPath) {
176 if (shardPath.isEmpty()) {
177 return listenerPath.getPathArguments();
180 final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
181 final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
182 final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
184 while (shardIter.hasNext()) {
185 if (shardIter.next().equals(listenerIter.next())) {
186 listenerIter.remove();
192 return listenerPathArgs;
195 synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
196 final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
197 final DataTreeModification modification = dataTree.takeSnapshot().newModification();
198 for (final DataTreeCandidate change : changes) {
200 DataTreeCandidates.applyToModification(modification, change);
201 } catch (SchemaValidationFailedException e) {
202 LOG.error("Validation failed", e);
206 modification.ready();
208 final DataTreeCandidate candidate;
210 dataTree.validate(modification);
212 // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
213 candidate = dataTree.prepare(modification);
214 dataTree.commit(candidate);
217 DataTreeCandidateNode modifiedChild = candidate.getRootNode();
219 for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
220 modifiedChild = modifiedChild.getModifiedChild(pathArgument).orElse(null);
224 if (modifiedChild == null) {
225 modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument());
228 return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
232 private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
// Listener path relative to the shard root; handed to applyChanges() for every notification.
234 private final YangInstanceIdentifier listenerPath;
// The user's listener that ultimately receives the notifications.
235 private final DOMDataTreeChangeListener delegate;
// Subshard listener registrations keyed by the subshard's root identifier; filled by addSubshard().
236 private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
237 new ConcurrentHashMap<>();
// Subshard candidates that could not be applied yet; they are replayed together with the next batch of
// changes from the current shard. Only touched from the synchronized onDataTreeChanged() methods.
// NOTE(review): original line 239 is absent from this listing (possibly an annotation) - confirm upstream.
240 private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
242 DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
243 final DOMDataTreeChangeListener delegate) {
244 this.listenerPath = requireNonNull(listenerPath);
245 this.delegate = requireNonNull(delegate);
249 public synchronized void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
250 LOG.debug("Received data changed {}", changes);
252 if (!stashedDataTreeCandidates.isEmpty()) {
253 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
254 changes.addAll(stashedDataTreeCandidates);
255 stashedDataTreeCandidates.clear();
259 applyChanges(listenerPath, changes);
260 } catch (final DataValidationFailedException e) {
261 // TODO should we fail here? What if stashed changes
262 // (changes from subshards) got ahead more than one generation
263 // from current shard. Than we can fail to apply this changes
264 // upon current data tree, but once we get respective changes
265 // from current shard, we can apply also changes from
268 // However, we can loose ability to notice and report some
269 // errors then. For example, we cannot detect potential lost
270 // changes from current shard.
271 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
272 changes, dataTree, e);
273 throw new RuntimeException("Notification validation failed", e);
276 delegate.onDataTreeChanged(changes);
279 synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
280 final Collection<DataTreeCandidate> changes) {
281 final YangInstanceIdentifier changeId =
282 YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
284 final List<DataTreeCandidate> newCandidates = changes.stream()
285 .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
286 .collect(Collectors.toList());
289 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
290 } catch (final DataValidationFailedException e) {
291 // We cannot apply changes from subshard to current data tree.
292 // Maybe changes from current shard haven't been applied to
293 // data tree yet. Postpone processing of these changes till we
294 // receive changes from current shard.
295 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
296 pathFromRoot, changes, dataTree, e);
297 stashedDataTreeCandidates.addAll(newCandidates);
301 void addSubshard(final ChildShardContext context) {
302 checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
303 "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
305 final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
306 // since this is going into subshard we want to listen for ALL changes in the subshard
307 registrations.put(context.getPrefix().getRootIdentifier(),
308 listenableShard.registerTreeChangeListener(
309 context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
310 context.getPrefix().getRootIdentifier(), changes)));
// Closes every subshard listener registration created by addSubshard() and then empties the map.
// NOTE(review): the enclosing method's signature (original line 313) is absent from this listing;
// presumably this is the body of the wrapper's cleanup method - confirm against the upstream file.
314 for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
315 registration.close();
317 registrations.clear();