X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fdom%2Fbroker%2Fimpl%2FSchemaAwareRpcBroker.java;h=f47e1efc3fb364d812c08721b3e1b2d78ee6a056;hp=f0f5ac3e33926376d59d67669672fa7a15870189;hb=ff61f9fe0055942eba9cb34d8ac03e0a2a866898;hpb=702ba6faa3f7f2f6bd8ae02b3bc1abc06aea1c26 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java index f0f5ac3e33..f47e1efc3f 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.dom.broker.impl; import static com.google.common.base.Preconditions.checkArgument; @@ -12,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils; import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration; import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; +import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; import org.opendaylight.controller.sal.core.api.RpcRoutingContext; @@ -35,9 +43,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; -public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { +public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, RoutedRpcDefaultImplementation { private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class); @@ -45,12 +56,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { "2013-07-09", "context-reference"); private final ListenerRegistry rpcRegistrationListeners = new ListenerRegistry<>(); private final ListenerRegistry> routeChangeListeners = new ListenerRegistry<>(); - + private final String identifier; private final ConcurrentMap implementations = new ConcurrentHashMap<>(); private RpcImplementation defaultImplementation; private SchemaContextProvider schemaProvider; + private RoutedRpcDefaultImplementation defaultDelegate; public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) { super(); @@ -74,6 +86,15 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { this.schemaProvider = schemaProvider; } + public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { + return defaultDelegate; + } + + @Override + public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) { + this.defaultDelegate = defaultDelegate; + } + @Override public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { checkArgument(rpcType != null, "RPC Type should not be null"); @@ -143,7 +164,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { return findRpcImplemention(rpc).invokeRpc(rpc, input); } @@ -214,6 +235,12 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { return ret; } + @Override + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + checkState(defaultDelegate != null); + return defaultDelegate.invokeRpc(rpc, identifier, input); + } + private static abstract class RoutingStrategy implements Identifiable { private final QName identifier; @@ -256,24 +283,26 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } } - private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable { + private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable { private final RoutedRpcStrategy strategy; private final Set supportedRpcs; + private final RpcRoutingContext identifier; private RpcImplementation defaultDelegate; private final ConcurrentMap implementations = new ConcurrentHashMap<>(); - private SchemaAwareRpcBroker router; + private final SchemaAwareRpcBroker router; public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) { super(); this.strategy = strategy; supportedRpcs = ImmutableSet.of(strategy.getIdentifier()); + identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier()); this.router = router; } @Override - public QName getIdentifier() { - return strategy.getIdentifier(); + public RpcRoutingContext getIdentifier() { + return identifier; } @Override @@ -291,12 +320,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input")); checkArgument(inputContainer != null, "Rpc payload must contain input element"); SimpleNode routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf()); checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf()); Object route = routeContainer.getValue(); + checkArgument(route instanceof InstanceIdentifier); RpcImplementation potential = null; if (route != null) { RoutedRpcRegImpl potentialReg = implementations.get(route); @@ -305,7 +335,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } } if (potential == null) { - potential = defaultDelegate; + return router.invokeRpc(rpc, (InstanceIdentifier) route, input); } checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route); return potential.invokeRpc(rpc, input); @@ -357,7 +387,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { RoutedRpcRegistration { private final QName type; - private RoutedRpcSelector router; + private final RoutedRpcSelector router; public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) { super(implementation); @@ -399,13 +429,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { routeListener.getInstance().onRouteChange(change); } catch (Exception e) { LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e); - + } } - + } - + private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) { RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier); @@ -418,10 +448,31 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable { } } } - + @Override public > ListenerRegistration registerRouteChangeListener( L listener) { - return routeChangeListeners.registerWithType(listener); + ListenerRegistration reg = routeChangeListeners.registerWithType(listener); + RouteChange initial = createInitialRouteChange(); + try { + listener.onRouteChange(initial); + } catch (Exception e) { + LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e); + } + return reg; + } + + private RouteChange createInitialRouteChange() { + FluentIterable rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class); + + + ImmutableMap.Builder> announcements = ImmutableMap.builder(); + ImmutableMap.Builder> removals = ImmutableMap.builder(); + for (RoutedRpcSelector routedRpcSelector : rpcSelectors) { + final RpcRoutingContext context = routedRpcSelector.getIdentifier(); + final Set paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet()); + announcements.put(context, paths); + } + return RoutingUtils.change(announcements.build(), removals.build()); } }