*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
+import com.google.common.util.concurrent.FluentFuture;
import java.util.Collections;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.WeakHashMap;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMRpcImplementationRegistration;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.controller.sal.core.compat.DOMRpcServiceAdapter;
+import org.opendaylight.controller.sal.core.compat.LegacyDOMRpcResultFutureAdapter;
+import org.opendaylight.controller.sal.core.compat.MdsalDOMRpcResultFutureAdapter;
+import org.opendaylight.controller.sal.core.compat.RpcAvailabilityListenerAdapter;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
- private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
- private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
- @GuardedBy("this")
- private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
- private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
-
- public static DOMRpcRouter newInstance(SchemaService schemaService) {
- final DOMRpcRouter rpcRouter = new DOMRpcRouter();
- schemaService.registerSchemaContextListener(rpcRouter);
- return rpcRouter;
+ // This mapping is used to translate mdsal DOMRpcImplementations to their corresponding legacy
+ // DOMRpcImplementations registered thru this interface when invoking a DOMRpcAvailabilityListener.
+ private final Map<org.opendaylight.mdsal.dom.api.DOMRpcImplementation, DOMRpcImplementation> implMapping =
+ Collections.synchronizedMap(new WeakHashMap<>());
+
+ private final org.opendaylight.mdsal.dom.api.DOMRpcService delegateRpcService;
+ private final org.opendaylight.mdsal.dom.api.DOMRpcProviderService delegateRpcProviderService;
+
+ // Note - this is only used for backward compatibility for UTs that use the empty constructor which creates
+ // a local mdsal DOMRpcRouter that needs to be updated with the SchemaContext. In production, the mdsal API
+ // services are passed via the constructor and are set up externally with the SchemaContext.
+ private final SchemaContextListener delegateSchemaContextListener;
+
+ @VisibleForTesting
+ public DOMRpcRouter() {
+ org.opendaylight.mdsal.dom.broker.DOMRpcRouter delegate = new org.opendaylight.mdsal.dom.broker.DOMRpcRouter();
+ this.delegateRpcService = delegate.getRpcService();
+ this.delegateRpcProviderService = delegate.getRpcProviderService();
+ this.delegateSchemaContextListener = delegate;
}
- @Override
- public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
- return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
+ public DOMRpcRouter(final org.opendaylight.mdsal.dom.api.DOMRpcService delegateRpcService,
+ final org.opendaylight.mdsal.dom.api.DOMRpcProviderService delegateRpcProviderService) {
+ this.delegateRpcService = delegateRpcService;
+ this.delegateRpcProviderService = delegateRpcProviderService;
+ this.delegateSchemaContextListener = null;
}
- private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
- return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
- @Override
- public boolean apply(final DOMRpcIdentifier input) {
- return !table.contains(input);
- }
- }));
+ @Override
+ public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final DOMRpcIdentifier... rpcs) {
+ return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
}
- private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
- final DOMRpcRoutingTable oldTable = routingTable;
- final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
- routingTable = newTable;
- if(!removedRpcs.isEmpty()) {
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- listenerNotifier.execute(new Runnable() {
+ @Override
+ public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final Set<DOMRpcIdentifier> rpcs) {
+ org.opendaylight.mdsal.dom.api.DOMRpcImplementation delegateImpl =
+ new org.opendaylight.mdsal.dom.api.DOMRpcImplementation() {
@Override
- public void run() {
- for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcUnavailable(removedRpcs);
- }
- }
- }
+ public FluentFuture<org.opendaylight.mdsal.dom.api.DOMRpcResult> invokeRpc(
+ final org.opendaylight.mdsal.dom.api.DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
+ return new MdsalDOMRpcResultFutureAdapter(implementation.invokeRpc(DOMRpcIdentifier.fromMdsal(rpc),
+ input));
}
- });
- }
- }
- @Override
- public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
- final DOMRpcRoutingTable oldTable = routingTable;
- final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
- final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
- routingTable = newTable;
-
- if(!addedRpcs.isEmpty()) {
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- listenerNotifier.execute(new Runnable() {
@Override
- public void run() {
- for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcAvailable(addedRpcs);
- }
- }
- }
+ public long invocationCost() {
+ return implementation.invocationCost();
}
- });
- }
+ };
+
+ implMapping.put(delegateImpl, implementation);
+
+ final org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration
+ <org.opendaylight.mdsal.dom.api.DOMRpcImplementation> reg = delegateRpcProviderService
+ .registerRpcImplementation(delegateImpl, DOMRpcServiceAdapter.convert(rpcs));
return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
@Override
protected void removeRegistration() {
- removeRpcImplementation(getInstance(), rpcs);
+ reg.close();
+ implMapping.remove(delegateImpl);
}
};
}
@Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
- return routingTable.invokeRpc(type, input);
- }
-
- private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
- listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
- @Override
- public boolean apply(final Object input) {
- return !reg.equals(input);
- }
- }));
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
+ final NormalizedNode<?, ?> input) {
+ final FluentFuture<org.opendaylight.mdsal.dom.api.DOMRpcResult> future =
+ delegateRpcService.invokeRpc(type, input);
+ return future instanceof MdsalDOMRpcResultFutureAdapter ? ((MdsalDOMRpcResultFutureAdapter)future).delegate()
+ : new LegacyDOMRpcResultFutureAdapter(future);
}
@Override
- public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
- final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
+ public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+ final T listener) {
+ final ListenerRegistration<org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener> reg =
+ delegateRpcService.registerRpcListener(new RpcAvailabilityListenerAdapter<T>(listener) {
+ @Override
+ public boolean acceptsImplementation(final org.opendaylight.mdsal.dom.api.DOMRpcImplementation impl) {
+ // If the DOMRpcImplementation wasn't registered thru this interface then the mapping won't be
+ // present - in this we can't call the listener so just assume acceptance which is the default
+ // behavior. This should be fine since a legacy listener would not be aware of implementation types
+ // registered via the new mdsal API.
+ final DOMRpcImplementation legacyImpl = implMapping.get(impl);
+ return legacyImpl != null ? delegate().acceptsImplementation(legacyImpl) : true;
+ }
+ });
+
+ return new AbstractListenerRegistration<T>(listener) {
@Override
protected void removeRegistration() {
- removeListener(this);
+ reg.close();
}
};
-
- final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
- b.addAll(listeners);
- b.add(ret);
- listeners = b.build();
- final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
-
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
- listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
- @Override
- public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
- return DOMRpcIdentifier.create(e.getKey(), input);
- }
- }));
- }
- }
- });
-
- return ret;
}
@Override
- public synchronized void onGlobalContextUpdated(final SchemaContext context) {
- final DOMRpcRoutingTable oldTable = routingTable;
- final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
- routingTable = newTable;
+ public void close() {
}
@Override
- public void close() {
- listenerNotifier.shutdown();
+ @VisibleForTesting
+ public void onGlobalContextUpdated(final SchemaContext context) {
+ if (delegateSchemaContextListener != null) {
+ delegateSchemaContextListener.onGlobalContextUpdated(context);
+ }
}
-
}