X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMRpcRouter.java;h=285173596208ffe301faf398bef0b8aa2993d73b;hp=d72f714a5f83d8b061697ce599c5c0d9748db7ad;hb=adf49155eced15c9f654d7bed7ee45cd95686e4f;hpb=e3998d55e33da9f6ecb69da75ecc71a047b6362b diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java index d72f714a5f..2851735962 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMRpcRouter.java @@ -7,14 +7,18 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Function; -import com.google.common.base.Predicate; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MapDifference; +import com.google.common.collect.MapDifference.ValueDifference; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -33,6 +37,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMRpcImplementationRegistration; +import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -42,71 +47,39 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.opendaylight.yangtools.yang.model.api.SchemaPath; public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener { - private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build(); + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build(); + private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY); + @GuardedBy("this") - private Collection> listeners = Collections.emptyList(); - private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY; + private Collection> listeners = Collections.emptyList(); - @Override - public DOMRpcImplementationRegistration registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) { - return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs)); - } + private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY; - private static Collection notPresentRpcs(final DOMRpcRoutingTable table, final Collection candidates) { - return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate() { - @Override - public boolean apply(final DOMRpcIdentifier input) { - return !table.contains(input); - } - })); + public static DOMRpcRouter newInstance(final SchemaService schemaService) { + final DOMRpcRouter rpcRouter = new DOMRpcRouter(); + schemaService.registerSchemaContextListener(rpcRouter); + return rpcRouter; } - private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set rpcs) { + private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, + final Set rpcs) { final DOMRpcRoutingTable oldTable = routingTable; final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs); - - final Collection removedRpcs = notPresentRpcs(newTable, rpcs); - final Collection> capturedListeners = listeners; routingTable = newTable; - listenerNotifier.execute(new Runnable() { - @Override - public void run() { - for (ListenerRegistration l : capturedListeners) { - // Need to ensure removed listeners do not get notified - synchronized (DOMRpcRouter.this) { - if (listeners.contains(l)) { - l.getInstance().onRpcUnavailable(removedRpcs); - } - } - } - } - }); + listenerNotifier.execute(() -> notifyRemoved(newTable, implementation)); } @Override - public synchronized DOMRpcImplementationRegistration registerRpcImplementation(final T implementation, final Set rpcs) { + public synchronized DOMRpcImplementationRegistration registerRpcImplementation( + final T implementation, final Set rpcs) { final DOMRpcRoutingTable oldTable = routingTable; final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs); - - final Collection addedRpcs = notPresentRpcs(oldTable, rpcs); - final Collection> capturedListeners = listeners; routingTable = newTable; - listenerNotifier.execute(new Runnable() { - @Override - public void run() { - for (ListenerRegistration l : capturedListeners) { - // Need to ensure removed listeners do not get notified - synchronized (DOMRpcRouter.this) { - if (listeners.contains(l)) { - l.getInstance().onRpcAvailable(addedRpcs); - } - } - } - } - }); + listenerNotifier.execute(() -> notifyAdded(newTable, implementation)); return new AbstractDOMRpcImplementationRegistration(implementation) { @Override @@ -117,48 +90,37 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP } @Override - public CheckedFuture invokeRpc(final SchemaPath type, final NormalizedNode input) { + public CheckedFuture invokeRpc(final SchemaPath type, + final NormalizedNode input) { return routingTable.invokeRpc(type, input); } private synchronized void removeListener(final ListenerRegistration reg) { - listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate() { - @Override - public boolean apply(final Object input) { - return !reg.equals(input); - } - })); + listeners = ImmutableList.copyOf(Collections2.filter(listeners, i -> !reg.equals(i))); } - @Override - public synchronized ListenerRegistration registerRpcListener(final T listener) { - final ListenerRegistration ret = new AbstractListenerRegistration(listener) { - @Override - protected void removeRegistration() { - removeListener(this); - } - }; + private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + for (Registration l : listeners) { + l.addRpc(newTable, impl); + } + } + + private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + for (Registration l : listeners) { + l.removeRpc(newTable, impl); + } + } - final Builder> b = ImmutableList.builder(); + @Override + public synchronized ListenerRegistration registerRpcListener( + final T listener) { + final Registration ret = new Registration<>(this, listener, routingTable.getRpcs(listener)); + final Builder> b = ImmutableList.builder(); b.addAll(listeners); b.add(ret); listeners = b.build(); - final Map> capturedRpcs = routingTable.getRpcs(); - - listenerNotifier.execute(new Runnable() { - @Override - public void run() { - for (final Entry> e : capturedRpcs.entrySet()) { - listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function() { - @Override - public DOMRpcIdentifier apply(final YangInstanceIdentifier input) { - return DOMRpcIdentifier.create(e.getKey(), input); - } - })); - } - } - }); + listenerNotifier.execute(ret::initialTable); return ret; } @@ -174,4 +136,85 @@ public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcP listenerNotifier.shutdown(); } + private static final class Registration + extends AbstractListenerRegistration { + + private final DOMRpcRouter router; + + private Map> prevRpcs; + + Registration(final DOMRpcRouter router, final T listener, + final Map> rpcs) { + super(Preconditions.checkNotNull(listener)); + this.router = Preconditions.checkNotNull(router); + this.prevRpcs = Preconditions.checkNotNull(rpcs); + } + + @Override + protected void removeRegistration() { + router.removeListener(this); + } + + void initialTable() { + final Collection added = new ArrayList<>(); + for (Entry> e : prevRpcs.entrySet()) { + added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i))); + } + + if (!added.isEmpty()) { + final T l = getInstance(); + l.onRpcAvailable(added); + } + } + + void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + final T l = getInstance(); + if (!l.acceptsImplementation(impl)) { + return; + } + + final Map> rpcs = Verify.verifyNotNull(newTable.getRpcs(l)); + final MapDifference> diff = Maps.difference(prevRpcs, rpcs); + + final Collection added = new ArrayList<>(); + for (Entry> e : diff.entriesOnlyOnRight().entrySet()) { + added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i))); + } + for (Entry>> e : diff.entriesDiffering().entrySet()) { + for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) { + added.add(DOMRpcIdentifier.create(e.getKey(), i)); + } + } + + prevRpcs = rpcs; + if (!added.isEmpty()) { + l.onRpcAvailable(added); + } + } + + void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) { + final T l = getInstance(); + if (!l.acceptsImplementation(impl)) { + return; + } + + final Map> rpcs = Verify.verifyNotNull(newTable.getRpcs(l)); + final MapDifference> diff = Maps.difference(prevRpcs, rpcs); + + final Collection removed = new ArrayList<>(); + for (Entry> e : diff.entriesOnlyOnLeft().entrySet()) { + removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i))); + } + for (Entry>> e : diff.entriesDiffering().entrySet()) { + for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) { + removed.add(DOMRpcIdentifier.create(e.getKey(), i)); + } + } + + prevRpcs = rpcs; + if (!removed.isEmpty()) { + l.onRpcUnavailable(removed); + } + } + } }