package org.opendaylight.mdsal.dom.broker;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeListenerAggregator;
+import org.opendaylight.mdsal.dom.spi.shard.ListenableDOMDataTreeShard;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTree.class);
@GuardedBy("this")
private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<?>> shards = DOMDataTreePrefixTable.create();
final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges,
final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
Preconditions.checkNotNull(listener, "listener");
- Preconditions.checkArgument(!subtrees.isEmpty(), "Subtrees must not be empty.");
- final ShardedDOMDataTreeListenerContext<T> listenerContext =
- ShardedDOMDataTreeListenerContext.create(listener);
- try {
- // FIXME: Add attachment of producers
- for (final DOMDataTreeProducer producer : producers) {
- Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer);
- final ShardedDOMDataTreeProducer castedProducer = (ShardedDOMDataTreeProducer) producer;
- simpleLoopCheck(subtrees, castedProducer.getSubtrees());
- // FIXME: We should also unbound listeners
- castedProducer.bindToListener(listenerContext);
- }
- for (final DOMDataTreeIdentifier subtree : subtrees) {
- final DOMDataTreeShard shard = shards.lookup(subtree).getValue().getInstance();
- // FIXME: What should we do if listener is wildcard? And shards are on per
- // node basis?
- Preconditions.checkArgument(shard instanceof DOMStoreTreeChangePublisher,
- "Subtree %s does not point to listenable subtree.", subtree);
+ // Cross-check specified trees for exclusivity and eliminate duplicates, noDupSubtrees is effectively a Set
+ final Collection<DOMDataTreeIdentifier> noDupSubtrees;
+ switch (subtrees.size()) {
+ case 0:
+ throw new IllegalArgumentException("Subtrees must not be empty.");
+ case 1:
+ noDupSubtrees = subtrees;
+ break;
+ default:
+ // Check subtrees for mutual inclusion, this is an O(N**2) operation
+ for (DOMDataTreeIdentifier toCheck : subtrees) {
+ for (DOMDataTreeIdentifier against : subtrees) {
+ if (!toCheck.equals(against)) {
+ Preconditions.checkArgument(!toCheck.contains(against), "Subtree %s contains subtree %s",
+ toCheck, against);
+ }
+ }
+ }
- listenerContext.register(subtree, (DOMStoreTreeChangePublisher) shard);
- }
- } catch (final Exception e) {
- listenerContext.close();
- throw e;
+ noDupSubtrees = ImmutableSet.copyOf(subtrees);
+ }
+
+ LOG.trace("Requested registration of listener {} to subtrees {}", listener, noDupSubtrees);
+
+ // Lookup shards corresponding to subtrees and construct a map of which subtrees we want from which shard
+ final ListMultimap<DOMDataTreeShardRegistration<?>, DOMDataTreeIdentifier> needed =
+ ArrayListMultimap.create();
+ for (final DOMDataTreeIdentifier subtree : subtrees) {
+ final DOMDataTreeShardRegistration<?> reg = Verify.verifyNotNull(shards.lookup(subtree).getValue());
+ needed.put(reg, subtree);
+ }
+
+ LOG.trace("Listener {} is attaching to shards {}", listener, needed);
+
+ // Sanity check: all selected shards have to support one of the listening interfaces
+ needed.asMap().forEach((reg, trees) -> {
+ final DOMDataTreeShard shard = reg.getInstance();
+ Preconditions.checkArgument(shard instanceof ListenableDOMDataTreeShard
+ || shard instanceof DOMStoreTreeChangePublisher, "Subtrees %s do not point to listenable subtree.",
+ trees);
+ });
+
+ // Sanity check: all producers have to come from this implementation and must not form loops
+ for (DOMDataTreeProducer producer : producers) {
+ Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer);
+ simpleLoopCheck(subtrees, ((ShardedDOMDataTreeProducer) producer).getSubtrees());
}
+
+ final ListenerRegistration<?> underlyingRegistration = createRegisteredListener(listener, needed.asMap(),
+ allowRxMerges, producers);
return new AbstractListenerRegistration<T>(listener) {
@Override
protected void removeRegistration() {
- ShardedDOMDataTree.this.removeListener(listenerContext);
+ ShardedDOMDataTree.this.removeListener(listener);
+ underlyingRegistration.close();
}
};
}
+ private static ListenerRegistration<?> createRegisteredListener(final DOMDataTreeListener userListener,
+ final Map<DOMDataTreeShardRegistration<?>, Collection<DOMDataTreeIdentifier>> needed,
+ final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers) {
+ // FIXME: Add attachment of producers
+ for (final DOMDataTreeProducer producer : producers) {
+ // FIXME: We should also unbound listeners
+ ((ShardedDOMDataTreeProducer) producer).bindToListener(userListener);
+ }
+
+ return DOMDataTreeListenerAggregator.aggregateIfNeeded(userListener, needed, allowRxMerges,
+ DOMDataTreeShardRegistration::getInstance);
+ }
+
private static void simpleLoopCheck(final Collection<DOMDataTreeIdentifier> listen,
final Set<DOMDataTreeIdentifier> writes) throws DOMDataTreeLoopException {
for (final DOMDataTreeIdentifier listenPath : listen) {
}
}
- void removeListener(final ShardedDOMDataTreeListenerContext<?> listener) {
+ void removeListener(final DOMDataTreeListener listener) {
// FIXME: detach producers
- listener.close();
}
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.mdsal.dom.broker;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.MapAdaptor;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-@Deprecated
-class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implements AutoCloseable {
-
- private final DOMDataTreeListener listener;
- private final EnumMap<LogicalDatastoreType, StoreListener> storeListeners = new EnumMap<>(
- LogicalDatastoreType.class);
- private final Collection<ListenerRegistration<?>> registrations = new ArrayList<>();
-
- // FIXME: Probably should be encapsulated into state object
- @GuardedBy("this")
- private Collection<DataTreeCandidate> unreported = new ArrayList<>();
- @GuardedBy("this")
- private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> currentData = Collections.emptyMap();
-
- private ShardedDOMDataTreeListenerContext(final T listener) {
- for (LogicalDatastoreType type : LogicalDatastoreType.values()) {
- storeListeners.put(type, new StoreListener(type));
- }
- this.listener = Preconditions.checkNotNull(listener, "listener");
- }
-
- static <T extends DOMDataTreeListener> ShardedDOMDataTreeListenerContext<T> create(final T listener) {
- return new ShardedDOMDataTreeListenerContext<>(listener);
- }
-
- synchronized void notifyListener() {
- Collection<DataTreeCandidate> changesToNotify = unreported;
- unreported = new ArrayList<>();
- listener.onDataTreeChanged(changesToNotify, currentData);
- }
-
- void register(final DOMDataTreeIdentifier subtree, final DOMStoreTreeChangePublisher shard) {
- ListenerRegistration<?> storeReg =
- shard.registerTreeChangeListener(subtree.getRootIdentifier(),
- storeListeners.get(subtree.getDatastoreType()));
- registrations.add(storeReg);
- }
-
- private final class StoreListener implements DOMDataTreeChangeListener {
-
- private final LogicalDatastoreType type;
-
- StoreListener(final LogicalDatastoreType type) {
- this.type = type;
- }
-
- @Override
- public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
- receivedDataTreeChanges(type, changes);
- scheduleNotification();
- }
-
- }
-
- // FIXME: Should be able to run parallel to notifyListener and should honor
- // allowRxMerges
- synchronized void receivedDataTreeChanges(final LogicalDatastoreType type,
- final Collection<DataTreeCandidate> changes) {
- Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> updatedData =
- MapAdaptor.getDefaultInstance().takeSnapshot(currentData);
- for (DataTreeCandidate change : changes) {
- // FIXME: Make sure only one is reported / merged
- unreported.add(change);
- DOMDataTreeIdentifier treeId = new DOMDataTreeIdentifier(type, change.getRootPath());
- // FIXME: Probably we should apply data tree candidate to previously observed state
- Optional<NormalizedNode<?, ?>> dataAfter = change.getRootNode().getDataAfter();
- if (dataAfter.isPresent()) {
- updatedData.put(treeId, dataAfter.get());
- } else {
- updatedData.remove(treeId);
- }
- }
- currentData = MapAdaptor.getDefaultInstance().optimize(updatedData);
- }
-
- void scheduleNotification() {
- // FIXME: This callout should schedule delivery task
- notifyListener();
- }
-
- @Override
- public void close() {
- for (ListenerRegistration<?> reg : registrations) {
- reg.close();
- }
- }
-
- DOMDataTreeListener getListener() {
- return listener;
- }
-}
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerBusyException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
AtomicIntegerFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, "closed");
private volatile int closed;
- private volatile ShardedDOMDataTreeListenerContext<?> attachedListener;
+ private volatile DOMDataTreeListener attachedListener;
private volatile ProducerLayout layout;
private ShardedDOMDataTreeProducer(final ShardedDOMDataTree dataTree,
}
}
- void bindToListener(final ShardedDOMDataTreeListenerContext<?> listener) {
- Preconditions.checkNotNull(listener);
-
- final ShardedDOMDataTreeListenerContext<?> local = attachedListener;
- if (local != null) {
- throw new IllegalStateException(String.format("Producer %s is already attached to listener %s", this,
- local.getListener()));
- }
-
- this.attachedListener = listener;
+ void bindToListener(final DOMDataTreeListener listener) {
+ final DOMDataTreeListener local = attachedListener;
+ Preconditions.checkState(local == null, "Producer %s is already attached to listener %s", this, local);
+ this.attachedListener = Preconditions.checkNotNull(listener);
}
}