From: Robert Varga Date: Mon, 16 Jan 2017 09:15:49 +0000 (+0100) Subject: BUG-3128: Update RPC router concepts X-Git-Tag: release/carbon~321 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=dc76c5f86830b541fe9c4f2a011e199486558779 BUG-3128: Update RPC router concepts Enrich DOMRpcImplementation with invocation cost, which is taken into account when deciding which implementation to invoke. This allows local RPCs to be prioritized over remote ones. Also add the ability to filter implementations when notifying availability listener. This allows remote RPC to filter its own registrations, preventing re-forwarding loops, where a remote implementation would be forwarded as a local one. Change-Id: Id1d78d5031904e19134c103e12b79d68cf0b98c3 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcAvailabilityListener.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcAvailabilityListener.java index 77d42a43c9..35c58feb16 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcAvailabilityListener.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcAvailabilityListener.java @@ -29,4 +29,17 @@ public interface DOMRpcAvailabilityListener extends EventListener { * @param rpcs RPC types which became unavailable */ void onRpcUnavailable(@Nonnull Collection rpcs); + + /** + * Implementation filtering method. This method is useful for forwarding RPC implementations, + * which need to ensure they do not re-announce their own implementations. Without this method + * a forwarder which registers an implementation would be notified of its own implementation, + * potentially re-exporting it as local -- hence creating a forwarding loop. + * + * @param impl RPC implementation being registered + * @return False if the implementation should not be reported, defaults to true. + */ + default boolean acceptsImplementation(final DOMRpcImplementation impl) { + return true; + } } diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcImplementation.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcImplementation.java index c246c76270..448cbd4cc3 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcImplementation.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMRpcImplementation.java @@ -27,5 +27,15 @@ public interface DOMRpcImplementation { * or report a subclass of {@link DOMRpcException} reporting a transport * error. */ - @Nonnull CheckedFuture invokeRpc(@Nonnull DOMRpcIdentifier rpc, @Nullable NormalizedNode input); + @Nonnull CheckedFuture invokeRpc(@Nonnull DOMRpcIdentifier rpc, + @Nullable NormalizedNode input); + + /** + * Return the relative invocation cost of this implementation. Default implementation return 0. + * + * @return Non-negative cost of invoking this implementation. + */ + default long invocationCost() { + return 0; + } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMRpcRoutingTableEntry.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMRpcRoutingTableEntry.java index 08c43845fc..0cce2f1d80 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMRpcRoutingTableEntry.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMRpcRoutingTableEntry.java @@ -10,12 +10,14 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.CheckedFuture; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; @@ -27,16 +29,17 @@ abstract class AbstractDOMRpcRoutingTableEntry { private final Map> impls; private final SchemaPath schemaPath; - protected AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath, final Map> impls) { + AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath, + final Map> impls) { this.schemaPath = Preconditions.checkNotNull(schemaPath); this.impls = Preconditions.checkNotNull(impls); } - protected final SchemaPath getSchemaPath() { + final SchemaPath getSchemaPath() { return schemaPath; } - protected final List getImplementations(final YangInstanceIdentifier context) { + final List getImplementations(final YangInstanceIdentifier context) { return impls.get(context); } @@ -44,12 +47,12 @@ abstract class AbstractDOMRpcRoutingTableEntry { return impls; } - public boolean containsContext(final YangInstanceIdentifier contextReference) { + final boolean containsContext(final YangInstanceIdentifier contextReference) { return impls.containsKey(contextReference); } - final Set registeredIdentifiers() { - return impls.keySet(); + final Set registeredIdentifiers(final DOMRpcAvailabilityListener l) { + return Maps.filterValues(impls, list -> list.stream().anyMatch(l::acceptsImplementation)).keySet(); } /** @@ -58,32 +61,38 @@ abstract class AbstractDOMRpcRoutingTableEntry { * @param newRpcs List of new RPCs, must be mutable * @return */ - final AbstractDOMRpcRoutingTableEntry add(final DOMRpcImplementation implementation, final List newRpcs) { + final AbstractDOMRpcRoutingTableEntry add(final DOMRpcImplementation implementation, + final List newRpcs) { final Builder> vb = ImmutableMap.builder(); for (final Entry> ve : impls.entrySet()) { if (newRpcs.remove(ve.getKey())) { - final ArrayList i = new ArrayList<>(ve.getValue().size() + 1); + final List i = new ArrayList<>(ve.getValue().size() + 1); i.addAll(ve.getValue()); i.add(implementation); + + // New implementation is at the end, this will move it to be the last among implementations + // with equal cost -- relying on sort() being stable. + i.sort((a, b) -> Long.compare(a.invocationCost(), b.invocationCost())); vb.put(ve.getKey(), i); } else { vb.put(ve); } } for(final YangInstanceIdentifier ii : newRpcs) { - final ArrayList impl = new ArrayList<>(1); + final List impl = new ArrayList<>(1); impl.add(implementation); - vb.put(ii,impl); + vb.put(ii, impl); } return newInstance(vb.build()); } - final AbstractDOMRpcRoutingTableEntry remove(final DOMRpcImplementation implementation, final List removed) { + final AbstractDOMRpcRoutingTableEntry remove(final DOMRpcImplementation implementation, + final List removed) { final Builder> vb = ImmutableMap.builder(); for (final Entry> ve : impls.entrySet()) { if (removed.remove(ve.getKey())) { - final ArrayList i = new ArrayList<>(ve.getValue()); + final List i = new ArrayList<>(ve.getValue()); i.remove(implementation); // We could trimToSize(), but that may perform another copy just to get rid // of a single element. That is probably not worth the trouble. diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java index 60e4db5ca6..daf0d065f8 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java @@ -7,14 +7,17 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Function; -import com.google.common.base.Predicate; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MapDifference; +import com.google.common.collect.MapDifference.ValueDifference; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -43,80 +46,45 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.opendaylight.yangtools.yang.model.api.SchemaPath; public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener { - private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build(); + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build(); + private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY); + @GuardedBy("this") - private Collection> listeners = Collections.emptyList(); + private Collection> listeners = Collections.emptyList(); + private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY; - public static DOMRpcRouter newInstance(SchemaService schemaService) { + public static DOMRpcRouter newInstance(final SchemaService schemaService) { final DOMRpcRouter rpcRouter = new DOMRpcRouter(); schemaService.registerSchemaContextListener(rpcRouter); return rpcRouter; } @Override - public DOMRpcImplementationRegistration registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) { + public DOMRpcImplementationRegistration registerRpcImplementation( + final T implementation, final DOMRpcIdentifier... rpcs) { return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs)); } - private static Collection notPresentRpcs(final DOMRpcRoutingTable table, final Collection candidates) { - return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate() { - @Override - public boolean apply(final DOMRpcIdentifier input) { - return !table.contains(input); - } - })); - } - - private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set rpcs) { + private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, + final Set rpcs) { final DOMRpcRoutingTable oldTable = routingTable; final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs); - - final Collection removedRpcs = notPresentRpcs(newTable, rpcs); routingTable = newTable; - if(!removedRpcs.isEmpty()) { - final Collection> capturedListeners = listeners; - listenerNotifier.execute(new Runnable() { - @Override - public void run() { - for (final ListenerRegistration l : capturedListeners) { - // Need to ensure removed listeners do not get notified - synchronized (DOMRpcRouter.this) { - if (listeners.contains(l)) { - l.getInstance().onRpcUnavailable(removedRpcs); - } - } - } - } - }); - } + + listenerNotifier.execute(() -> notifyRemoved(newTable, implementation)); } @Override - public synchronized DOMRpcImplementationRegistration registerRpcImplementation(final T implementation, final Set rpcs) { + public synchronized DOMRpcImplementationRegistration registerRpcImplementation( + final T implementation, final Set rpcs) { final DOMRpcRoutingTable oldTable = routingTable; final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs); - - final Collection addedRpcs = notPresentRpcs(oldTable, rpcs); routingTable = newTable; - if(!addedRpcs.isEmpty()) { - final Collection> capturedListeners = listeners; - listenerNotifier.execute(new Runnable() { - @Override - public void run() { - for (final ListenerRegistration l : capturedListeners) { - // Need to ensure removed listeners do not get notified - synchronized (DOMRpcRouter.this) { - if (listeners.contains(l)) { - l.getInstance().onRpcAvailable(addedRpcs); - } - } - } - } - }); - } + listenerNotifier.execute(() -> notifyAdded(newTable, implementation)); return new AbstractDOMRpcImplementationRegistration(implementation) { @Override @@ -127,48 +95,37 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP } @Override - public CheckedFuture invokeRpc(final SchemaPath type, final NormalizedNode input) { + public CheckedFuture invokeRpc(final SchemaPath type, + final NormalizedNode input) { return routingTable.invokeRpc(type, input); } private synchronized void removeListener(final ListenerRegistration reg) { - listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate() { - @Override - public boolean apply(final Object input) { - return !reg.equals(input); - } - })); + listeners = ImmutableList.copyOf(Collections2.filter(listeners, i -> !reg.equals(i))); } - @Override - public synchronized ListenerRegistration registerRpcListener(final T listener) { - final ListenerRegistration ret = new AbstractListenerRegistration(listener) { - @Override - protected void removeRegistration() { - removeListener(this); - } - }; + private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + for (Registration l : listeners) { + l.addRpc(newTable, impl); + } + } + + private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + for (Registration l : listeners) { + l.removeRpc(newTable, impl); + } + } - final Builder> b = ImmutableList.builder(); + @Override + public synchronized ListenerRegistration registerRpcListener( + final T listener) { + final Registration ret = new Registration<>(this, listener); + final Builder> b = ImmutableList.builder(); b.addAll(listeners); b.add(ret); listeners = b.build(); - final Map> capturedRpcs = routingTable.getRpcs(); - - listenerNotifier.execute(new Runnable() { - @Override - public void run() { - for (final Entry> e : capturedRpcs.entrySet()) { - listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function() { - @Override - public DOMRpcIdentifier apply(final YangInstanceIdentifier input) { - return DOMRpcIdentifier.create(e.getKey(), input); - } - })); - } - } - }); + listenerNotifier.execute(() -> ret.initialTable(routingTable)); return ret; } @@ -184,4 +141,88 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP listenerNotifier.shutdown(); } + private static final class Registration + extends AbstractListenerRegistration { + + private final DOMRpcRouter router; + + private Map> prevRpcs; + + Registration(final DOMRpcRouter router, final T listener) { + super(listener); + this.router = router; + } + + @Override + protected void removeRegistration() { + router.removeListener(this); + } + + void initialTable(final DOMRpcRoutingTable newTable) { + final T l = getInstance(); + if (l == null) { + return; + } + + final Map> rpcs = newTable.getRpcs(l); + final Collection added = new ArrayList<>(); + for (Entry> e : rpcs.entrySet()) { + added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i))); + } + prevRpcs = rpcs; + if (!added.isEmpty()) { + l.onRpcAvailable(added); + } + } + + void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + final T l = getInstance(); + if (l == null || !l.acceptsImplementation(impl)) { + return; + } + + final Map> rpcs = newTable.getRpcs(l); + final MapDifference> diff = Maps.difference(prevRpcs, rpcs); + + final Collection added = new ArrayList<>(); + for (Entry> e : diff.entriesOnlyOnRight().entrySet()) { + added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i))); + } + for (Entry>> e : diff.entriesDiffering().entrySet()) { + for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) { + added.add(DOMRpcIdentifier.create(e.getKey(), i)); + } + } + + prevRpcs = rpcs; + if (!added.isEmpty()) { + l.onRpcAvailable(added); + } + } + + void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + final T l = getInstance(); + if (l == null || !l.acceptsImplementation(impl)) { + return; + } + + final Map> rpcs = newTable.getRpcs(l); + final MapDifference> diff = Maps.difference(prevRpcs, rpcs); + + final Collection removed = new ArrayList<>(); + for (Entry> e : diff.entriesOnlyOnLeft().entrySet()) { + removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i))); + } + for (Entry>> e : diff.entriesDiffering().entrySet()) { + for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) { + removed.add(DOMRpcIdentifier.create(e.getKey(), i)); + } + } + + prevRpcs = rpcs; + if (!removed.isEmpty()) { + l.onRpcUnavailable(removed); + } + } + } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRoutingTable.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRoutingTable.java index d570041630..22e3305495 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRoutingTable.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRoutingTable.java @@ -7,22 +7,22 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Maps; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation; @@ -40,30 +40,20 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode; final class DOMRpcRoutingTable { - private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext", "2013-07-09", "context-reference"); - - static final DOMRpcRoutingTable EMPTY = new DOMRpcRoutingTable(); - private static final Function> EXTRACT_IDENTIFIERS = - new Function>() { - @Override - public Set apply(final AbstractDOMRpcRoutingTableEntry input) { - return input.registeredIdentifiers(); - } - }; + private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext", + "2013-07-09", "context-reference").intern(); + + static final DOMRpcRoutingTable EMPTY = new DOMRpcRoutingTable(ImmutableMap.of(), null); + private final Map rpcs; private final SchemaContext schemaContext; - private DOMRpcRoutingTable() { - rpcs = Collections.emptyMap(); - schemaContext = null; - } - private DOMRpcRoutingTable(final Map rpcs, final SchemaContext schemaContext) { this.rpcs = Preconditions.checkNotNull(rpcs); this.schemaContext = schemaContext; } - private static ListMultimap decomposeIdentifiers(final Set rpcs) { + static ListMultimap decomposeIdentifiers(final Set rpcs) { final ListMultimap ret = LinkedListMultimap.create(); for (DOMRpcIdentifier i : rpcs) { ret.put(i.getType(), i.getContextReference()); @@ -136,8 +126,16 @@ final class DOMRpcRoutingTable { return contexts != null && contexts.containsContext(input.getContextReference()); } - Map> getRpcs() { - return Maps.transformValues(rpcs, EXTRACT_IDENTIFIERS); + Map> getRpcs(final DOMRpcAvailabilityListener l) { + final Map> ret = new HashMap<>(rpcs.size()); + for (Entry e : rpcs.entrySet()) { + final Set ids = e.getValue().registeredIdentifiers(l); + if (!ids.isEmpty()) { + ret.put(e.getKey(), ids); + } + } + + return ret; } private static RpcDefinition findRpcDefinition(final SchemaContext context, final SchemaPath schemaPath) { @@ -156,31 +154,33 @@ final class DOMRpcRoutingTable { return null; } - private static AbstractDOMRpcRoutingTableEntry createRpcEntry(final SchemaContext context, final SchemaPath key, final Map> implementations) { + private static AbstractDOMRpcRoutingTableEntry createRpcEntry(final SchemaContext context, final SchemaPath key, + final Map> implementations) { final RpcDefinition rpcDef = findRpcDefinition(context, key); - if (rpcDef != null) { - final ContainerSchemaNode input = rpcDef.getInput(); - if (input != null) { - for (DataSchemaNode c : input.getChildNodes()) { - for (UnknownSchemaNode extension : c.getUnknownSchemaNodes()) { - if (CONTEXT_REFERENCE.equals(extension.getNodeType())) { - final YangInstanceIdentifier keyId = YangInstanceIdentifier.of(c.getQName()); - return new RoutedDOMRpcRoutingTableEntry(rpcDef, keyId, implementations); - } + if (rpcDef == null) { + return new UnknownDOMRpcRoutingTableEntry(key, implementations); + } + + final ContainerSchemaNode input = rpcDef.getInput(); + if (input != null) { + for (DataSchemaNode c : input.getChildNodes()) { + for (UnknownSchemaNode extension : c.getUnknownSchemaNodes()) { + if (CONTEXT_REFERENCE.equals(extension.getNodeType())) { + final YangInstanceIdentifier keyId = YangInstanceIdentifier.of(c.getQName()); + return new RoutedDOMRpcRoutingTableEntry(rpcDef, keyId, implementations); } } } - - return new GlobalDOMRpcRoutingTableEntry(rpcDef, implementations); - } else { - return new UnknownDOMRpcRoutingTableEntry(key, implementations); } + + return new GlobalDOMRpcRoutingTableEntry(rpcDef, implementations); } CheckedFuture invokeRpc(final SchemaPath type, final NormalizedNode input) { final AbstractDOMRpcRoutingTableEntry entry = rpcs.get(type); if (entry == null) { - return Futures.immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available", type)); + return Futures.immediateFailedCheckedFuture( + new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available", type)); } return entry.invokeRpc(input); @@ -195,5 +195,4 @@ final class DOMRpcRoutingTable { return new DOMRpcRoutingTable(b.build(), context); } - }