X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fdom%2Fbroker%2Fimpl%2FSchemaAwareRpcBroker.java;h=a2c43d0c73ba402f13bd7b604a9629b617637732;hp=28d5ae914fc686a8cd41032755b77db529f9a85b;hb=c3acce135d19955f72616c4c956668bb539f80f2;hpb=94d0d20b41d64bb6696c2a573ec367fcfddc44e9 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java index 28d5ae914f..a2c43d0c73 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java @@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils; import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration; import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; +import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; import org.opendaylight.controller.sal.core.api.RpcRoutingContext; @@ -42,9 +43,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; -public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { +public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, RoutedRpcDefaultImplementation { private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class); @@ -52,12 +56,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { "2013-07-09", "context-reference"); private final ListenerRegistry rpcRegistrationListeners = new ListenerRegistry<>(); private final ListenerRegistry> routeChangeListeners = new ListenerRegistry<>(); - + private final String identifier; private final ConcurrentMap implementations = new ConcurrentHashMap<>(); private RpcImplementation defaultImplementation; private SchemaContextProvider schemaProvider; + private RoutedRpcDefaultImplementation defaultDelegate; public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) { super(); @@ -81,6 +86,15 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { this.schemaProvider = schemaProvider; } + public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { + return defaultDelegate; + } + + @Override + public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) { + this.defaultDelegate = defaultDelegate; + } + @Override public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { checkArgument(rpcType != null, "RPC Type should not be null"); @@ -150,7 +164,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { return findRpcImplemention(rpc).invokeRpc(rpc, input); } @@ -160,8 +174,12 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { if (potentialImpl != null) { return potentialImpl; } + potentialImpl = defaultImplementation; - checkState(potentialImpl != null, "Implementation is not available."); + if( potentialImpl == null ) { + throw new UnsupportedOperationException( "No implementation for this operation is available." ); + } + return potentialImpl; } @@ -221,6 +239,12 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { return ret; } + @Override + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + checkState(defaultDelegate != null); + return defaultDelegate.invokeRpc(rpc, identifier, input); + } + private static abstract class RoutingStrategy implements Identifiable { private final QName identifier; @@ -263,24 +287,26 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } } - private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable { + private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable { private final RoutedRpcStrategy strategy; private final Set supportedRpcs; + private final RpcRoutingContext identifier; private RpcImplementation defaultDelegate; private final ConcurrentMap implementations = new ConcurrentHashMap<>(); - private SchemaAwareRpcBroker router; + private final SchemaAwareRpcBroker router; public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) { super(); this.strategy = strategy; supportedRpcs = ImmutableSet.of(strategy.getIdentifier()); + identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier()); this.router = router; } @Override - public QName getIdentifier() { - return strategy.getIdentifier(); + public RpcRoutingContext getIdentifier() { + return identifier; } @Override @@ -298,12 +324,14 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input")); checkArgument(inputContainer != null, "Rpc payload must contain input element"); SimpleNode routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf()); checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf()); Object route = routeContainer.getValue(); + checkArgument(route instanceof InstanceIdentifier, + "The routed node %s is not an instance identifier", route); RpcImplementation potential = null; if (route != null) { RoutedRpcRegImpl potentialReg = implementations.get(route); @@ -312,7 +340,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } } if (potential == null) { - potential = defaultDelegate; + return router.invokeRpc(rpc, (InstanceIdentifier) route, input); } checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route); return potential.invokeRpc(rpc, input); @@ -364,7 +392,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { RoutedRpcRegistration { private final QName type; - private RoutedRpcSelector router; + private final RoutedRpcSelector router; public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) { super(implementation); @@ -406,13 +434,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { routeListener.getInstance().onRouteChange(change); } catch (Exception e) { LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e); - + } } - + } - + private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) { RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier); @@ -425,10 +453,31 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } } } - + @Override public > ListenerRegistration registerRouteChangeListener( L listener) { - return routeChangeListeners.registerWithType(listener); + ListenerRegistration reg = routeChangeListeners.registerWithType(listener); + RouteChange initial = createInitialRouteChange(); + try { + listener.onRouteChange(initial); + } catch (Exception e) { + LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e); + } + return reg; + } + + private RouteChange createInitialRouteChange() { + FluentIterable rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class); + + + ImmutableMap.Builder> announcements = ImmutableMap.builder(); + ImmutableMap.Builder> removals = ImmutableMap.builder(); + for (RoutedRpcSelector routedRpcSelector : rpcSelectors) { + final RpcRoutingContext context = routedRpcSelector.getIdentifier(); + final Set paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet()); + announcements.put(context, paths); + } + return RoutingUtils.change(announcements.build(), removals.build()); } }