2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.List;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.stream.Collectors;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
29 import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
30 import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
31 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
32 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
45 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
46 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class DistributedShardChangePublisher
51 extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
52 implements DOMStoreTreeChangePublisher {
54 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
56 private final DistributedDataStore distributedDataStore;
57 private final YangInstanceIdentifier shardPath;
59 // This will be useful for signaling back pressure
60 private final DataStoreClient client;
62 private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
65 private final DataTree dataTree;
67 public DistributedShardChangePublisher(final DataStoreClient client,
68 final DistributedDataStore distributedDataStore,
69 final DOMDataTreeIdentifier prefix,
70 final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
72 this.distributedDataStore = distributedDataStore;
73 // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
74 // maybe the whole listener logic would be better in the backend shards where we have direct access to the
75 // dataTree and wont have to cache it redundantly.
76 this.dataTree = InMemoryDataTreeFactory.getInstance().create(
77 TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
79 dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
81 this.shardPath = prefix.getRootIdentifier();
82 this.childShards = childShards;
85 protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
86 LOG.debug("Closing registration {}", registration);
90 public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
91 registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
94 return setupListenerContext(path, listener);
100 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
101 setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
102 // we need to register the listener registration path based on the shards root
103 // we have to strip the shard path from the listener path and then register
104 YangInstanceIdentifier strippedIdentifier = listenerPath;
105 if (!shardPath.isEmpty()) {
106 strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
109 final DOMDataTreeListenerWithSubshards subshardListener =
110 new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
111 final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
112 setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
114 for (final ChildShardContext maybeAffected : childShards.values()) {
115 if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
116 // consumer has initialDataChangeEvent subshard somewhere on lower level
117 // register to the notification manager with snapshot and forward child notifications to parent
118 LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
119 subshardListener.addSubshard(maybeAffected);
120 } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
121 // bind path is inside subshard
122 // TODO can this happen? seems like in ShardedDOMDataTree we are
123 // already registering to the lowest shard possible
124 throw new UnsupportedOperationException("Listener should be registered directly "
125 + "into initialDataChangeEvent subshard");
132 private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
133 setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
134 final YangInstanceIdentifier listenerPath,
135 final DOMDataTreeListenerWithSubshards listener) {
137 LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
139 // register in the shard tree
140 final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
141 findNodeFor(listenerPath.getPathArguments());
143 // register listener in CDS
144 final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
145 .registerProxyListener(shardLookup, listenerPath, listener), listener);
147 @SuppressWarnings("unchecked")
148 final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
149 new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
151 protected void removeRegistration() {
153 DistributedShardChangePublisher.this.removeRegistration(node, this);
154 registrationRemoved(this);
158 addRegistration(node, registration);
163 private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
164 final YangInstanceIdentifier listenerPath) {
165 if (shardPath.isEmpty()) {
166 return listenerPath.getPathArguments();
169 final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
170 final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
171 final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
173 while (shardIter.hasNext()) {
174 if (shardIter.next().equals(listenerIter.next())) {
175 listenerIter.remove();
181 return listenerPathArgs;
184 private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
186 private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
187 private final DOMDataTreeChangeListener listener;
189 private ProxyRegistration(
190 final ListenerRegistration<
191 org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
192 final DOMDataTreeChangeListener listener) {
194 this.listener = listener;
198 public DOMDataTreeChangeListener getInstance() {
203 public void close() {
208 synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
209 final Collection<DataTreeCandidate> changes) {
210 final DataTreeModification modification = dataTree.takeSnapshot().newModification();
211 for (final DataTreeCandidate change : changes) {
213 DataTreeCandidates.applyToModification(modification, change);
214 } catch (SchemaValidationFailedException e) {
215 LOG.error("Validation failed {}", e);
219 modification.ready();
221 final DataTreeCandidate candidate;
224 dataTree.validate(modification);
225 } catch (final DataValidationFailedException e) {
226 LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
227 modification, dataTree, e);
228 throw new RuntimeException("Notification validation failed", e);
231 // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
232 candidate = dataTree.prepare(modification);
233 dataTree.commit(candidate);
236 DataTreeCandidateNode modifiedChild = candidate.getRootNode();
238 for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
239 modifiedChild = modifiedChild.getModifiedChild(pathArgument);
242 if (modifiedChild == null) {
243 modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
246 return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
250 private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
252 private final YangInstanceIdentifier listenerPath;
253 private final DOMDataTreeChangeListener delegate;
255 private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
256 new ConcurrentHashMap<>();
258 DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
259 final DOMDataTreeChangeListener delegate) {
260 this.listenerPath = Preconditions.checkNotNull(listenerPath);
261 this.delegate = Preconditions.checkNotNull(delegate);
265 public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
266 LOG.debug("Received data changed {}", changes);
267 applyChanges(listenerPath, changes);
268 delegate.onDataTreeChanged(changes);
271 synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
272 final Collection<DataTreeCandidate> changes) {
273 final YangInstanceIdentifier changeId =
274 YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
276 final List<DataTreeCandidate> newCandidates = changes.stream()
277 .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
278 .collect(Collectors.toList());
279 delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
282 void addSubshard(final ChildShardContext context) {
283 Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
284 "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
286 final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
287 // since this is going into subshard we want to listen for ALL changes in the subshard
288 registrations.put(context.getPrefix().getRootIdentifier(),
289 listenableShard.registerTreeChangeListener(
290 context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
291 context.getPrefix().getRootIdentifier(), changes)));
295 for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
296 registration.close();
298 registrations.clear();
302 private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
304 private final PathArgument identifier;
306 EmptyDataTreeCandidateNode(final PathArgument identifier) {
307 this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
312 public PathArgument getIdentifier() {
318 public Collection<DataTreeCandidateNode> getChildNodes() {
319 return Collections.emptySet();
324 public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
330 public ModificationType getModificationType() {
331 return ModificationType.UNMODIFIED;
336 public Optional<NormalizedNode<?, ?>> getDataAfter() {
337 return Optional.absent();
342 public Optional<NormalizedNode<?, ?>> getDataBefore() {
343 return Optional.absent();