BUG-3128: Teach DOMRpcRouter about new concepts 65/50565/5
authorRobert Varga <rovarga@cisco.com>
Tue, 17 Jan 2017 15:52:49 +0000 (16:52 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 24 Jan 2017 10:04:56 +0000 (11:04 +0100)
This is the implementation part of
https://git.opendaylight.org/gerrit/50487, teaching the
DOMRpcRouter to filter listener notifications and order
the implementations according to their cost.

Change-Id: Ib04d77786811d91bb22800eb5e64d3fd57ed659a
Signed-off-by: Robert Varga <rovarga@cisco.com>
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/AbstractDOMRpcRoutingTableEntry.java
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMRpcRouter.java
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMRpcRoutingTable.java

index 021dd96b1177a66fcb80aac378fcd9fa99cf26dd..79d5315f4e6615f4ee676e0ca2474028f3791369 100644 (file)
@@ -7,15 +7,18 @@
  */
 package org.opendaylight.mdsal.dom.broker;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.CheckedFuture;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
 import org.opendaylight.mdsal.dom.api.DOMRpcException;
 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
@@ -27,17 +30,17 @@ abstract class AbstractDOMRpcRoutingTableEntry {
     private final Map<YangInstanceIdentifier, List<DOMRpcImplementation>> impls;
     private final SchemaPath schemaPath;
 
-    protected AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath, final Map<YangInstanceIdentifier,
+    AbstractDOMRpcRoutingTableEntry(final SchemaPath schemaPath, final Map<YangInstanceIdentifier,
             List<DOMRpcImplementation>> impls) {
         this.schemaPath = Preconditions.checkNotNull(schemaPath);
         this.impls = Preconditions.checkNotNull(impls);
     }
 
-    protected final SchemaPath getSchemaPath() {
+    final SchemaPath getSchemaPath() {
         return schemaPath;
     }
 
-    protected final List<DOMRpcImplementation> getImplementations(final YangInstanceIdentifier context) {
+    final List<DOMRpcImplementation> getImplementations(final YangInstanceIdentifier context) {
         return impls.get(context);
     }
 
@@ -45,10 +48,15 @@ abstract class AbstractDOMRpcRoutingTableEntry {
         return impls;
     }
 
-    public boolean containsContext(final YangInstanceIdentifier contextReference) {
+    final boolean containsContext(final YangInstanceIdentifier contextReference) {
         return impls.containsKey(contextReference);
     }
 
+    final Set<YangInstanceIdentifier> registeredIdentifiers(final DOMRpcAvailabilityListener listener) {
+        return Maps.filterValues(impls, list -> list.stream().anyMatch(listener::acceptsImplementation)).keySet();
+    }
+
+    @VisibleForTesting
     final Set<YangInstanceIdentifier> registeredIdentifiers() {
         return impls.keySet();
     }
@@ -65,18 +73,22 @@ abstract class AbstractDOMRpcRoutingTableEntry {
         final Builder<YangInstanceIdentifier, List<DOMRpcImplementation>> vb = ImmutableMap.builder();
         for (final Entry<YangInstanceIdentifier, List<DOMRpcImplementation>> ve : impls.entrySet()) {
             if (newRpcs.remove(ve.getKey())) {
-                final ArrayList<DOMRpcImplementation> i = new ArrayList<>(ve.getValue().size() + 1);
+                final List<DOMRpcImplementation> i = new ArrayList<>(ve.getValue().size() + 1);
                 i.addAll(ve.getValue());
                 i.add(implementation);
+
+                // New implementation is at the end, this will move it to be the last among implementations
+                // with equal cost -- relying on sort() being stable.
+                i.sort((i1, i2) -> Long.compare(i1.invocationCost(), i2.invocationCost()));
                 vb.put(ve.getKey(), i);
             } else {
                 vb.put(ve);
             }
         }
         for (final YangInstanceIdentifier ii : newRpcs) {
-            final ArrayList<DOMRpcImplementation> impl = new ArrayList<>(1);
+            final List<DOMRpcImplementation> impl = new ArrayList<>(1);
             impl.add(implementation);
-            vb.put(ii,impl);
+            vb.put(ii, impl);
         }
 
         return newInstance(vb.build());
@@ -87,7 +99,7 @@ abstract class AbstractDOMRpcRoutingTableEntry {
         final Builder<YangInstanceIdentifier, List<DOMRpcImplementation>> vb = ImmutableMap.builder();
         for (final Entry<YangInstanceIdentifier, List<DOMRpcImplementation>> ve : impls.entrySet()) {
             if (removed.remove(ve.getKey())) {
-                final ArrayList<DOMRpcImplementation> i = new ArrayList<>(ve.getValue());
+                final List<DOMRpcImplementation> i = new ArrayList<>(ve.getValue());
                 i.remove(implementation);
                 // We could trimToSize(), but that may perform another copy just to get rid
                 // of a single element. That is probably not worth the trouble.
index 9563f22de8a2d02186421bf513f0da1a9930fb2a..d376e65ec575049c80e03da5e8242c7b170f3729 100644 (file)
@@ -11,8 +11,13 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.MapDifference.ValueDifference;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -42,37 +47,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat(
             "DOMRpcRouter-listener-%s").setDaemon(true).build();
-    private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
-    @GuardedBy("this")
-    private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
-    private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
 
-    private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table,
-            final Collection<DOMRpcIdentifier> candidates) {
-        return ImmutableSet.copyOf(Collections2.filter(candidates, input -> !table.contains(input)));
-    }
+    private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
 
-    private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
-            final Set<DOMRpcIdentifier> rpcs) {
-        final DOMRpcRoutingTable oldTable = routingTable;
-        final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
+    @GuardedBy("this")
+    private Collection<Registration<?>> listeners = Collections.emptyList();
 
-        final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
-        routingTable = newTable;
-        if (!removedRpcs.isEmpty()) {
-            final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
-            listenerNotifier.execute(() -> {
-                for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
-                    // Need to ensure removed listeners do not get notified
-                    synchronized (DOMRpcRouter.this) {
-                        if (listeners.contains(l)) {
-                            l.getInstance().onRpcUnavailable(removedRpcs);
-                        }
-                    }
-                }
-            });
-        }
-    }
+    private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
 
     @Override
     public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
@@ -85,23 +66,9 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP
             registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
         final DOMRpcRoutingTable oldTable = routingTable;
         final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
-
-        final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
         routingTable = newTable;
 
-        if (!addedRpcs.isEmpty()) {
-            final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
-            listenerNotifier.execute(() -> {
-                for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
-                    // Need to ensure removed listeners do not get notified
-                    synchronized (DOMRpcRouter.this) {
-                        if (listeners.contains(l)) {
-                            l.getInstance().onRpcAvailable(addedRpcs);
-                        }
-                    }
-                }
-            });
-        }
+        listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
 
         return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
             @Override
@@ -111,6 +78,15 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP
         };
     }
 
+    private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
+            final Set<DOMRpcIdentifier> rpcs) {
+        final DOMRpcRoutingTable oldTable = routingTable;
+        final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
+        routingTable = newTable;
+
+        listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
+    }
+
     @Override
     public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
             final NormalizedNode<?, ?> input) {
@@ -121,29 +97,28 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP
         listeners = ImmutableList.copyOf(Collections2.filter(listeners, input -> !reg.equals(input)));
     }
 
+    private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+        for (Registration<?> l : listeners) {
+            l.addRpc(newTable, impl);
+        }
+    }
+
+    private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+        for (Registration<?> l : listeners) {
+            l.removeRpc(newTable, impl);
+        }
+    }
+
     @Override
     public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
             final T listener) {
-        final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
-            @Override
-            protected void removeRegistration() {
-                removeListener(this);
-            }
-        };
-
-        final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
+        final Registration<T> ret = new Registration<>(this, listener);
+        final Builder<Registration<?>> b = ImmutableList.builder();
         b.addAll(listeners);
         b.add(ret);
         listeners = b.build();
-        final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
-
-        listenerNotifier.execute(() -> {
-            for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
-                listener.onRpcAvailable(Collections2.transform(e.getValue(),
-                    input -> DOMRpcIdentifier.create(e.getKey(), input)));
-            }
-        });
 
+        listenerNotifier.execute(() -> ret.initialTable(routingTable));
         return ret;
     }
 
@@ -159,4 +134,90 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP
         listenerNotifier.shutdown();
     }
 
+    private static final class Registration<T extends DOMRpcAvailabilityListener>
+        extends AbstractListenerRegistration<T> {
+
+        private final DOMRpcRouter router;
+
+        private Map<SchemaPath, Set<YangInstanceIdentifier>> prevRpcs;
+
+        Registration(final DOMRpcRouter router, final T listener) {
+            super(listener);
+            this.router = router;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            router.removeListener(this);
+        }
+
+        void initialTable(final DOMRpcRoutingTable newTable) {
+            final T l = getInstance();
+            if (l == null) {
+                return;
+            }
+
+            final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+            final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+            for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : rpcs.entrySet()) {
+                added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+            }
+            prevRpcs = rpcs;
+            if (!added.isEmpty()) {
+                l.onRpcAvailable(added);
+            }
+        }
+
+        void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+            final T l = getInstance();
+            if (l == null || !l.acceptsImplementation(impl)) {
+                return;
+            }
+
+            final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+            final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+            final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+            for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
+                added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+            }
+            for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
+                    diff.entriesDiffering().entrySet()) {
+                for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
+                    added.add(DOMRpcIdentifier.create(e.getKey(), i));
+                }
+            }
+
+            prevRpcs = rpcs;
+            if (!added.isEmpty()) {
+                l.onRpcAvailable(added);
+            }
+        }
+
+        void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+            final T l = getInstance();
+            if (l == null || !l.acceptsImplementation(impl)) {
+                return;
+            }
+
+            final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+            final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+            final Collection<DOMRpcIdentifier> removed = new ArrayList<>();
+            for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
+                removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+            }
+            for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
+                    diff.entriesDiffering().entrySet()) {
+                for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
+                    removed.add(DOMRpcIdentifier.create(e.getKey(), i));
+                }
+            }
+
+            prevRpcs = rpcs;
+            if (!removed.isEmpty()) {
+                l.onRpcUnavailable(removed);
+            }
+        }
+    }
 }
index e6138b7f52e98dd116c8d394bfbb572d983bdcde..be14139de1a4799c915549d4670ca1aa4285c2bd 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.mdsal.dom.broker;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -18,10 +19,12 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
 import org.opendaylight.mdsal.dom.api.DOMRpcException;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
@@ -122,10 +125,23 @@ final class DOMRpcRoutingTable {
         return contexts != null && contexts.containsContext(input.getContextReference());
     }
 
+    @VisibleForTesting
     Map<SchemaPath, Set<YangInstanceIdentifier>> getRpcs() {
         return Maps.transformValues(rpcs, AbstractDOMRpcRoutingTableEntry::registeredIdentifiers);
     }
 
+    Map<SchemaPath, Set<YangInstanceIdentifier>> getRpcs(final DOMRpcAvailabilityListener listener) {
+        final Map<SchemaPath, Set<YangInstanceIdentifier>> ret = new HashMap<>(rpcs.size());
+        for (Entry<SchemaPath, AbstractDOMRpcRoutingTableEntry> e : rpcs.entrySet()) {
+            final Set<YangInstanceIdentifier> ids = e.getValue().registeredIdentifiers(listener);
+            if (!ids.isEmpty()) {
+                ret.put(e.getKey(), ids);
+            }
+        }
+
+        return ret;
+    }
+
     private static RpcDefinition findRpcDefinition(final SchemaContext context, final SchemaPath schemaPath) {
         if (context != null) {
             final QName qname = schemaPath.getPathFromRoot().iterator().next();
@@ -177,5 +193,4 @@ final class DOMRpcRoutingTable {
 
         return new DOMRpcRoutingTable(b.build(), context);
     }
-
 }