BUG-3128: Update RPC router concepts
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMRpcRouter.java
index d72f714a5f83d8b061697ce599c5c0d9748db7ad..daf0d065f8b5435ed025465d4e3a5918d38c844a 100644 (file)
@@ -7,14 +7,17 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.MapDifference.ValueDifference;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -33,6 +36,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMRpcImplementationRegistration;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -42,71 +46,45 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
-    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
+    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
+            .setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
+
     private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
+
     @GuardedBy("this")
-    private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
+    private Collection<Registration<?>> listeners = Collections.emptyList();
+
     private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
 
-    @Override
-    public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
-        return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
+    public static DOMRpcRouter newInstance(final SchemaService schemaService) {
+        final DOMRpcRouter rpcRouter = new DOMRpcRouter();
+        schemaService.registerSchemaContextListener(rpcRouter);
+        return rpcRouter;
     }
 
-    private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
-        return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
-            @Override
-            public boolean apply(final DOMRpcIdentifier input) {
-                return !table.contains(input);
-            }
-        }));
+    @Override
+    public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+            final T implementation, final DOMRpcIdentifier... rpcs) {
+        return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
     }
 
-    private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
+    private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
+            final Set<DOMRpcIdentifier> rpcs) {
         final DOMRpcRoutingTable oldTable = routingTable;
         final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
-
-        final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
-        final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
         routingTable = newTable;
 
-        listenerNotifier.execute(new Runnable() {
-            @Override
-            public void run() {
-                for (ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
-                    // Need to ensure removed listeners do not get notified
-                    synchronized (DOMRpcRouter.this) {
-                        if (listeners.contains(l)) {
-                            l.getInstance().onRpcUnavailable(removedRpcs);
-                        }
-                    }
-                }
-            }
-        });
+        listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
     }
 
     @Override
-    public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
+    public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+            final T implementation, final Set<DOMRpcIdentifier> rpcs) {
         final DOMRpcRoutingTable oldTable = routingTable;
         final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
-
-        final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
-        final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
         routingTable = newTable;
 
-        listenerNotifier.execute(new Runnable() {
-            @Override
-            public void run() {
-                for (ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
-                    // Need to ensure removed listeners do not get notified
-                    synchronized (DOMRpcRouter.this) {
-                        if (listeners.contains(l)) {
-                            l.getInstance().onRpcAvailable(addedRpcs);
-                        }
-                    }
-                }
-            }
-        });
+        listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
 
         return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
             @Override
@@ -117,48 +95,37 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP
     }
 
     @Override
-    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
+    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
+            final NormalizedNode<?, ?> input) {
         return routingTable.invokeRpc(type, input);
     }
 
     private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
-        listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
-            @Override
-            public boolean apply(final Object input) {
-                return !reg.equals(input);
-            }
-        }));
+        listeners = ImmutableList.copyOf(Collections2.filter(listeners, i -> !reg.equals(i)));
     }
 
-    @Override
-    public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
-        final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
-            @Override
-            protected void removeRegistration() {
-                removeListener(this);
-            }
-        };
+    private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+        for (Registration<?> l : listeners) {
+            l.addRpc(newTable, impl);
+        }
+    }
+
+    private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+        for (Registration<?> l : listeners) {
+            l.removeRpc(newTable, impl);
+        }
+    }
 
-        final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
+    @Override
+    public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+            final T listener) {
+        final Registration<T> ret = new Registration<>(this, listener);
+        final Builder<Registration<?>> b = ImmutableList.builder();
         b.addAll(listeners);
         b.add(ret);
         listeners = b.build();
-        final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
-
-        listenerNotifier.execute(new Runnable() {
-            @Override
-            public void run() {
-                for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
-                    listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
-                        @Override
-                        public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
-                            return DOMRpcIdentifier.create(e.getKey(), input);
-                        }
-                    }));
-                }
-            }
-        });
 
+        listenerNotifier.execute(() -> ret.initialTable(routingTable));
         return ret;
     }
 
@@ -174,4 +141,88 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP
         listenerNotifier.shutdown();
     }
 
+    private static final class Registration<T extends DOMRpcAvailabilityListener>
+        extends AbstractListenerRegistration<T> {
+
+        private final DOMRpcRouter router;
+
+        private Map<SchemaPath, Set<YangInstanceIdentifier>> prevRpcs;
+
+        Registration(final DOMRpcRouter router, final T listener) {
+            super(listener);
+            this.router = router;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            router.removeListener(this);
+        }
+
+        void initialTable(final DOMRpcRoutingTable newTable) {
+            final T l = getInstance();
+            if (l == null) {
+                return;
+            }
+
+            final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+            final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+            for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : rpcs.entrySet()) {
+                added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+            }
+            prevRpcs = rpcs;
+            if (!added.isEmpty()) {
+                l.onRpcAvailable(added);
+            }
+        }
+
+        void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+            final T l = getInstance();
+            if (l == null || !l.acceptsImplementation(impl)) {
+                return;
+            }
+
+            final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+            final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+            final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+            for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
+                added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+            }
+            for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
+                for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
+                    added.add(DOMRpcIdentifier.create(e.getKey(), i));
+                }
+            }
+
+            prevRpcs = rpcs;
+            if (!added.isEmpty()) {
+                l.onRpcAvailable(added);
+            }
+        }
+
+        void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+            final T l = getInstance();
+            if (l == null || !l.acceptsImplementation(impl)) {
+                return;
+            }
+
+            final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+            final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+            final Collection<DOMRpcIdentifier> removed = new ArrayList<>();
+            for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
+                removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+            }
+            for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
+                for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
+                    removed.add(DOMRpcIdentifier.create(e.getKey(), i));
+                }
+            }
+
+            prevRpcs = rpcs;
+            if (!removed.isEmpty()) {
+                l.onRpcUnavailable(removed);
+            }
+        }
+    }
 }