X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fdom%2Fbroker%2Fimpl%2FSchemaAwareRpcBroker.java;h=44e7abc3aa117fcb778d56f7b1423f5bd07c2649;hp=22319abb17df7d1dec301105a59c50fad2cb1164;hb=0eb621d29daaf08979c356e2148e99c48458e169;hpb=4eb724db3877173d502ba38c6d83bec780b38bb2 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java index 22319abb17..44e7abc3aa 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentMap; import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils; +import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy; import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration; import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; @@ -24,36 +25,33 @@ import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; import org.opendaylight.controller.sal.core.api.RpcRoutingContext; import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter; -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.util.ListenerRegistry; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.SimpleNode; -import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode; -import org.opendaylight.yangtools.yang.model.api.DataSchemaNode; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, RoutedRpcDefaultImplementation { private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class); - private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext", - "2013-07-09", "context-reference"); + private final ListenerRegistry rpcRegistrationListeners = new ListenerRegistry<>(); - private final ListenerRegistry> routeChangeListeners = new ListenerRegistry<>(); - + private final ListenerRegistry> routeChangeListeners = new ListenerRegistry<>(); + private final String identifier; private final ConcurrentMap implementations = new ConcurrentHashMap<>(); @@ -61,7 +59,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro private SchemaContextProvider schemaProvider; private RoutedRpcDefaultImplementation defaultDelegate; - public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) { + public SchemaAwareRpcBroker(final String identifier, final SchemaContextProvider schemaProvider) { super(); this.identifier = identifier; this.schemaProvider = schemaProvider; @@ -71,7 +69,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro return defaultImplementation; } - public void setDefaultImplementation(RpcImplementation defaultImplementation) { + public void setDefaultImplementation(final RpcImplementation defaultImplementation) { this.defaultImplementation = defaultImplementation; } @@ -79,27 +77,27 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro return schemaProvider; } - public void setSchemaProvider(SchemaContextProvider schemaProvider) { + public void setSchemaProvider(final SchemaContextProvider schemaProvider) { this.schemaProvider = schemaProvider; } - public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { - return defaultDelegate; - } + public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { + return defaultDelegate; + } @Override - public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) { - this.defaultDelegate = defaultDelegate; - } + public void setRoutedRpcDefaultDelegate(final RoutedRpcDefaultImplementation defaultDelegate) { + this.defaultDelegate = defaultDelegate; + } - @Override - public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { + @Override + public RoutedRpcRegistration addRoutedRpcImplementation(final QName rpcType, final RpcImplementation implementation) { checkArgument(rpcType != null, "RPC Type should not be null"); checkArgument(implementation != null, "RPC Implementatoin should not be null"); return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation); } - private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) { + private RoutedRpcSelector getOrCreateRoutedRpcRouter(final QName rpcType) { RoutedRpcSelector potential = getRoutedRpcRouter(rpcType); if (potential != null) { return potential; @@ -110,15 +108,15 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro return potential; } RpcDefinition definition = findRpcDefinition(rpcType); - RoutingStrategy strategy = getRoutingStrategy(definition); - checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType); - potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this); + RpcRoutingStrategy strategy = RpcRoutingStrategy.from(definition); + checkState(strategy.isContextBasedRouted(), "Rpc %s is not routed.", rpcType); + potential = new RoutedRpcSelector( strategy, this); implementations.put(rpcType, potential); return potential; } } - private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) { + private RoutedRpcSelector getRoutedRpcRouter(final QName rpcType) { RpcImplementation potential = implementations.get(rpcType); if (potential != null) { checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType); @@ -129,25 +127,38 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation) + public RpcRegistration addRpcImplementation(final QName rpcType, final RpcImplementation implementation) throws IllegalArgumentException { checkArgument(rpcType != null, "RPC Type should not be null"); checkArgument(implementation != null, "RPC Implementatoin should not be null"); checkState(!hasRpcImplementation(rpcType), "Implementation already registered"); RpcDefinition definition = findRpcDefinition(rpcType); - checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed."); + checkArgument(!RpcRoutingStrategy.from(definition).isContextBasedRouted(), "RPC Type must not be content routed."); GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this); - implementations.putIfAbsent(rpcType, implementation); + final RpcImplementation previous = implementations.putIfAbsent(rpcType, implementation); + Preconditions.checkState(previous == null, "Rpc %s is already registered.",rpcType); + notifyRpcAdded(rpcType); return reg; } - private boolean isRoutedRpc(RpcDefinition definition) { - return getRoutingStrategy(definition) instanceof RoutedRpcStrategy; + private void notifyRpcAdded(final QName rpcType) { + for (ListenerRegistration listener : rpcRegistrationListeners) { + try { + listener.getInstance().onRpcImplementationAdded(rpcType); + } catch (Exception ex) { + LOG.error("Unhandled exception during invoking listener {}", listener.getInstance(), ex); + } + + } } @Override - public ListenerRegistration addRpcRegistrationListener(RpcRegistrationListener listener) { - return rpcRegistrationListeners.register(listener); + public ListenerRegistration addRpcRegistrationListener(final RpcRegistrationListener listener) { + ListenerRegistration reg = rpcRegistrationListeners.register(listener); + for (QName impl : implementations.keySet()) { + listener.onRpcImplementationAdded(impl); + } + return reg; } @Override @@ -161,26 +172,30 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(final QName rpc, final CompositeNode input) { return findRpcImplemention(rpc).invokeRpc(rpc, input); } - private RpcImplementation findRpcImplemention(QName rpc) { + private RpcImplementation findRpcImplemention(final QName rpc) { checkArgument(rpc != null, "Rpc name should not be null"); RpcImplementation potentialImpl = implementations.get(rpc); if (potentialImpl != null) { return potentialImpl; } + potentialImpl = defaultImplementation; - checkState(potentialImpl != null, "Implementation is not available."); + if( potentialImpl == null ) { + throw new UnsupportedOperationException( "No implementation for this operation is available." ); + } + return potentialImpl; } - private boolean hasRpcImplementation(QName rpc) { + private boolean hasRpcImplementation(final QName rpc) { return implementations.containsKey(rpc); } - private RpcDefinition findRpcDefinition(QName rpcType) { + private RpcDefinition findRpcDefinition(final QName rpcType) { checkArgument(rpcType != null, "Rpc name must be supplied."); checkState(schemaProvider != null, "Schema Provider is not available."); SchemaContext ctx = schemaProvider.getSchemaContext(); @@ -190,7 +205,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro return findRpcDefinition(rpcType, module.getRpcs()); } - static private RpcDefinition findRpcDefinition(QName rpcType, Set rpcs) { + static private RpcDefinition findRpcDefinition(final QName rpcType, final Set rpcs) { checkState(rpcs != null, "Rpc schema is not available."); for (RpcDefinition rpc : rpcs) { if (rpcType.equals(rpc.getQName())) { @@ -200,242 +215,33 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro throw new IllegalArgumentException("Supplied Rpc Type is not defined."); } - private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) { - ContainerSchemaNode input = rpc.getInput(); - if (input != null) { - for (DataSchemaNode schemaNode : input.getChildNodes()) { - Optional context = getRoutingContext(schemaNode); - if (context.isPresent()) { - return createRoutedStrategy(rpc, context.get(), schemaNode.getQName()); - } - } - } - return createGlobalStrategy(rpc); - } - - private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) { - return new RoutedRpcStrategy(rpc.getQName(), context, leafNode); - } - - private Optional getRoutingContext(DataSchemaNode schemaNode) { - for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) { - if (CONTEXT_REFERENCE.equals(extension.getNodeType())) { - return Optional.fromNullable(extension.getQName()); - } - ; - } - return Optional.absent(); - } - - private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) { - GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName()); - return ret; - } - @Override - public RpcResult invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { - checkState(defaultDelegate != null); - return defaultDelegate.invokeRpc(rpc, identifier, input); + public ListenableFuture> invokeRpc(final QName rpc, final YangInstanceIdentifier route, final CompositeNode input) { + checkState(defaultDelegate != null, "No implementation is available for rpc:%s path:%s", rpc, route); + return defaultDelegate.invokeRpc(rpc, route, input); } - private static abstract class RoutingStrategy implements Identifiable { - - private final QName identifier; - - public RoutingStrategy(QName identifier) { - super(); - this.identifier = identifier; - } - - @Override - public QName getIdentifier() { - return identifier; - } - } - - private static class GlobalRpcStrategy extends RoutingStrategy { - - public GlobalRpcStrategy(QName identifier) { - super(identifier); - } - } - - private static class RoutedRpcStrategy extends RoutingStrategy { - - private final QName context; - private final QName leaf; - - public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) { - super(identifier); - this.context = ctx; - this.leaf = leaf; - } - - public QName getContext() { - return context; - } - - public QName getLeaf() { - return leaf; - } - } - - private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable { - - private final RoutedRpcStrategy strategy; - private final Set supportedRpcs; - private RpcImplementation defaultDelegate; - private final ConcurrentMap implementations = new ConcurrentHashMap<>(); - private SchemaAwareRpcBroker router; - - public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) { - super(); - this.strategy = strategy; - supportedRpcs = ImmutableSet.of(strategy.getIdentifier()); - this.router = router; - } - - @Override - public QName getIdentifier() { - return strategy.getIdentifier(); - } - - @Override - public void close() throws Exception { - - } - - @Override - public Set getSupportedRpcs() { - return supportedRpcs; - } - - public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { - return new RoutedRpcRegImpl(rpcType, implementation, this); - } - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input")); - checkArgument(inputContainer != null, "Rpc payload must contain input element"); - SimpleNode routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf()); - checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf()); - Object route = routeContainer.getValue(); - checkArgument(route instanceof InstanceIdentifier); - RpcImplementation potential = null; - if (route != null) { - RoutedRpcRegImpl potentialReg = implementations.get(route); - if (potentialReg != null) { - potential = potentialReg.getInstance(); - } - } - if (potential == null) { - return router.invokeRpc(rpc, (InstanceIdentifier) route, input); - } - checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route); - return potential.invokeRpc(rpc, input); - } - - public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) { - //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported."); - RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl); - if (previous == null) { - router.notifyPathAnnouncement(context,strategy.getIdentifier(), path); - } - - } - - public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) { - boolean removed = implementations.remove(path, routedRpcRegImpl); - if (removed) { - router.notifyPathWithdrawal(context, strategy.getIdentifier(), path); - } - } - } - - private static class GlobalRpcRegistration extends AbstractObjectRegistration implements - RpcRegistration { - private final QName type; - private SchemaAwareRpcBroker router; - - public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) { - super(instance); - this.type = type; - this.router = router; - } - - @Override - public QName getType() { - return type; - } - - @Override - protected void removeRegistration() { - if (router != null) { - router.remove(this); - router = null; - } - } - } - - private static class RoutedRpcRegImpl extends AbstractObjectRegistration implements - RoutedRpcRegistration { - - private final QName type; - private RoutedRpcSelector router; - - public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) { - super(implementation); - this.type = rpcType; - router = routedRpcSelector; - } - - @Override - public void registerPath(QName context, InstanceIdentifier path) { - router.addPath(context, path, this); - } - - @Override - public void unregisterPath(QName context, InstanceIdentifier path) { - router.removePath(context, path, this); - } - - @Override - protected void removeRegistration() { - - } - - @Override - public QName getType() { - return type; - } - - } - - private void remove(GlobalRpcRegistration registration) { + void remove(final GlobalRpcRegistration registration) { implementations.remove(registration.getType(), registration); } - private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) { + void notifyPathAnnouncement(final QName context, final QName identifier, final YangInstanceIdentifier path) { RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier); - RouteChange change = RoutingUtils.announcementChange(contextWrapped , path); - for(ListenerRegistration> routeListener : routeChangeListeners) { + RouteChange change = RoutingUtils.announcementChange(contextWrapped , path); + for(ListenerRegistration> routeListener : routeChangeListeners) { try { routeListener.getInstance().onRouteChange(change); } catch (Exception e) { LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e); - } } - - } - + } - private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) { + void notifyPathWithdrawal(final QName context,final QName identifier, final YangInstanceIdentifier path) { RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier); - RouteChange change = RoutingUtils.removalChange(contextWrapped , path); - for(ListenerRegistration> routeListener : routeChangeListeners) { + RouteChange change = RoutingUtils.removalChange(contextWrapped , path); + for(ListenerRegistration> routeListener : routeChangeListeners) { try { routeListener.getInstance().onRouteChange(change); } catch (Exception e) { @@ -443,10 +249,31 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } } } - + @Override - public > ListenerRegistration registerRouteChangeListener( - L listener) { - return routeChangeListeners.registerWithType(listener); + public > ListenerRegistration registerRouteChangeListener( + final L listener) { + ListenerRegistration reg = routeChangeListeners.registerWithType(listener); + RouteChange initial = createInitialRouteChange(); + try { + listener.onRouteChange(initial); + } catch (Exception e) { + LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e); + } + return reg; + } + + private RouteChange createInitialRouteChange() { + FluentIterable rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class); + + + ImmutableMap.Builder> announcements = ImmutableMap.builder(); + ImmutableMap.Builder> removals = ImmutableMap.builder(); + for (RoutedRpcSelector routedRpcSelector : rpcSelectors) { + final RpcRoutingContext context = routedRpcSelector.getIdentifier(); + final Set paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet()); + announcements.put(context, paths); + } + return RoutingUtils.change(announcements.build(), removals.build()); } }