2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.List;
19 import java.util.Optional;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.stream.Collectors;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
26 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
30 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
31 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
32 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
33 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
45 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
46 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
47 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public class DistributedShardChangePublisher
52 extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
53 implements DOMStoreTreeChangePublisher {
55 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
57 private final AbstractDataStore distributedDataStore;
58 private final YangInstanceIdentifier shardPath;
60 private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
63 private final DataTree dataTree;
65 public DistributedShardChangePublisher(final DataStoreClient client,
66 final AbstractDataStore distributedDataStore,
67 final DOMDataTreeIdentifier prefix,
68 final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
69 this.distributedDataStore = distributedDataStore;
70 // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
71 // maybe the whole listener logic would be better in the backend shards where we have direct access to the
72 // dataTree and wont have to cache it redundantly.
74 final DataTreeConfiguration baseConfig;
75 switch (prefix.getDatastoreType()) {
77 baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
80 baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
83 throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
86 this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
87 .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
88 .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
89 .setRootPath(prefix.getRootIdentifier())
92 // XXX: can we guarantee that the root is present in the schemacontext?
93 this.dataTree.setSchemaContext(distributedDataStore.getActorUtils().getSchemaContext());
94 this.shardPath = prefix.getRootIdentifier();
95 this.childShards = childShards;
98 protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
99 LOG.debug("Closing registration {}", registration);
103 public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
104 registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
107 return setupListenerContext(path, listener);
113 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
114 setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
115 // we need to register the listener registration path based on the shards root
116 // we have to strip the shard path from the listener path and then register
117 YangInstanceIdentifier strippedIdentifier = listenerPath;
118 if (!shardPath.isEmpty()) {
119 strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
122 final DOMDataTreeListenerWithSubshards subshardListener =
123 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
124 final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
125 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
127 for (final ChildShardContext maybeAffected : childShards.values()) {
128 if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
129 // consumer has initialDataChangeEvent subshard somewhere on lower level
130 // register to the notification manager with snapshot and forward child notifications to parent
131 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
132 subshardListener.addSubshard(maybeAffected);
133 } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
134 // bind path is inside subshard
135 // TODO can this happen? seems like in ShardedDOMDataTree we are
136 // already registering to the lowest shard possible
137 throw new UnsupportedOperationException("Listener should be registered directly "
138 + "into initialDataChangeEvent subshard");
145 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
146 setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
147 final YangInstanceIdentifier listenerPath,
148 final DOMDataTreeListenerWithSubshards listener) {
150 LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
152 // register in the shard tree
153 final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
154 findNodeFor(listenerPath.getPathArguments());
156 // register listener in CDS
157 ListenerRegistration<DOMDataTreeChangeListener> listenerReg = distributedDataStore
158 .registerProxyListener(shardLookup, listenerPath, listener);
160 @SuppressWarnings("unchecked")
161 final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
162 new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
164 protected void removeRegistration() {
166 DistributedShardChangePublisher.this.removeRegistration(node, this);
167 registrationRemoved(this);
171 addRegistration(node, registration);
176 private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
177 final YangInstanceIdentifier listenerPath) {
178 if (shardPath.isEmpty()) {
179 return listenerPath.getPathArguments();
182 final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
183 final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
184 final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
186 while (shardIter.hasNext()) {
187 if (shardIter.next().equals(listenerIter.next())) {
188 listenerIter.remove();
194 return listenerPathArgs;
197 synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
198 final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
199 final DataTreeModification modification = dataTree.takeSnapshot().newModification();
200 for (final DataTreeCandidate change : changes) {
202 DataTreeCandidates.applyToModification(modification, change);
203 } catch (SchemaValidationFailedException e) {
204 LOG.error("Validation failed", e);
208 modification.ready();
210 final DataTreeCandidate candidate;
212 dataTree.validate(modification);
214 // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
215 candidate = dataTree.prepare(modification);
216 dataTree.commit(candidate);
219 DataTreeCandidateNode modifiedChild = candidate.getRootNode();
221 for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
222 modifiedChild = modifiedChild.getModifiedChild(pathArgument);
225 if (modifiedChild == null) {
226 modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
229 return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
233 private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
235 private final YangInstanceIdentifier listenerPath;
236 private final DOMDataTreeChangeListener delegate;
237 private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
238 new ConcurrentHashMap<>();
241 private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
243 DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
244 final DOMDataTreeChangeListener delegate) {
245 this.listenerPath = Preconditions.checkNotNull(listenerPath);
246 this.delegate = Preconditions.checkNotNull(delegate);
250 public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
251 LOG.debug("Received data changed {}", changes);
253 if (!stashedDataTreeCandidates.isEmpty()) {
254 LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
255 changes.addAll(stashedDataTreeCandidates);
256 stashedDataTreeCandidates.clear();
260 applyChanges(listenerPath, changes);
261 } catch (final DataValidationFailedException e) {
262 // TODO should we fail here? What if stashed changes
263 // (changes from subshards) got ahead more than one generation
264 // from current shard. Than we can fail to apply this changes
265 // upon current data tree, but once we get respective changes
266 // from current shard, we can apply also changes from
269 // However, we can loose ability to notice and report some
270 // errors then. For example, we cannot detect potential lost
271 // changes from current shard.
272 LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
273 changes, dataTree, e);
274 throw new RuntimeException("Notification validation failed", e);
277 delegate.onDataTreeChanged(changes);
280 synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
281 final Collection<DataTreeCandidate> changes) {
282 final YangInstanceIdentifier changeId =
283 YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
285 final List<DataTreeCandidate> newCandidates = changes.stream()
286 .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
287 .collect(Collectors.toList());
290 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
291 } catch (final DataValidationFailedException e) {
292 // We cannot apply changes from subshard to current data tree.
293 // Maybe changes from current shard haven't been applied to
294 // data tree yet. Postpone processing of these changes till we
295 // receive changes from current shard.
296 LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
297 pathFromRoot, changes, dataTree, e);
298 stashedDataTreeCandidates.addAll(newCandidates);
302 void addSubshard(final ChildShardContext context) {
303 Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
304 "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
306 final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
307 // since this is going into subshard we want to listen for ALL changes in the subshard
308 registrations.put(context.getPrefix().getRootIdentifier(),
309 listenableShard.registerTreeChangeListener(
310 context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
311 context.getPrefix().getRootIdentifier(), changes)));
315 for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
316 registration.close();
318 registrations.clear();
322 private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
324 private final PathArgument identifier;
326 EmptyDataTreeCandidateNode(final PathArgument identifier) {
327 this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
332 public PathArgument getIdentifier() {
338 public Collection<DataTreeCandidateNode> getChildNodes() {
339 return Collections.emptySet();
344 @SuppressWarnings("checkstyle:hiddenField")
345 public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
351 public ModificationType getModificationType() {
352 return ModificationType.UNMODIFIED;
357 public Optional<NormalizedNode<?, ?>> getDataAfter() {
358 return Optional.empty();
363 public Optional<NormalizedNode<?, ?>> getDataBefore() {
364 return Optional.empty();