X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fdom%2Fbroker%2Fimpl%2FSchemaAwareRpcBroker.java;h=a2c43d0c73ba402f13bd7b604a9629b617637732;hp=22319abb17df7d1dec301105a59c50fad2cb1164;hb=c3acce135d19955f72616c4c956668bb539f80f2;hpb=c0c97bdca0c42607e8034bc91f51edf96d9e72a9 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java index 22319abb17..a2c43d0c73 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java @@ -43,7 +43,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, RoutedRpcDefaultImplementation { @@ -53,7 +56,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro "2013-07-09", "context-reference"); private final ListenerRegistry rpcRegistrationListeners = new ListenerRegistry<>(); private final ListenerRegistry> routeChangeListeners = new ListenerRegistry<>(); - + private final String identifier; private final ConcurrentMap implementations = new ConcurrentHashMap<>(); @@ -83,16 +86,16 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro this.schemaProvider = schemaProvider; } - public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { - return defaultDelegate; - } + public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { + return defaultDelegate; + } @Override - public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) { - this.defaultDelegate = defaultDelegate; - } + public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) { + this.defaultDelegate = defaultDelegate; + } - @Override + @Override public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { checkArgument(rpcType != null, "RPC Type should not be null"); checkArgument(implementation != null, "RPC Implementatoin should not be null"); @@ -161,7 +164,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { return findRpcImplemention(rpc).invokeRpc(rpc, input); } @@ -171,8 +174,12 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro if (potentialImpl != null) { return potentialImpl; } + potentialImpl = defaultImplementation; - checkState(potentialImpl != null, "Implementation is not available."); + if( potentialImpl == null ) { + throw new UnsupportedOperationException( "No implementation for this operation is available." ); + } + return potentialImpl; } @@ -233,7 +240,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcResult invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { checkState(defaultDelegate != null); return defaultDelegate.invokeRpc(rpc, identifier, input); } @@ -280,24 +287,26 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } } - private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable { + private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable { private final RoutedRpcStrategy strategy; private final Set supportedRpcs; + private final RpcRoutingContext identifier; private RpcImplementation defaultDelegate; private final ConcurrentMap implementations = new ConcurrentHashMap<>(); - private SchemaAwareRpcBroker router; + private final SchemaAwareRpcBroker router; public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) { super(); this.strategy = strategy; supportedRpcs = ImmutableSet.of(strategy.getIdentifier()); + identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier()); this.router = router; } @Override - public QName getIdentifier() { - return strategy.getIdentifier(); + public RpcRoutingContext getIdentifier() { + return identifier; } @Override @@ -315,13 +324,14 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input")); checkArgument(inputContainer != null, "Rpc payload must contain input element"); SimpleNode routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf()); checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf()); Object route = routeContainer.getValue(); - checkArgument(route instanceof InstanceIdentifier); + checkArgument(route instanceof InstanceIdentifier, + "The routed node %s is not an instance identifier", route); RpcImplementation potential = null; if (route != null) { RoutedRpcRegImpl potentialReg = implementations.get(route); @@ -382,7 +392,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro RoutedRpcRegistration { private final QName type; - private RoutedRpcSelector router; + private final RoutedRpcSelector router; public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) { super(implementation); @@ -424,13 +434,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro routeListener.getInstance().onRouteChange(change); } catch (Exception e) { LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e); - + } } - + } - + private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) { RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier); @@ -443,10 +453,31 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } } } - + @Override public > ListenerRegistration registerRouteChangeListener( L listener) { - return routeChangeListeners.registerWithType(listener); + ListenerRegistration reg = routeChangeListeners.registerWithType(listener); + RouteChange initial = createInitialRouteChange(); + try { + listener.onRouteChange(initial); + } catch (Exception e) { + LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e); + } + return reg; + } + + private RouteChange createInitialRouteChange() { + FluentIterable rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class); + + + ImmutableMap.Builder> announcements = ImmutableMap.builder(); + ImmutableMap.Builder> removals = ImmutableMap.builder(); + for (RoutedRpcSelector routedRpcSelector : rpcSelectors) { + final RpcRoutingContext context = routedRpcSelector.getIdentifier(); + final Set paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet()); + announcements.put(context, paths); + } + return RoutingUtils.change(announcements.build(), removals.build()); } }