2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.List;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.stream.Collectors;
21 import javax.annotation.Nonnull;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
24 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
28 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
29 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
30 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
31 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
43 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
44 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 final class DistributedShardChangePublisher
49 extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
50 implements DOMStoreTreeChangePublisher {
// Class-wide SLF4J logger.
52 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
// Backing datastore: supplies the schema context for the local data tree and hosts the proxy listeners.
54 private final AbstractDataStore distributedDataStore;
// Root identifier of this shard's prefix; stripped from listener paths before they are registered in the tree.
55 private final YangInstanceIdentifier shardPath;
57 // This will be useful for signaling back pressure
58 private final DataStoreClient client;
// Child shards keyed by their prefix; scanned on registration to find subshards below the listener path.
60 private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
// Local in-memory data tree rooted at the shard prefix; applyChanges replays incoming candidates onto it.
63 private final DataTree dataTree;
/**
 * Creates a change publisher for the shard rooted at {@code prefix}.
 *
 * <p>A local in-memory {@link DataTree} is created for the shard, using the tree type named after the
 * prefix's datastore type and the prefix root as the tree root, and is given the schema context of
 * {@code distributedDataStore}.
 *
 * @param client datastore client, retained for future back-pressure signalling
 * @param distributedDataStore datastore backing this shard
 * @param prefix prefix (datastore type and root identifier) of the shard
 * @param childShards child shards of this shard, keyed by their prefix
 */
DistributedShardChangePublisher(final DataStoreClient client, final AbstractDataStore distributedDataStore,
        final DOMDataTreeIdentifier prefix, final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
    // The field is final, so it has to be assigned here even though nothing reads it yet.
    this.client = client;
    this.distributedDataStore = distributedDataStore;
    // TODO keeping the whole dataTree that's contained in subshards doesn't seem like a good idea
    // maybe the whole listener logic would be better in the backend shards where we have direct access to the
    // dataTree and won't have to cache it redundantly.
    this.dataTree = InMemoryDataTreeFactory.getInstance().create(
            TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());

    dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());

    this.shardPath = prefix.getRootIdentifier();
    this.childShards = childShards;
}
81 protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
82 LOG.debug("Closing registration {}", registration);
86 public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
87 registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
90 return setupListenerContext(path, listener);
96 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
97 setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
98 // we need to register the listener registration path based on the shards root
99 // we have to strip the shard path from the listener path and then register
100 YangInstanceIdentifier strippedIdentifier = listenerPath;
101 if (!shardPath.isEmpty()) {
102 strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
105 final DOMDataTreeListenerWithSubshards subshardListener =
106 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
107 final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
108 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
110 for (final ChildShardContext maybeAffected : childShards.values()) {
111 if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
112 // consumer has initialDataChangeEvent subshard somewhere on lower level
113 // register to the notification manager with snapshot and forward child notifications to parent
114 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
115 subshardListener.addSubshard(maybeAffected);
116 } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
117 // bind path is inside subshard
118 // TODO can this happen? seems like in ShardedDOMDataTree we are
119 // already registering to the lowest shard possible
120 throw new UnsupportedOperationException("Listener should be registered directly "
121 + "into initialDataChangeEvent subshard");
128 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
129 setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
130 final YangInstanceIdentifier listenerPath,
131 final DOMDataTreeListenerWithSubshards listener) {
133 LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
135 // register in the shard tree
136 final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
137 findNodeFor(listenerPath.getPathArguments());
139 // register listener in CDS
140 final ListenerRegistration<?> dsReg = distributedDataStore.registerProxyListener(shardLookup, listenerPath,
143 @SuppressWarnings("unchecked")
144 final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
145 new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
147 protected void removeRegistration() {
149 DistributedShardChangePublisher.this.removeRegistration(node, this);
150 registrationRemoved(this);
154 addRegistration(node, registration);
159 private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
160 final YangInstanceIdentifier listenerPath) {
161 if (shardPath.isEmpty()) {
162 return listenerPath.getPathArguments();
165 final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
166 final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
167 final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
169 while (shardIter.hasNext()) {
170 if (shardIter.next().equals(listenerIter.next())) {
171 listenerIter.remove();
177 return listenerPathArgs;
180 synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
181 final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
182 final DataTreeModification modification = dataTree.takeSnapshot().newModification();
183 for (final DataTreeCandidate change : changes) {
185 DataTreeCandidates.applyToModification(modification, change);
186 } catch (SchemaValidationFailedException e) {
187 LOG.error("Validation failed {}", e);
191 modification.ready();
193 final DataTreeCandidate candidate;
195 dataTree.validate(modification);
197 // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
198 candidate = dataTree.prepare(modification);
199 dataTree.commit(candidate);
202 DataTreeCandidateNode modifiedChild = candidate.getRootNode();
204 for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
205 modifiedChild = modifiedChild.getModifiedChild(pathArgument);
208 if (modifiedChild == null) {
209 modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument());
212 return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
216 private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
// Listener path handed to applyChanges to select the subtree reported to the delegate.
218 private final YangInstanceIdentifier listenerPath;
// Wrapped listener that ultimately receives the notifications.
219 private final DOMDataTreeChangeListener delegate;
// Registrations made with subshards, keyed by the subshard's root identifier.
220 private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
221 new ConcurrentHashMap<>();
// Subshard changes that could not be applied to the local data tree yet; they are retried together with
// the next batch of changes received for this shard.
224 private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
226 DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
227 final DOMDataTreeChangeListener delegate) {
228 this.listenerPath = Preconditions.checkNotNull(listenerPath);
229 this.delegate = Preconditions.checkNotNull(delegate);
233 public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
234 LOG.debug("Received data changed {}", changes);
236 if (!stashedDataTreeCandidates.isEmpty()) {
237 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
238 changes.addAll(stashedDataTreeCandidates);
239 stashedDataTreeCandidates.clear();
243 applyChanges(listenerPath, changes);
244 } catch (final DataValidationFailedException e) {
245 // TODO should we fail here? What if stashed changes
246 // (changes from subshards) got ahead more than one generation
247 // from current shard. Than we can fail to apply this changes
248 // upon current data tree, but once we get respective changes
249 // from current shard, we can apply also changes from
252 // However, we can loose ability to notice and report some
253 // errors then. For example, we cannot detect potential lost
254 // changes from current shard.
255 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
256 changes, dataTree, e);
257 throw new RuntimeException("Notification validation failed", e);
260 delegate.onDataTreeChanged(changes);
263 synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
264 final Collection<DataTreeCandidate> changes) {
265 final YangInstanceIdentifier changeId =
266 YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
268 final List<DataTreeCandidate> newCandidates = changes.stream()
269 .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
270 .collect(Collectors.toList());
273 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
274 } catch (final DataValidationFailedException e) {
275 // We cannot apply changes from subshard to current data tree.
276 // Maybe changes from current shard haven't been applied to
277 // data tree yet. Postpone processing of these changes till we
278 // receive changes from current shard.
279 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
280 pathFromRoot, changes, dataTree);
281 stashedDataTreeCandidates.addAll(newCandidates);
285 void addSubshard(final ChildShardContext context) {
286 Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
287 "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
289 final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
290 // since this is going into subshard we want to listen for ALL changes in the subshard
291 registrations.put(context.getPrefix().getRootIdentifier(),
292 listenableShard.registerTreeChangeListener(
293 context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
294 context.getPrefix().getRootIdentifier(), changes)));
// NOTE(review): the declaration of the method enclosing these statements is not visible in this
// view -- confirm its name and signature before relying on it.
// Closes every subshard listener registration held by this wrapper ...
298 for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
299 registration.close();
// ... and then drops the closed handles from the map.
301 registrations.clear();