package org.opendaylight.mdsal.dom.broker;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeListenerAggregator;
+import org.opendaylight.mdsal.dom.spi.shard.ListenableDOMDataTreeShard;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTree.class);
@GuardedBy("this")
- private final DOMDataTreePrefixTable<ShardRegistration<?>> shards = DOMDataTreePrefixTable.create();
+ private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<?>> shards = DOMDataTreePrefixTable.create();
@GuardedBy("this")
private final DOMDataTreePrefixTable<DOMDataTreeProducer> producers = DOMDataTreePrefixTable.create();
-
- void removeShard(final ShardRegistration<?> reg) {
+ void removeShard(final DOMDataTreeShardRegistration<?> reg) {
final DOMDataTreeIdentifier prefix = reg.getPrefix();
- final ShardRegistration<?> parentReg;
+ final DOMDataTreeShardRegistration<?> parentReg;
synchronized (this) {
shards.remove(prefix);
}
@Override
- public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer) throws DOMDataTreeShardingConflictException {
+ public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
+ final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer)
+ throws DOMDataTreeShardingConflictException {
- final DOMDataTreeIdentifier firstSubtree = Iterables.getOnlyElement(((ShardedDOMDataTreeProducer) producer).getSubtrees());
- Preconditions.checkArgument(firstSubtree != null, "Producer that is used to verify namespace claim can only claim a single namespace");
- Preconditions.checkArgument(prefix.equals(firstSubtree), "Trying to register shard to a different namespace than the producer has claimed");
+ final DOMDataTreeIdentifier firstSubtree = Iterables.getOnlyElement(((
+ ShardedDOMDataTreeProducer) producer).getSubtrees());
+ Preconditions.checkArgument(firstSubtree != null, "Producer that is used to verify namespace claim can"
+ + " only claim a single namespace");
+ Preconditions.checkArgument(prefix.equals(firstSubtree), "Trying to register shard to a different namespace"
+ + " than the producer has claimed");
- final ShardRegistration<T> reg;
- final ShardRegistration<?> parentReg;
+ final DOMDataTreeShardRegistration<T> reg;
+ final DOMDataTreeShardRegistration<?> parentReg;
synchronized (this) {
/*
* and if it exists, check if its registration prefix does not collide with
* this registration.
*/
- final DOMDataTreePrefixTableEntry<ShardRegistration<?>> parent = shards.lookup(prefix);
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<?>> parent = shards.lookup(prefix);
if (parent != null) {
parentReg = parent.getValue();
if (parentReg != null && prefix.equals(parentReg.getPrefix())) {
// FIXME: wrap the shard in a proper adaptor based on implemented interface
- reg = new ShardRegistration<T>(this, prefix, shard);
+ reg = new DOMDataTreeShardRegistration<>(this, prefix, shard);
shards.store(prefix, reg);
}
@GuardedBy("this")
- private DOMDataTreeProducer createProducer(final Collection<DOMDataTreeIdentifier> subtrees, final Map<DOMDataTreeIdentifier, DOMDataTreeShard> shardMap) {
+ private DOMDataTreeProducer createProducer(final Collection<DOMDataTreeIdentifier> subtrees,
+ final Map<DOMDataTreeIdentifier, DOMDataTreeShard> shardMap) {
// Record the producer's attachment points
final DOMDataTreeProducer ret = ShardedDOMDataTreeProducer.create(this, subtrees, shardMap);
for (final DOMDataTreeIdentifier subtree : subtrees) {
final DOMDataTreeProducer producer = findProducer(subtree);
Preconditions.checkArgument(producer == null, "Subtree %s is attached to producer %s", subtree, producer);
- final DOMDataTreePrefixTableEntry<ShardRegistration<?>> possibleShardReg = shards.lookup(subtree);
- if (possibleShardReg != null) {
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<?>> possibleShardReg =
+ shards.lookup(subtree);
+ if (possibleShardReg != null && possibleShardReg.getValue() != null) {
shardMap.put(subtree, possibleShardReg.getValue().getInstance());
}
}
return createProducer(subtrees, shardMap);
}
- synchronized DOMDataTreeProducer createProducer(final ShardedDOMDataTreeProducer parent, final Collection<DOMDataTreeIdentifier> subtrees) {
+ synchronized DOMDataTreeProducer createProducer(final ShardedDOMDataTreeProducer parent,
+ final Collection<DOMDataTreeIdentifier> subtrees) {
Preconditions.checkNotNull(parent);
final Map<DOMDataTreeIdentifier, DOMDataTreeShard> shardMap = new HashMap<>();
return createProducer(subtrees, shardMap);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
@Override
public synchronized <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener,
final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges,
final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
Preconditions.checkNotNull(listener, "listener");
- Preconditions.checkArgument(!subtrees.isEmpty(), "Subtrees must not be empty.");
- final ShardedDOMDataTreeListenerContext<T> listenerContext =
- ShardedDOMDataTreeListenerContext.create(listener, subtrees, allowRxMerges);
- try {
- // FIXME: Add attachment of producers
- for (final DOMDataTreeProducer producer : producers) {
- Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer);
- final ShardedDOMDataTreeProducer castedProducer = ((ShardedDOMDataTreeProducer) producer);
- simpleLoopCheck(subtrees, castedProducer.getSubtrees());
- // FIXME: We should also unbound listeners
- castedProducer.boundToListener(listenerContext);
- }
- for (final DOMDataTreeIdentifier subtree : subtrees) {
- final DOMDataTreeShard shard = shards.lookup(subtree).getValue().getInstance();
- // FIXME: What should we do if listener is wildcard? And shards are on per
- // node basis?
- Preconditions.checkArgument(shard instanceof DOMStoreTreeChangePublisher,
- "Subtree %s does not point to listenable subtree.", subtree);
+ // Cross-check specified trees for exclusivity and eliminate duplicates, noDupSubtrees is effectively a Set
+ final Collection<DOMDataTreeIdentifier> noDupSubtrees;
+ switch (subtrees.size()) {
+ case 0:
+ throw new IllegalArgumentException("Subtrees must not be empty.");
+ case 1:
+ noDupSubtrees = subtrees;
+ break;
+ default:
+ // Check subtrees for mutual inclusion, this is an O(N**2) operation
+ for (DOMDataTreeIdentifier toCheck : subtrees) {
+ for (DOMDataTreeIdentifier against : subtrees) {
+ if (!toCheck.equals(against)) {
+ Preconditions.checkArgument(!toCheck.contains(against), "Subtree %s contains subtree %s",
+ toCheck, against);
+ }
+ }
+ }
- listenerContext.register(subtree, (DOMStoreTreeChangePublisher) shard);
- }
- } catch (final Exception e) {
- listenerContext.close();
- throw e;
+ noDupSubtrees = ImmutableSet.copyOf(subtrees);
}
+
+ LOG.trace("Requested registration of listener {} to subtrees {}", listener, noDupSubtrees);
+
+ // Lookup shards corresponding to subtrees and construct a map of which subtrees we want from which shard
+ final ListMultimap<DOMDataTreeShardRegistration<?>, DOMDataTreeIdentifier> needed =
+ ArrayListMultimap.create();
+ for (final DOMDataTreeIdentifier subtree : subtrees) {
+ final DOMDataTreeShardRegistration<?> reg = Verify.verifyNotNull(shards.lookup(subtree).getValue());
+ needed.put(reg, subtree);
+ }
+
+ LOG.trace("Listener {} is attaching to shards {}", listener, needed);
+
+ // Sanity check: all selected shards have to support one of the listening interfaces
+ needed.asMap().forEach((reg, trees) -> {
+ final DOMDataTreeShard shard = reg.getInstance();
+ Preconditions.checkArgument(shard instanceof ListenableDOMDataTreeShard
+ || shard instanceof DOMStoreTreeChangePublisher, "Subtrees %s do not point to listenable subtree.",
+ trees);
+ });
+
+ // Sanity check: all producers have to come from this implementation and must not form loops
+ for (DOMDataTreeProducer producer : producers) {
+ Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer);
+ simpleLoopCheck(subtrees, ((ShardedDOMDataTreeProducer) producer).getSubtrees());
+ }
+
+ final ListenerRegistration<?> underlyingRegistration = createRegisteredListener(listener, needed.asMap(),
+ allowRxMerges, producers);
return new AbstractListenerRegistration<T>(listener) {
@Override
protected void removeRegistration() {
- ShardedDOMDataTree.this.removeListener(listenerContext);
+ ShardedDOMDataTree.this.removeListener(listener);
+ underlyingRegistration.close();
}
};
}
- private static void simpleLoopCheck(final Collection<DOMDataTreeIdentifier> listen, final Set<DOMDataTreeIdentifier> writes)
- throws DOMDataTreeLoopException {
- for(final DOMDataTreeIdentifier listenPath : listen) {
+ private static ListenerRegistration<?> createRegisteredListener(final DOMDataTreeListener userListener,
+ final Map<DOMDataTreeShardRegistration<?>, Collection<DOMDataTreeIdentifier>> needed,
+ final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers) {
+ // FIXME: Add attachment of producers
+ for (final DOMDataTreeProducer producer : producers) {
+ // FIXME: We should also unbound listeners
+ ((ShardedDOMDataTreeProducer) producer).bindToListener(userListener);
+ }
+
+ return DOMDataTreeListenerAggregator.aggregateIfNeeded(userListener, needed, allowRxMerges,
+ DOMDataTreeShardRegistration::getInstance);
+ }
+
+ private static void simpleLoopCheck(final Collection<DOMDataTreeIdentifier> listen,
+ final Set<DOMDataTreeIdentifier> writes) throws DOMDataTreeLoopException {
+ for (final DOMDataTreeIdentifier listenPath : listen) {
for (final DOMDataTreeIdentifier writePath : writes) {
if (listenPath.contains(writePath)) {
throw new DOMDataTreeLoopException(String.format(
}
}
- void removeListener(final ShardedDOMDataTreeListenerContext<?> listener) {
+ void removeListener(final DOMDataTreeListener listener) {
// FIXME: detach producers
- listener.close();
}
}