*/
package org.opendaylight.mdsal.dom.broker;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.CheckedFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.mdsal.dom.api.DOMRpcException;
import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
private final Map<YangInstanceIdentifier, List<DOMRpcImplementation>> impls;
private final SchemaPath schemaPath;
- protected AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath, final Map<YangInstanceIdentifier,
+ AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath, final Map<YangInstanceIdentifier,
List<DOMRpcImplementation>> impls) {
this.schemaPath = Preconditions.checkNotNull(schemaPath);
this.impls = Preconditions.checkNotNull(impls);
}
- protected final SchemaPath getSchemaPath() {
+ final SchemaPath getSchemaPath() {
return schemaPath;
}
- protected final List<DOMRpcImplementation> getImplementations(final YangInstanceIdentifier context) {
+ final List<DOMRpcImplementation> getImplementations(final YangInstanceIdentifier context) {
return impls.get(context);
}
return impls;
}
- public boolean containsContext(final YangInstanceIdentifier contextReference) {
+ final boolean containsContext(final YangInstanceIdentifier contextReference) {
return impls.containsKey(contextReference);
}
+ final Set<YangInstanceIdentifier> registeredIdentifiers(final DOMRpcAvailabilityListener listener) {
+ return Maps.filterValues(impls, list -> list.stream().anyMatch(listener::acceptsImplementation)).keySet();
+ }
+
+ @VisibleForTesting
final Set<YangInstanceIdentifier> registeredIdentifiers() {
return impls.keySet();
}
final Builder<YangInstanceIdentifier, List<DOMRpcImplementation>> vb = ImmutableMap.builder();
for (final Entry<YangInstanceIdentifier, List<DOMRpcImplementation>> ve : impls.entrySet()) {
if (newRpcs.remove(ve.getKey())) {
- final ArrayList<DOMRpcImplementation> i = new ArrayList<>(ve.getValue().size() + 1);
+ final List<DOMRpcImplementation> i = new ArrayList<>(ve.getValue().size() + 1);
i.addAll(ve.getValue());
i.add(implementation);
+
+ // New implementation is at the end, this will move it to be the last among implementations
+ // with equal cost -- relying on sort() being stable.
+ i.sort((i1, i2) -> Long.compare(i1.invocationCost(), i2.invocationCost()));
vb.put(ve.getKey(), i);
} else {
vb.put(ve);
}
}
for (final YangInstanceIdentifier ii : newRpcs) {
- final ArrayList<DOMRpcImplementation> impl = new ArrayList<>(1);
+ final List<DOMRpcImplementation> impl = new ArrayList<>(1);
impl.add(implementation);
- vb.put(ii,impl);
+ vb.put(ii, impl);
}
return newInstance(vb.build());
final Builder<YangInstanceIdentifier, List<DOMRpcImplementation>> vb = ImmutableMap.builder();
for (final Entry<YangInstanceIdentifier, List<DOMRpcImplementation>> ve : impls.entrySet()) {
if (removed.remove(ve.getKey())) {
- final ArrayList<DOMRpcImplementation> i = new ArrayList<>(ve.getValue());
+ final List<DOMRpcImplementation> i = new ArrayList<>(ve.getValue());
i.remove(implementation);
// We could trimToSize(), but that may perform another copy just to get rid
// of a single element. That is probably not worth the trouble.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.MapDifference.ValueDifference;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat(
"DOMRpcRouter-listener-%s").setDaemon(true).build();
- private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
- @GuardedBy("this")
- private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
- private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
- private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table,
- final Collection<DOMRpcIdentifier> candidates) {
- return ImmutableSet.copyOf(Collections2.filter(candidates, input -> !table.contains(input)));
- }
+ private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
- private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
- final Set<DOMRpcIdentifier> rpcs) {
- final DOMRpcRoutingTable oldTable = routingTable;
- final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
+ @GuardedBy("this")
+ private Collection<Registration<?>> listeners = Collections.emptyList();
- final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
- routingTable = newTable;
- if (!removedRpcs.isEmpty()) {
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- listenerNotifier.execute(() -> {
- for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcUnavailable(removedRpcs);
- }
- }
- }
- });
- }
- }
+ private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
@Override
public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
final DOMRpcRoutingTable oldTable = routingTable;
final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
routingTable = newTable;
- if (!addedRpcs.isEmpty()) {
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- listenerNotifier.execute(() -> {
- for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcAvailable(addedRpcs);
- }
- }
- }
- });
- }
+ listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
@Override
};
}
+ private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
+ final Set<DOMRpcIdentifier> rpcs) {
+ final DOMRpcRoutingTable oldTable = routingTable;
+ final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
+ routingTable = newTable;
+
+ listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
+ }
+
@Override
public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
final NormalizedNode<?, ?> input) {
listeners = ImmutableList.copyOf(Collections2.filter(listeners, input -> !reg.equals(input)));
}
+ private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ for (Registration<?> l : listeners) {
+ l.addRpc(newTable, impl);
+ }
+ }
+
+ private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ for (Registration<?> l : listeners) {
+ l.removeRpc(newTable, impl);
+ }
+ }
+
@Override
public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
final T listener) {
- final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
- @Override
- protected void removeRegistration() {
- removeListener(this);
- }
- };
-
- final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
+ final Registration<T> ret = new Registration<>(this, listener);
+ final Builder<Registration<?>> b = ImmutableList.builder();
b.addAll(listeners);
b.add(ret);
listeners = b.build();
- final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
-
- listenerNotifier.execute(() -> {
- for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
- listener.onRpcAvailable(Collections2.transform(e.getValue(),
- input -> DOMRpcIdentifier.create(e.getKey(), input)));
- }
- });
+ listenerNotifier.execute(() -> ret.initialTable(routingTable));
return ret;
}
listenerNotifier.shutdown();
}
+ private static final class Registration<T extends DOMRpcAvailabilityListener>
+ extends AbstractListenerRegistration<T> {
+
+ private final DOMRpcRouter router;
+
+ private Map<SchemaPath, Set<YangInstanceIdentifier>> prevRpcs;
+
+ Registration(final DOMRpcRouter router, final T listener) {
+ super(listener);
+ this.router = router;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ router.removeListener(this);
+ }
+
+ void initialTable(final DOMRpcRoutingTable newTable) {
+ final T l = getInstance();
+ if (l == null) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : rpcs.entrySet()) {
+ added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ prevRpcs = rpcs;
+ if (!added.isEmpty()) {
+ l.onRpcAvailable(added);
+ }
+ }
+
+ void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ final T l = getInstance();
+ if (l == null || !l.acceptsImplementation(impl)) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
+ added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
+ diff.entriesDiffering().entrySet()) {
+ for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
+ added.add(DOMRpcIdentifier.create(e.getKey(), i));
+ }
+ }
+
+ prevRpcs = rpcs;
+ if (!added.isEmpty()) {
+ l.onRpcAvailable(added);
+ }
+ }
+
+ void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ final T l = getInstance();
+ if (l == null || !l.acceptsImplementation(impl)) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+ final Collection<DOMRpcIdentifier> removed = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
+ removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
+ diff.entriesDiffering().entrySet()) {
+ for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
+ removed.add(DOMRpcIdentifier.create(e.getKey(), i));
+ }
+ }
+
+ prevRpcs = rpcs;
+ if (!removed.isEmpty()) {
+ l.onRpcUnavailable(removed);
+ }
+ }
+ }
}