* @param rpcs RPC types which became unavailable
*/
void onRpcUnavailable(@Nonnull Collection<DOMRpcIdentifier> rpcs);
+
+ /**
+ * Implementation filtering method. This method is useful for forwarding RPC implementations,
+ * which need to ensure they do not re-announce their own implementations. Without this method
+ * a forwarder which registers an implementation would be notified of its own implementation,
+ * potentially re-exporting it as local -- hence creating a forwarding loop.
+ *
+ * @param impl RPC implementation being registered
+ * @return False if the implementation should not be reported, defaults to true.
+ */
+ default boolean acceptsImplementation(final DOMRpcImplementation impl) {
+ return true;
+ }
}
* or report a subclass of {@link DOMRpcException} reporting a transport
* error.
*/
- @Nonnull CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull DOMRpcIdentifier rpc, @Nullable NormalizedNode<?, ?> input);
+ @Nonnull CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull DOMRpcIdentifier rpc,
+ @Nullable NormalizedNode<?, ?> input);
+
+ /**
+ * Return the relative invocation cost of this implementation. Default implementation return 0.
+ *
+ * @return Non-negative cost of invoking this implementation.
+ */
+ default long invocationCost() {
+ return 0;
+ }
}
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.CheckedFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
private final Map<YangInstanceIdentifier, List<DOMRpcImplementation>> impls;
private final SchemaPath schemaPath;
- protected AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath, final Map<YangInstanceIdentifier, List<DOMRpcImplementation>> impls) {
+ AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath,
+ final Map<YangInstanceIdentifier, List<DOMRpcImplementation>> impls) {
this.schemaPath = Preconditions.checkNotNull(schemaPath);
this.impls = Preconditions.checkNotNull(impls);
}
- protected final SchemaPath getSchemaPath() {
+ final SchemaPath getSchemaPath() {
return schemaPath;
}
- protected final List<DOMRpcImplementation> getImplementations(final YangInstanceIdentifier context) {
+ final List<DOMRpcImplementation> getImplementations(final YangInstanceIdentifier context) {
return impls.get(context);
}
return impls;
}
- public boolean containsContext(final YangInstanceIdentifier contextReference) {
+ final boolean containsContext(final YangInstanceIdentifier contextReference) {
return impls.containsKey(contextReference);
}
- final Set<YangInstanceIdentifier> registeredIdentifiers() {
- return impls.keySet();
+ final Set<YangInstanceIdentifier> registeredIdentifiers(final DOMRpcAvailabilityListener l) {
+ return Maps.filterValues(impls, list -> list.stream().anyMatch(l::acceptsImplementation)).keySet();
}
/**
* @param newRpcs List of new RPCs, must be mutable
* @return
*/
- final AbstractDOMRpcRoutingTableEntry add(final DOMRpcImplementation implementation, final List<YangInstanceIdentifier> newRpcs) {
+ final AbstractDOMRpcRoutingTableEntry add(final DOMRpcImplementation implementation,
+ final List<YangInstanceIdentifier> newRpcs) {
final Builder<YangInstanceIdentifier, List<DOMRpcImplementation>> vb = ImmutableMap.builder();
for (final Entry<YangInstanceIdentifier, List<DOMRpcImplementation>> ve : impls.entrySet()) {
if (newRpcs.remove(ve.getKey())) {
- final ArrayList<DOMRpcImplementation> i = new ArrayList<>(ve.getValue().size() + 1);
+ final List<DOMRpcImplementation> i = new ArrayList<>(ve.getValue().size() + 1);
i.addAll(ve.getValue());
i.add(implementation);
+
+ // New implementation is at the end, this will move it to be the last among implementations
+ // with equal cost -- relying on sort() being stable.
+ i.sort((a, b) -> Long.compare(a.invocationCost(), b.invocationCost()));
vb.put(ve.getKey(), i);
} else {
vb.put(ve);
}
}
for(final YangInstanceIdentifier ii : newRpcs) {
- final ArrayList<DOMRpcImplementation> impl = new ArrayList<>(1);
+ final List<DOMRpcImplementation> impl = new ArrayList<>(1);
impl.add(implementation);
- vb.put(ii,impl);
+ vb.put(ii, impl);
}
return newInstance(vb.build());
}
- final AbstractDOMRpcRoutingTableEntry remove(final DOMRpcImplementation implementation, final List<YangInstanceIdentifier> removed) {
+ final AbstractDOMRpcRoutingTableEntry remove(final DOMRpcImplementation implementation,
+ final List<YangInstanceIdentifier> removed) {
final Builder<YangInstanceIdentifier, List<DOMRpcImplementation>> vb = ImmutableMap.builder();
for (final Entry<YangInstanceIdentifier, List<DOMRpcImplementation>> ve : impls.entrySet()) {
if (removed.remove(ve.getKey())) {
- final ArrayList<DOMRpcImplementation> i = new ArrayList<>(ve.getValue());
+ final List<DOMRpcImplementation> i = new ArrayList<>(ve.getValue());
i.remove(implementation);
// We could trimToSize(), but that may perform another copy just to get rid
// of a single element. That is probably not worth the trouble.
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.MapDifference.ValueDifference;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
- private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
+ private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
+ .setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
+
private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
+
@GuardedBy("this")
- private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
+ private Collection<Registration<?>> listeners = Collections.emptyList();
+
private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
- public static DOMRpcRouter newInstance(SchemaService schemaService) {
+ public static DOMRpcRouter newInstance(final SchemaService schemaService) {
final DOMRpcRouter rpcRouter = new DOMRpcRouter();
schemaService.registerSchemaContextListener(rpcRouter);
return rpcRouter;
}
@Override
- public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
+ public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final DOMRpcIdentifier... rpcs) {
return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
}
- private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
- return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
- @Override
- public boolean apply(final DOMRpcIdentifier input) {
- return !table.contains(input);
- }
- }));
- }
-
- private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
+ private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
+ final Set<DOMRpcIdentifier> rpcs) {
final DOMRpcRoutingTable oldTable = routingTable;
final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
routingTable = newTable;
- if(!removedRpcs.isEmpty()) {
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcUnavailable(removedRpcs);
- }
- }
- }
- }
- });
- }
+
+ listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
}
@Override
- public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
+ public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final Set<DOMRpcIdentifier> rpcs) {
final DOMRpcRoutingTable oldTable = routingTable;
final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
routingTable = newTable;
- if(!addedRpcs.isEmpty()) {
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcAvailable(addedRpcs);
- }
- }
- }
- }
- });
- }
+ listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
@Override
}
@Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
+ final NormalizedNode<?, ?> input) {
return routingTable.invokeRpc(type, input);
}
private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
- listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
- @Override
- public boolean apply(final Object input) {
- return !reg.equals(input);
- }
- }));
+ listeners = ImmutableList.copyOf(Collections2.filter(listeners, i -> !reg.equals(i)));
}
- @Override
- public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
- final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
- @Override
- protected void removeRegistration() {
- removeListener(this);
- }
- };
+ private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ for (Registration<?> l : listeners) {
+ l.addRpc(newTable, impl);
+ }
+ }
+
+ private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ for (Registration<?> l : listeners) {
+ l.removeRpc(newTable, impl);
+ }
+ }
- final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
+ @Override
+ public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+ final T listener) {
+ final Registration<T> ret = new Registration<>(this, listener);
+ final Builder<Registration<?>> b = ImmutableList.builder();
b.addAll(listeners);
b.add(ret);
listeners = b.build();
- final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
-
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
- listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
- @Override
- public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
- return DOMRpcIdentifier.create(e.getKey(), input);
- }
- }));
- }
- }
- });
+ listenerNotifier.execute(() -> ret.initialTable(routingTable));
return ret;
}
listenerNotifier.shutdown();
}
+ private static final class Registration<T extends DOMRpcAvailabilityListener>
+ extends AbstractListenerRegistration<T> {
+
+ private final DOMRpcRouter router;
+
+ private Map<SchemaPath, Set<YangInstanceIdentifier>> prevRpcs;
+
+ Registration(final DOMRpcRouter router, final T listener) {
+ super(listener);
+ this.router = router;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ router.removeListener(this);
+ }
+
+ void initialTable(final DOMRpcRoutingTable newTable) {
+ final T l = getInstance();
+ if (l == null) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : rpcs.entrySet()) {
+ added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ prevRpcs = rpcs;
+ if (!added.isEmpty()) {
+ l.onRpcAvailable(added);
+ }
+ }
+
+ void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ final T l = getInstance();
+ if (l == null || !l.acceptsImplementation(impl)) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
+ added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
+ for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
+ added.add(DOMRpcIdentifier.create(e.getKey(), i));
+ }
+ }
+
+ prevRpcs = rpcs;
+ if (!added.isEmpty()) {
+ l.onRpcAvailable(added);
+ }
+ }
+
+ void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ final T l = getInstance();
+ if (l == null || !l.acceptsImplementation(impl)) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+ final Collection<DOMRpcIdentifier> removed = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
+ removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
+ for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
+ removed.add(DOMRpcIdentifier.create(e.getKey(), i));
+ }
+ }
+
+ prevRpcs = rpcs;
+ if (!removed.isEmpty()) {
+ l.onRpcUnavailable(removed);
+ }
+ }
+ }
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Maps;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
final class DOMRpcRoutingTable {
- private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext", "2013-07-09", "context-reference");
-
- static final DOMRpcRoutingTable EMPTY = new DOMRpcRoutingTable();
- private static final Function<AbstractDOMRpcRoutingTableEntry, Set<YangInstanceIdentifier>> EXTRACT_IDENTIFIERS =
- new Function<AbstractDOMRpcRoutingTableEntry, Set<YangInstanceIdentifier>>() {
- @Override
- public Set<YangInstanceIdentifier> apply(final AbstractDOMRpcRoutingTableEntry input) {
- return input.registeredIdentifiers();
- }
- };
+ private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
+ "2013-07-09", "context-reference").intern();
+
+ static final DOMRpcRoutingTable EMPTY = new DOMRpcRoutingTable(ImmutableMap.of(), null);
+
private final Map<SchemaPath, AbstractDOMRpcRoutingTableEntry> rpcs;
private final SchemaContext schemaContext;
- private DOMRpcRoutingTable() {
- rpcs = Collections.emptyMap();
- schemaContext = null;
- }
-
private DOMRpcRoutingTable(final Map<SchemaPath, AbstractDOMRpcRoutingTableEntry> rpcs, final SchemaContext schemaContext) {
this.rpcs = Preconditions.checkNotNull(rpcs);
this.schemaContext = schemaContext;
}
- private static ListMultimap<SchemaPath, YangInstanceIdentifier> decomposeIdentifiers(final Set<DOMRpcIdentifier> rpcs) {
+ static ListMultimap<SchemaPath, YangInstanceIdentifier> decomposeIdentifiers(final Set<DOMRpcIdentifier> rpcs) {
final ListMultimap<SchemaPath, YangInstanceIdentifier> ret = LinkedListMultimap.create();
for (DOMRpcIdentifier i : rpcs) {
ret.put(i.getType(), i.getContextReference());
return contexts != null && contexts.containsContext(input.getContextReference());
}
- Map<SchemaPath, Set<YangInstanceIdentifier>> getRpcs() {
- return Maps.transformValues(rpcs, EXTRACT_IDENTIFIERS);
+ Map<SchemaPath, Set<YangInstanceIdentifier>> getRpcs(final DOMRpcAvailabilityListener l) {
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> ret = new HashMap<>(rpcs.size());
+ for (Entry<SchemaPath, AbstractDOMRpcRoutingTableEntry> e : rpcs.entrySet()) {
+ final Set<YangInstanceIdentifier> ids = e.getValue().registeredIdentifiers(l);
+ if (!ids.isEmpty()) {
+ ret.put(e.getKey(), ids);
+ }
+ }
+
+ return ret;
}
private static RpcDefinition findRpcDefinition(final SchemaContext context, final SchemaPath schemaPath) {
return null;
}
- private static AbstractDOMRpcRoutingTableEntry createRpcEntry(final SchemaContext context, final SchemaPath key, final Map<YangInstanceIdentifier, List<DOMRpcImplementation>> implementations) {
+ private static AbstractDOMRpcRoutingTableEntry createRpcEntry(final SchemaContext context, final SchemaPath key,
+ final Map<YangInstanceIdentifier, List<DOMRpcImplementation>> implementations) {
final RpcDefinition rpcDef = findRpcDefinition(context, key);
- if (rpcDef != null) {
- final ContainerSchemaNode input = rpcDef.getInput();
- if (input != null) {
- for (DataSchemaNode c : input.getChildNodes()) {
- for (UnknownSchemaNode extension : c.getUnknownSchemaNodes()) {
- if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
- final YangInstanceIdentifier keyId = YangInstanceIdentifier.of(c.getQName());
- return new RoutedDOMRpcRoutingTableEntry(rpcDef, keyId, implementations);
- }
+ if (rpcDef == null) {
+ return new UnknownDOMRpcRoutingTableEntry(key, implementations);
+ }
+
+ final ContainerSchemaNode input = rpcDef.getInput();
+ if (input != null) {
+ for (DataSchemaNode c : input.getChildNodes()) {
+ for (UnknownSchemaNode extension : c.getUnknownSchemaNodes()) {
+ if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
+ final YangInstanceIdentifier keyId = YangInstanceIdentifier.of(c.getQName());
+ return new RoutedDOMRpcRoutingTableEntry(rpcDef, keyId, implementations);
}
}
}
-
- return new GlobalDOMRpcRoutingTableEntry(rpcDef, implementations);
- } else {
- return new UnknownDOMRpcRoutingTableEntry(key, implementations);
}
+
+ return new GlobalDOMRpcRoutingTableEntry(rpcDef, implementations);
}
CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
final AbstractDOMRpcRoutingTableEntry entry = rpcs.get(type);
if (entry == null) {
- return Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available", type));
+ return Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(
+ new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available", type));
}
return entry.invokeRpc(input);
return new DOMRpcRoutingTable(b.build(), context);
}
-
}