*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.WeakHashMap;
+import java.util.stream.Collectors;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DefaultDOMRpcException;
import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMRpcImplementationRegistration;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
- private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
- private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
- @GuardedBy("this")
- private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
- private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
-
- @Override
- public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
- return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
+ private static final ExceptionMapper<org.opendaylight.mdsal.dom.api.DOMRpcException> MDSAL_DOM_RPC_EX_MAPPER =
+ new ExceptionMapper<org.opendaylight.mdsal.dom.api.DOMRpcException>(
+ "rpc", org.opendaylight.mdsal.dom.api.DOMRpcException.class) {
+ @Override
+ protected org.opendaylight.mdsal.dom.api.DOMRpcException newWithCause(String message, Throwable cause) {
+ return cause instanceof org.opendaylight.mdsal.dom.api.DOMRpcException
+ ? (org.opendaylight.mdsal.dom.api.DOMRpcException)cause
+ : new org.opendaylight.mdsal.dom.api.DefaultDOMRpcException("RPC failed", cause);
+ }
+ };
+
+ private static final ExceptionMapper<DOMRpcException> LEGACY_DOM_RPC_EX_MAPPER =
+ new ExceptionMapper<DOMRpcException>("rpc", DOMRpcException.class) {
+ @Override
+ protected DOMRpcException newWithCause(String message, Throwable cause) {
+ return cause instanceof DOMRpcException ? (DOMRpcException)cause
+ : cause instanceof org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException
+ ? new DOMRpcImplementationNotAvailableException(cause.getMessage(), cause.getCause())
+ : new DefaultDOMRpcException("RPC failed", cause);
+ }
+ };
+
+ // This mapping is used to translate mdsal DOMRpcImplementations to their corresponding legacy
+ // DOMRpcImplementations registered thru this interface when invoking a DOMRpcAvailabilityListener.
+ private final Map<org.opendaylight.mdsal.dom.api.DOMRpcImplementation, DOMRpcImplementation> implMapping =
+ Collections.synchronizedMap(new WeakHashMap<>());
+
+ private final org.opendaylight.mdsal.dom.api.DOMRpcService delegateRpcService;
+ private final org.opendaylight.mdsal.dom.api.DOMRpcProviderService delegateRpcProviderService;
+
+ @VisibleForTesting
+ public DOMRpcRouter() {
+ org.opendaylight.mdsal.dom.broker.DOMRpcRouter delegate = new org.opendaylight.mdsal.dom.broker.DOMRpcRouter();
+ this.delegateRpcService = delegate;
+ this.delegateRpcProviderService = delegate;
}
- private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
- return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
- @Override
- public boolean apply(final DOMRpcIdentifier input) {
- return !table.contains(input);
- }
- }));
+ public DOMRpcRouter(final org.opendaylight.mdsal.dom.api.DOMRpcService delegateRpcService,
+ final org.opendaylight.mdsal.dom.api.DOMRpcProviderService delegateRpcProviderService) {
+ this.delegateRpcService = delegateRpcService;
+ this.delegateRpcProviderService = delegateRpcProviderService;
}
- private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
- final DOMRpcRoutingTable oldTable = routingTable;
- final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- routingTable = newTable;
-
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcUnavailable(removedRpcs);
- }
- }
- }
- }
- });
+ @Override
+ public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final DOMRpcIdentifier... rpcs) {
+ return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
- final DOMRpcRoutingTable oldTable = routingTable;
- final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
-
- final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
- final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
- routingTable = newTable;
+ public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
+ final T implementation, final Set<DOMRpcIdentifier> rpcs) {
+ org.opendaylight.mdsal.dom.api.DOMRpcImplementation delegateImpl =
+ new org.opendaylight.mdsal.dom.api.DOMRpcImplementation() {
+ @Override
+ public CheckedFuture<org.opendaylight.mdsal.dom.api.DOMRpcResult,
+ org.opendaylight.mdsal.dom.api.DOMRpcException> invokeRpc(
+ org.opendaylight.mdsal.dom.api.DOMRpcIdentifier rpc, NormalizedNode<?, ?> input) {
+ final ListenableFuture future = implementation.invokeRpc(convert(rpc), input);
+ return MappingCheckedFuture.create(future, MDSAL_DOM_RPC_EX_MAPPER);
+ }
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
- // Need to ensure removed listeners do not get notified
- synchronized (DOMRpcRouter.this) {
- if (listeners.contains(l)) {
- l.getInstance().onRpcAvailable(addedRpcs);
- }
- }
+ @Override
+ public long invocationCost() {
+ return implementation.invocationCost();
}
- }
- });
+ };
+
+ implMapping.put(delegateImpl, implementation);
+
+ final org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration
+ <org.opendaylight.mdsal.dom.api.DOMRpcImplementation> reg = delegateRpcProviderService
+ .registerRpcImplementation(delegateImpl,
+ rpcs.stream().map(DOMRpcRouter::convert).collect(Collectors.toSet()));
return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
@Override
protected void removeRegistration() {
- removeRpcImplementation(getInstance(), rpcs);
+ reg.close();
+ implMapping.remove(delegateImpl);
}
};
}
- @Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
- return routingTable.invokeRpc(type, input);
+ private static org.opendaylight.mdsal.dom.api.DOMRpcIdentifier convert(DOMRpcIdentifier from) {
+ return org.opendaylight.mdsal.dom.api.DOMRpcIdentifier.create(from.getType(), from.getContextReference());
}
- private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
- listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
- @Override
- public boolean apply(final Object input) {
- return !reg.equals(input);
- }
- }));
+ private static DOMRpcIdentifier convert(org.opendaylight.mdsal.dom.api.DOMRpcIdentifier from) {
+ return DOMRpcIdentifier.create(from.getType(), from.getContextReference());
+ }
+
+ private static ListenableFuture<DOMRpcResult> convert(org.opendaylight.mdsal.dom.api.DOMRpcResult from) {
+ return from == null ? Futures.immediateFuture(null)
+ : from instanceof DOMRpcResult ? Futures.immediateFuture((DOMRpcResult)from)
+ : Futures.immediateFuture(new DefaultDOMRpcResult(from.getResult(), from.getErrors()));
+ }
+
+ private static Collection<DOMRpcIdentifier> convert(
+ Collection<org.opendaylight.mdsal.dom.api.DOMRpcIdentifier> from) {
+ return from.stream().map(DOMRpcRouter::convert).collect(Collectors.toList());
}
@Override
- public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
- final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
- @Override
- protected void removeRegistration() {
- removeListener(this);
- }
- };
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
+ final NormalizedNode<?, ?> input) {
+ final ListenableFuture<DOMRpcResult> future = Futures.transformAsync(delegateRpcService.invokeRpc(type, input),
+ DOMRpcRouter::convert, MoreExecutors.directExecutor());
+ return MappingCheckedFuture.create(future, LEGACY_DOM_RPC_EX_MAPPER);
+ }
- final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
- b.addAll(listeners);
- b.add(ret);
- listeners = b.build();
- final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
+ @Override
+ public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+ final T listener) {
+ final ListenerRegistration<org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener> reg =
+ delegateRpcService.registerRpcListener(new org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener() {
+ @Override
+ public void onRpcAvailable(Collection<org.opendaylight.mdsal.dom.api.DOMRpcIdentifier> rpcs) {
+ listener.onRpcAvailable(convert(rpcs));
+ }
- listenerNotifier.execute(new Runnable() {
- @Override
- public void run() {
- for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
- listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
- @Override
- public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
- return DOMRpcIdentifier.create(e.getKey(), input);
- }
- }));
+ @Override
+ public void onRpcUnavailable(Collection<org.opendaylight.mdsal.dom.api.DOMRpcIdentifier> rpcs) {
+ listener.onRpcUnavailable(convert(rpcs));
}
- }
- });
- return ret;
- }
+ @Override
+ public boolean acceptsImplementation(final org.opendaylight.mdsal.dom.api.DOMRpcImplementation impl) {
+ // If the DOMRpcImplementation wasn't registered thru this interface then the mapping won't be
+ // present - in this we can't call the listener so just assume acceptance which is the default
+ // behavior. This should be fine since a legacy listener would not be aware of implementation types
+ // registered via the new mdsal API.
+ final DOMRpcImplementation legacyImpl = implMapping.get(impl);
+ return legacyImpl != null ? listener.acceptsImplementation(legacyImpl) : true;
+ }
+ });
- @Override
- public synchronized void onGlobalContextUpdated(final SchemaContext context) {
- final DOMRpcRoutingTable oldTable = routingTable;
- final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
- routingTable = newTable;
+ return new AbstractListenerRegistration<T>(listener) {
+ @Override
+ protected void removeRegistration() {
+ reg.close();
+ }
+ };
}
@Override
public void close() {
- listenerNotifier.shutdown();
}
+ @Override
+ @VisibleForTesting
+ public void onGlobalContextUpdated(final SchemaContext context) {
+ if (delegateRpcService instanceof SchemaContextListener) {
+ ((SchemaContextListener)delegateRpcService).onGlobalContextUpdated(context);
+ }
+ }
}