*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.MapDifference.ValueDifference;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMRpcImplementationRegistration;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
- private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
+ private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
+ .setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
+
private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
+
@GuardedBy("this")
- private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
+ private Collection<Registration<?>> listeners = Collections.emptyList();
+
private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
- @Override
- public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
- return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
+ public static DOMRpcRouter newInstance(final SchemaService schemaService) {
+ final DOMRpcRouter rpcRouter = new DOMRpcRouter();
+ schemaService.registerSchemaContextListener(rpcRouter);
+ return rpcRouter;
}
- private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
- return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
- @Override
- public boolean apply(final DOMRpcIdentifier input) {
- return !table.contains(input);
- }
- }));
+ @Override
+ public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final DOMRpcIdentifier... rpcs) {
+ return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
}
- private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
+ private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
+ final Set<DOMRpcIdentifier> rpcs) {
final DOMRpcRoutingTable oldTable = routingTable;
final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
routingTable = newTable;
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcUnavailable(removedRpcs);
- }
- }
- }
- }
- });
+ listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
}
@Override
- public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
+ public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final Set<DOMRpcIdentifier> rpcs) {
final DOMRpcRoutingTable oldTable = routingTable;
final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
routingTable = newTable;
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcAvailable(addedRpcs);
- }
- }
- }
- }
- });
+ listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
@Override
}
@Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
+ final NormalizedNode<?, ?> input) {
return routingTable.invokeRpc(type, input);
}
private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
- listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
- @Override
- public boolean apply(final Object input) {
- return !reg.equals(input);
- }
- }));
+ listeners = ImmutableList.copyOf(Collections2.filter(listeners, i -> !reg.equals(i)));
}
- @Override
- public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
- final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
- @Override
- protected void removeRegistration() {
- removeListener(this);
- }
- };
+ private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ for (Registration<?> l : listeners) {
+ l.addRpc(newTable, impl);
+ }
+ }
+
+ private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ for (Registration<?> l : listeners) {
+ l.removeRpc(newTable, impl);
+ }
+ }
- final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
+ @Override
+ public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+ final T listener) {
+ final Registration<T> ret = new Registration<>(this, listener);
+ final Builder<Registration<?>> b = ImmutableList.builder();
b.addAll(listeners);
b.add(ret);
listeners = b.build();
- final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
-
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
- listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
- @Override
- public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
- return DOMRpcIdentifier.create(e.getKey(), input);
- }
- }));
- }
- }
- });
+ listenerNotifier.execute(() -> ret.initialTable(routingTable));
return ret;
}
listenerNotifier.shutdown();
}
+ private static final class Registration<T extends DOMRpcAvailabilityListener>
+ extends AbstractListenerRegistration<T> {
+
+ private final DOMRpcRouter router;
+
+ private Map<SchemaPath, Set<YangInstanceIdentifier>> prevRpcs;
+
+ Registration(final DOMRpcRouter router, final T listener) {
+ super(listener);
+ this.router = router;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ router.removeListener(this);
+ }
+
+ void initialTable(final DOMRpcRoutingTable newTable) {
+ final T l = getInstance();
+ if (l == null) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : rpcs.entrySet()) {
+ added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ prevRpcs = rpcs;
+ if (!added.isEmpty()) {
+ l.onRpcAvailable(added);
+ }
+ }
+
+ void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ final T l = getInstance();
+ if (l == null || !l.acceptsImplementation(impl)) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
+ added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
+ for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
+ added.add(DOMRpcIdentifier.create(e.getKey(), i));
+ }
+ }
+
+ prevRpcs = rpcs;
+ if (!added.isEmpty()) {
+ l.onRpcAvailable(added);
+ }
+ }
+
+ void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
+ final T l = getInstance();
+ if (l == null || !l.acceptsImplementation(impl)) {
+ return;
+ }
+
+ final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = newTable.getRpcs(l);
+ final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
+
+ final Collection<DOMRpcIdentifier> removed = new ArrayList<>();
+ for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
+ removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
+ }
+ for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
+ for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
+ removed.add(DOMRpcIdentifier.create(e.getKey(), i));
+ }
+ }
+
+ prevRpcs = rpcs;
+ if (!removed.isEmpty()) {
+ l.onRpcUnavailable(removed);
+ }
+ }
+ }
}