import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcImplementationUnavailableException;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
-
+/**
+ * RPC broker responsible for routing requests to remote systems.
+ */
public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
- private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
+ private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
private final String identifier;
}
@Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final InstanceIdentifier route, final CompositeNode input) {
- checkState(defaultDelegate != null, "No implementation is available for rpc:%s path:%s", rpc, route);
- return defaultDelegate.invokeRpc(rpc, route, input);
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final YangInstanceIdentifier route, final CompositeNode input) {
+ if (defaultDelegate == null) {
+ return Futures.immediateFailedCheckedFuture(new RpcImplementationUnavailableException("No RPC implementation found"));
+ }
+
+ LOG.debug("Forwarding RPC {} path {} to delegate {}", rpc, route);
+ return defaultDelegate.invokeRpc(rpc, route, input);
}
void remove(final GlobalRpcRegistration registration) {
implementations.remove(registration.getType(), registration);
}
- void notifyPathAnnouncement(final QName context, final QName identifier, final InstanceIdentifier path) {
+ void notifyPathAnnouncement(final QName context, final QName identifier, final YangInstanceIdentifier path) {
RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
- RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
- for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
+ RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
+ for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
try {
routeListener.getInstance().onRouteChange(change);
} catch (Exception e) {
}
- void notifyPathWithdrawal(final QName context,final QName identifier, final InstanceIdentifier path) {
+ void notifyPathWithdrawal(final QName context,final QName identifier, final YangInstanceIdentifier path) {
RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
- RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
- for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
+ RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
+ for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
try {
routeListener.getInstance().onRouteChange(change);
} catch (Exception e) {
}
@Override
- public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
+ public <L extends RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
final L listener) {
ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
- RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
+ RouteChange<RpcRoutingContext, YangInstanceIdentifier> initial = createInitialRouteChange();
try {
listener.onRouteChange(initial);
} catch (Exception e) {
return reg;
}
- private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
+ private RouteChange<RpcRoutingContext, YangInstanceIdentifier> createInitialRouteChange() {
FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
- ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
- ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
+ ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = ImmutableMap.builder();
+ ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = ImmutableMap.builder();
for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
final RpcRoutingContext context = routedRpcSelector.getIdentifier();
- final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
+ final Set<YangInstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
announcements.put(context, paths);
}
return RoutingUtils.change(announcements.build(), removals.build());