import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
-public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
+public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
"2013-07-09", "context-reference");
private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
-
+
private final String identifier;
private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
private RpcImplementation defaultImplementation;
private SchemaContextProvider schemaProvider;
+ private RoutedRpcDefaultImplementation defaultDelegate;
public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
super();
this.schemaProvider = schemaProvider;
}
+ public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+ return defaultDelegate;
+ }
+
+ @Override
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+ this.defaultDelegate = defaultDelegate;
+ }
+
@Override
public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
checkArgument(rpcType != null, "RPC Type should not be null");
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
return findRpcImplemention(rpc).invokeRpc(rpc, input);
}
if (potentialImpl != null) {
return potentialImpl;
}
+
potentialImpl = defaultImplementation;
- checkState(potentialImpl != null, "Implementation is not available.");
+ if( potentialImpl == null ) {
+ throw new UnsupportedOperationException( "No implementation for this operation is available." );
+ }
+
return potentialImpl;
}
if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
return Optional.fromNullable(extension.getQName());
}
- ;
}
return Optional.absent();
}
return ret;
}
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ checkState(defaultDelegate != null);
+ return defaultDelegate.invokeRpc(rpc, identifier, input);
+ }
+
private static abstract class RoutingStrategy implements Identifiable<QName> {
private final QName identifier;
}
}
- private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
+ private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
private final RoutedRpcStrategy strategy;
private final Set<QName> supportedRpcs;
+ private final RpcRoutingContext identifier;
private RpcImplementation defaultDelegate;
private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
- private SchemaAwareRpcBroker router;
+ private final SchemaAwareRpcBroker router;
public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
super();
this.strategy = strategy;
supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
+ identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
this.router = router;
}
@Override
- public QName getIdentifier() {
- return strategy.getIdentifier();
+ public RpcRoutingContext getIdentifier() {
+ return identifier;
}
@Override
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
checkArgument(inputContainer != null, "Rpc payload must contain input element");
SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
Object route = routeContainer.getValue();
+ checkArgument(route instanceof InstanceIdentifier,
+ "The routed node %s is not an instance identifier", route);
RpcImplementation potential = null;
if (route != null) {
RoutedRpcRegImpl potentialReg = implementations.get(route);
}
}
if (potential == null) {
- potential = defaultDelegate;
+ return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
}
checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
return potential.invokeRpc(rpc, input);
RoutedRpcRegistration {
private final QName type;
- private RoutedRpcSelector router;
+ private final RoutedRpcSelector router;
public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
super(implementation);
routeListener.getInstance().onRouteChange(change);
} catch (Exception e) {
LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
-
+
}
}
-
+
}
-
+
private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
}
}
}
-
+
@Override
public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
L listener) {
- return routeChangeListeners.registerWithType(listener);
+ ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
+ RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
+ try {
+ listener.onRouteChange(initial);
+ } catch (Exception e) {
+ LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
+ }
+ return reg;
+ }
+
+ private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
+ FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
+
+
+ ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
+ ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
+ for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
+ final RpcRoutingContext context = routedRpcSelector.getIdentifier();
+ final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
+ announcements.put(context, paths);
+ }
+ return RoutingUtils.change(announcements.build(), removals.build());
}
}