<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-broker-impl</artifactId>
<version>1.0-SNAPSHOT</version>
- <scope>runtime</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.binding.codegen.impl;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
import org.opendaylight.controller.sal.binding.api.rpc.RpcRoutingTable;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper;
import org.opendaylight.yangtools.yang.binding.BaseIdentity;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@Override
public RpcRegistration<T> registerDefaultService(T service) {
// TODO Auto-generated method stub
+ RuntimeCodeHelper.setDelegate(invocationProxy, service);
return null;
}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.binding.impl;
+import java.util.EventListener;
import java.util.Map;
import java.util.Map.Entry;
import java.util.HashMap;
private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
.create();
+ private final ListenerRegistry<RouterInstantiationListener> routerInstantiationListener = ListenerRegistry.create();
private final static Logger LOG = LoggerFactory.getLogger(RpcProviderRegistryImpl.class);
-
+
private final String name;
public String getName() {
T publicProxy = getRpcService(type);
RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
checkState(currentDelegate == null, "Rpc service is already registered");
- LOG.debug("Registering {} as global implementation of {} in {}",implementation,type.getSimpleName(),this);
+ LOG.debug("Registering {} as global implementation of {} in {}", implementation, type.getSimpleName(), this);
RuntimeCodeHelper.setDelegate(publicProxy, implementation);
return new RpcProxyRegistration<T>(type, implementation, this);
}
if (potentialProxy != null) {
return potentialProxy;
}
- synchronized(this) {
+ synchronized (this) {
/**
- * Potential proxy could be instantiated by other thread while we were
- * waiting for the lock.
+ * Potential proxy could be instantiated by other thread while we
+ * were waiting for the lock.
*/
-
+
potentialProxy = (T) publicProxies.get(type);
if (potentialProxy != null) {
return (T) potentialProxy;
}
T proxy = rpcFactory.getDirectProxyFor(type);
- LOG.debug("Created {} as public proxy for {} in {}",proxy,type.getSimpleName(),this);
+ LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
publicProxies.put(type, proxy);
return proxy;
}
}
- private <T extends RpcService> RpcRouter<T> getRpcRouter(Class<T> type) {
+ @SuppressWarnings("unchecked")
+ public <T extends RpcService> RpcRouter<T> getRpcRouter(Class<T> type) {
RpcRouter<?> potentialRouter = rpcRouters.get(type);
if (potentialRouter != null) {
return (RpcRouter<T>) potentialRouter;
}
- synchronized(this) {
+ synchronized (this) {
/**
- * Potential Router could be instantiated by other thread while we were
- * waiting for the lock.
+ * Potential Router could be instantiated by other thread while we
+ * were waiting for the lock.
*/
- potentialRouter = rpcRouters.get(type);
+ potentialRouter = rpcRouters.get(type);
if (potentialRouter != null) {
return (RpcRouter<T>) potentialRouter;
}
- RpcRouter<T> router = rpcFactory.getRouterFor(type,name);
+ RpcRouter<T> router = rpcFactory.getRouterFor(type, name);
router.registerRouteChangeListener(new RouteChangeForwarder(type));
- LOG.debug("Registering router {} as global implementation of {} in {}",router,type.getSimpleName(),this);
+ LOG.debug("Registering router {} as global implementation of {} in {}", router, type.getSimpleName(), this);
RuntimeCodeHelper.setDelegate(getRpcService(type), router.getInvocationProxy());
rpcRouters.put(type, router);
+ notifyListenersRoutedCreated(router);
return router;
}
}
+ private void notifyListenersRoutedCreated(RpcRouter router) {
+
+ for (ListenerRegistration<RouterInstantiationListener> listener : routerInstantiationListener) {
+ try {
+ listener.getInstance().onRpcRouterCreated(router);
+ } catch (Exception e) {
+ LOG.error("Unhandled exception during invoking listener {}", e);
+ }
+ }
+
+ }
+
+ public ListenerRegistration<RouterInstantiationListener> registerRouterInstantiationListener(
+ RouterInstantiationListener listener) {
+ ListenerRegistration<RouterInstantiationListener> reg = routerInstantiationListener.register(listener);
+ try {
+ for (RpcRouter<?> router : rpcRouters.values()) {
+ listener.onRpcRouterCreated(router);
+ }
+ } catch (Exception e) {
+ LOG.error("Unhandled exception during invoking listener {}", e);
+ }
+ return reg;
+ }
+
@Override
public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(
L listener) {
this.rpcFactory = rpcFactory;
}
+ public interface RouterInstantiationListener extends EventListener {
+ void onRpcRouterCreated(RpcRouter<?> router);
+ }
+
private class RouteChangeForwarder<T extends RpcService> implements
RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.binding.impl.connect.dom;
import static com.google.common.base.Preconditions.checkNotNull;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.binding.impl.connect.dom;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
+import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
-import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
+import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-
-import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.util.concurrent.Futures;
public class BindingIndependentConnector implements //
RuntimeDataProvider, //
private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
+ @SuppressWarnings( "deprecation")
private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
.builder().toInstance();
+ private final static Method EQUALS_METHOD;
+
+
private BindingIndependentMappingService mappingService;
private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
private boolean notificationForwarding = false;
+ private RpcProviderRegistryImpl baRpcRegistryImpl;
+
+ private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
+
+
+ static {
+ try {
+ EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
try {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
CompositeNode result = biDataService.readOperationalData(biPath);
- return potentialAugmentationRead(path,biPath,result);
+ return potentialAugmentationRead(path, biPath, result);
} catch (DeserializationException e) {
throw new IllegalStateException(e);
}
}
- private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result) throws DeserializationException {
+ private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
+ org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result)
+ throws DeserializationException {
Class<? extends DataObject> targetType = path.getTargetType();
if (Augmentation.class.isAssignableFrom(targetType)) {
path = mappingService.fromDataDom(biPath);
try {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
CompositeNode result = biDataService.readConfigurationData(biPath);
- return potentialAugmentationRead(path,biPath,result);
+ return potentialAugmentationRead(path, biPath, result);
} catch (DeserializationException e) {
throw new IllegalStateException(e);
}
baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
dataForwarding = true;
}
-
+
public void startRpcForwarding() {
if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
- checkState(!rpcForwarding,"Connector is already forwarding RPCs");
+ checkState(!rpcForwarding, "Connector is already forwarding RPCs");
domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
+ if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
+ baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
+ baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
+ }
+ if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
+ biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
+ }
rpcForwarding = true;
}
}
-
+
public void startNotificationForwarding() {
checkState(!notificationForwarding, "Connector is already forwarding notifications.");
notificationForwarding = true;
public void onSessionInitiated(ProviderSession session) {
setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
-
+
}
public <T extends RpcService> void onRpcRouterCreated(Class<T> serviceType, RpcRouter<T> router) {
}
}
+ /**
+ * Manager responsible for instantiating forwarders responsible for
+ * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
+ *
+ */
private class DomToBindingRpcForwardingManager implements
- RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>> {
+ RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
+ RouterInstantiationListener {
private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
+ private RpcProviderRegistryImpl registryImpl;
+
+ public RpcProviderRegistryImpl getRegistryImpl() {
+ return registryImpl;
+ }
+
+ public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
+ this.registryImpl = registryImpl;
+ }
+
+
+ @Override
+ public void onRpcRouterCreated(RpcRouter<?> router) {
+ Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
+ getRpcForwarder(router.getServiceType(), ctx);
+ }
@Override
public void onRouteChange(RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
} else {
potential = new DomToBindingRpcForwarder(service, context);
}
+
forwarders.put(service, potential);
return potential;
}
}
- private class DomToBindingRpcForwarder implements RpcImplementation {
+ private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
private final Set<QName> supportedRpcs;
private final WeakReference<Class<? extends RpcService>> rpcServiceType;
private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+ private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+ private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
this.supportedRpcs = mappingService.getRpcQNamesFor(service);
- for (QName rpc : supportedRpcs) {
- biRpcRegistry.addRpcImplementation(rpc, this);
+ try {
+ for (QName rpc : supportedRpcs) {
+ RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
+ strategiesByMethod.put(strategy.targetMethod, strategy);
+ strategiesByQName.put(rpc, strategy);
+ biRpcRegistry.addRpcImplementation(rpc, this);
+ }
+
+ } catch (Exception e) {
+ LOG.error("Could not forward Rpcs of type {}", service.getName());
}
registrations = ImmutableSet.of();
}
+ /**
+ * Constructor for Routed RPC Forwareder.
+ *
+ * @param service
+ * @param context
+ */
public DomToBindingRpcForwarder(Class<? extends RpcService> service, Class<? extends BaseIdentity> context) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
this.supportedRpcs = mappingService.getRpcQNamesFor(service);
- registrations = new HashSet<>();
- for (QName rpc : supportedRpcs) {
- registrations.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
+ Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
+ .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
+ try {
+ for (QName rpc : supportedRpcs) {
+ RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
+ strategiesByMethod.put(strategy.targetMethod, strategy);
+ strategiesByQName.put(rpc, strategy);
+ registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
+ }
+ createDefaultDomForwarder();
+ } catch (Exception e) {
+ LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
}
- registrations = ImmutableSet.copyOf(registrations);
+ registrations = registrationsBuilder.build();
}
public void registerPaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
}
}
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ if(EQUALS_METHOD.equals(method)) {
+ return false;
+ }
+ RpcInvocationStrategy strategy = strategiesByMethod.get(method);
+ checkState(strategy != null);
+ checkArgument(args.length <= 2);
+ if(args.length == 1) {
+ checkArgument(args[0] instanceof DataObject);
+ return strategy.forwardToDomBroker((DataObject) args[0]);
+ }
+ return strategy.forwardToDomBroker(null);
+ }
+
public void removePaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
Set<InstanceIdentifier<?>> set) {
QName ctx = BindingReflections.findQName(context);
return supportedRpcs;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void createDefaultDomForwarder() {
+ if (baRpcRegistryImpl != null) {
+ Class<?> cls = rpcServiceType.get();
+ ClassLoader clsLoader = cls.getClassLoader();
+ RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
+
+ RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
+ rpcRouter.registerDefaultService(proxy);
+ }
+ }
+
@Override
public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode domInput) {
checkArgument(rpc != null);
checkState(rpcService != null);
CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
try {
- return resolveInvocationStrategy(rpc, rpcType).invokeOn(rpcService, domUnwrappedInput);
+ return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
- private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc,
+ private RpcInvocationStrategy resolveInvocationStrategy(QName rpc) {
+ return strategiesByQName.get(rpc);
+ }
+
+ private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
final Class<? extends RpcService> rpcType) throws Exception {
return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
@Override
RpcInvocationStrategy strategy = null;
if (outputClass.isPresent()) {
if (inputClass.isPresent()) {
- strategy = new DefaultInvocationStrategy(targetMethod, outputClass.get(), inputClass.get());
+ strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get());
} else {
- strategy = new NoInputNoOutputInvocationStrategy(targetMethod);
+ strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
}
} else {
strategy = null;
private abstract class RpcInvocationStrategy {
protected final Method targetMethod;
+ protected final QName rpc;
- public RpcInvocationStrategy(Method targetMethod) {
+ public RpcInvocationStrategy(QName rpc,Method targetMethod) {
this.targetMethod = targetMethod;
+ this.rpc = rpc;
}
+ public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
+
public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
throws Exception;
private WeakReference<Class> outputClass;
@SuppressWarnings({ "rawtypes", "unchecked" })
- public DefaultInvocationStrategy(Method targetMethod, Class<?> outputClass,
+ public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
Class<? extends DataContainer> inputClass) {
- super(targetMethod);
+ super(rpc,targetMethod);
this.outputClass = new WeakReference(outputClass);
this.inputClass = new WeakReference(inputClass);
}
RpcResult<?> bindingResult = result.get();
return Rpcs.getRpcResult(true);
}
+
+ @Override
+ public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+ if(biRouter != null) {
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+ RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
+ Object baResultValue = null;
+ if(result.getResult() != null) {
+ baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
+ }
+ RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
+ return Futures.<RpcResult<?>>immediateFuture(baResult);
+ }
+ return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
+ }
}
private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
- public NoInputNoOutputInvocationStrategy(Method targetMethod) {
- super(targetMethod);
+ public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
+ super(rpc,targetMethod);
}
public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
RpcResult<Void> bindingResult = result.get();
return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
}
+
+ @Override
+ public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+ return Futures.immediateFuture(null);
+ }
}
public boolean isRpcForwarding() {
import org.opendaylight.controller.sal.dom.broker.MountPointManagerImpl;
import org.opendaylight.controller.sal.dom.broker.impl.DataStoreStatsWrapper;
import org.opendaylight.controller.sal.dom.broker.impl.HashMapDataStore;
-import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl;
import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareDataStoreAdapter;
+import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareRpcBroker;
+import org.opendaylight.controller.sal.dom.broker.impl.SchemaContextProvider;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import static com.google.common.base.Preconditions.*;
-public class BindingTestContext implements AutoCloseable {
+public class BindingTestContext implements AutoCloseable, SchemaContextProvider {
public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier TREE_ROOT = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
.builder().toInstance();
private MountPointManagerImpl biMountImpl;
+ private SchemaContext schemaContext;
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
+
protected BindingTestContext(ListeningExecutorService executor, ClassPool classPool, boolean startWithSchema) {
this.executor = executor;
this.classPool = classPool;
}
public void updateYangSchema(String[] files) {
- SchemaContext context = getContext(files);
+ schemaContext = getContext(files);
if (schemaAwareDataStore != null) {
- schemaAwareDataStore.onGlobalContextUpdated(context);
+ schemaAwareDataStore.onGlobalContextUpdated(schemaContext);
}
if (mappingServiceImpl != null) {
- mappingServiceImpl.onGlobalContextUpdated(context);
+ mappingServiceImpl.onGlobalContextUpdated(schemaContext);
}
}
checkState(executor != null);
biBrokerImpl = new BrokerImpl();
biBrokerImpl.setExecutor(executor);
- biBrokerImpl.setRouter(new RpcRouterImpl("test"));
+ biBrokerImpl.setRouter(new SchemaAwareRpcBroker("/", this));
}
public void startBindingNotificationBroker() {
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.binding.test.connect.dom;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNotSame;
+import static junit.framework.Assert.assertTrue;
+
import java.math.BigInteger;
import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.controller.sal.binding.test.util.BindingBrokerTestFactory;
import org.opendaylight.controller.sal.binding.test.util.BindingTestContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import static junit.framework.Assert.*;
-
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
public static final NodeId NODE_B = new NodeId("b");
public static final NodeId NODE_C = new NodeId("c");
public static final NodeId NODE_D = new NodeId("d");
+
+ private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "id");
+ private static final QName ADD_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "add-flow");
public static final InstanceIdentifier<Node> BA_NODE_A_ID = createBANodeIdentifier(NODE_A);
public static final InstanceIdentifier<Node> BA_NODE_B_ID = createBANodeIdentifier(NODE_B);
public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_C_ID = createBINodeIdentifier(NODE_C);
public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_D_ID = createBINodeIdentifier(NODE_D);
- private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "id");
- private static final QName ADD_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "add-flow");
- private static final QName REMOVE_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "remove-flow");
- private static final QName UPDATE_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "update-flow");
+
@Before
public void setup() {
assertEquals(addFlowA, flowService.getReceivedAddFlows().get(BA_NODE_A_ID).iterator().next());
}
- public void bindingRpcInvoker_DomRoutedProviderTest() {
-
+ @Test
+ public void bindingRpcInvoker_DomRoutedProviderTest() throws Exception {
+ AddFlowOutputBuilder builder = new AddFlowOutputBuilder();
+ builder.setTransactionId(new TransactionId(BigInteger.valueOf(10)));
+ final AddFlowOutput output = builder.build();
+ org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration registration = biRpcRegistry.addRoutedRpcImplementation(ADD_FLOW_QNAME, new RpcImplementation() {
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ CompositeNode result = testContext.getBindingToDomMappingService().toDataDom(output);
+ return Rpcs.getRpcResult(true, result, ImmutableList.<RpcError>of());
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return ImmutableSet.of(ADD_FLOW_QNAME);
+ }
+ });
+ registration.registerPath(NodeContext.QNAME, BI_NODE_C_ID);
+
+ SalFlowService baFlowInvoker = baRpcRegistry.getRpcService(SalFlowService.class);
+ Future<RpcResult<AddFlowOutput>> baResult = baFlowInvoker.addFlow(addFlow(BA_NODE_C_ID).setPriority(500).build());
+ assertNotNull(baResult);
+ assertEquals(output,baResult.get().getResult());
}
private CompositeNode toDomRpcInput(DataObject addFlowA) {
package org.opendaylight.controller.sal.core.api;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-public interface RpcProvisionRegistry extends BrokerService {
+public interface RpcProvisionRegistry extends BrokerService, RouteChangePublisher<RpcRoutingContext, InstanceIdentifier> {
/**
* Registers an implementation of the rpc.
*/
RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
throws IllegalArgumentException;
+
+ ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener);
RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.core.api;
import java.util.EventListener;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.sal.core.api;
+import java.io.Serializable;
+
+import org.opendaylight.yangtools.concepts.Immutable;
import org.opendaylight.yangtools.yang.common.QName;
-public interface RpcRoutingContext {
+public class RpcRoutingContext implements Immutable, Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -9079324728075883325L;
+
+ private final QName context;
+ private final QName rpc;
+
+
+ private RpcRoutingContext(QName context, QName rpc) {
+ super();
+ this.context = context;
+ this.rpc = rpc;
+ }
+
+ public static final RpcRoutingContext create(QName context, QName rpc) {
+ return new RpcRoutingContext(context, rpc);
+ }
+
+ public QName getContext() {
+ return context;
+ }
+
+ public QName getRpc() {
+ return rpc;
+ }
+
+ @Override
+ public String toString() {
+ return "RpcRoutingContext [context=" + context + ", rpc=" + rpc + "]";
+ }
- public QName getContext();
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((context == null) ? 0 : context.hashCode());
+ result = prime * result + ((rpc == null) ? 0 : rpc.hashCode());
+ return result;
+ }
- public QName getRpcType();
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ RpcRoutingContext other = (RpcRoutingContext) obj;
+ if (context == null) {
+ if (other.context != null)
+ return false;
+ } else if (!context.equals(other.context))
+ return false;
+ if (rpc == null) {
+ if (other.rpc != null)
+ return false;
+ } else if (!rpc.equals(other.rpc))
+ return false;
+ return true;
+ }
}
<configuration>
<instructions>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ <Export-Package>
+ org.opendaylight.controller.sal.dom.broker.spi
+ </Export-Package>
<Private-Package>
- org.opendaylight.controller.sal.dom.broker.*,
- org.opendaylight.controller.config.yang.md.sal.dom.impl
+ org.opendaylight.controller.sal.dom.broker,
+ org.opendaylight.controller.sal.dom.broker.impl,
+ org.opendaylight.controller.sal.dom.broker.osgi,
+ org.opendaylight.controller.config.yang.md.sal.dom.impl,
+ org.opendaylight.controller.config.yang.md.sal.dom.statistics,
+ org.opendaylight.yangtools.yang.util
</Private-Package>
<Import-Package>
*
+++ /dev/null
-package org.opendaylight.controller.sal.dom.broker;
-
-public class $ModuleInfo {
-
-
-}
import org.opendaylight.controller.sal.core.api.data.DataStore
import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareDataStoreAdapter
import org.opendaylight.controller.sal.core.api.model.SchemaServiceListener
-import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl
+import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareRpcBroker
class BrokerConfigActivator implements AutoCloseable {
val emptyProperties = new Hashtable<String, String>();
broker.setBundleContext(context);
- broker.setRouter(new RpcRouterImpl("Rpc router"))
+
schemaService = new SchemaServiceImpl();
schemaService.setContext(context);
schemaService.setParser(new YangParserImpl());
schemaService.start();
schemaReg = context.registerService(SchemaService, schemaService, emptyProperties);
+ broker.setRouter(new SchemaAwareRpcBroker("/",schemaService));
+
dataService = new DataBrokerImpl();
dataService.setExecutor(broker.getExecutor());
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
import org.opendaylight.controller.sal.core.api.RpcImplementation
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
private static val log = LoggerFactory.getLogger(BrokerImpl);
router.addRoutedRpcImplementation(rpcType,implementation);
}
+ override addRpcRegistrationListener(RpcRegistrationListener listener) {
+ return router.addRpcRegistrationListener(listener);
+ }
+
+ override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
+ return router.registerRouteChangeListener(listener);
+ }
+
}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.dom.broker;
import java.util.List;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.opendaylight.controller.sal.common.DataStoreIdentifier;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.core.api.data.DataValidator;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
-import org.opendaylight.controller.sal.dom.broker.impl.DataReaderRouter;
import org.opendaylight.controller.sal.dom.broker.impl.NotificationRouterImpl;
-import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl;
+import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareRpcBroker;
+import org.opendaylight.controller.sal.dom.broker.impl.SchemaContextProvider;
import org.opendaylight.controller.sal.dom.broker.spi.NotificationRouter;
-import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.InstanceIdentifierBuilder;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class MountPointImpl implements MountProvisionInstance {
+public class MountPointImpl implements MountProvisionInstance, SchemaContextProvider {
- private final RpcRouter rpcs;
+ private final SchemaAwareRpcBroker rpcs;
private final DataBrokerImpl dataReader;
private final NotificationRouter notificationRouter;
private final DataReader<InstanceIdentifier,CompositeNode> readWrapper;
public MountPointImpl(InstanceIdentifier path) {
this.mountPath = path;
- rpcs = new RpcRouterImpl("");
+ rpcs = new SchemaAwareRpcBroker(path.toString(),this);
dataReader = new DataBrokerImpl();
notificationRouter = new NotificationRouterImpl();
readWrapper = new ReadWrapper();
@Override
public Future<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input) {
- // TODO Auto-generated method stub
return null;
}
RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier, CompositeNode>> commitHandlerListener) {
return dataReader.registerCommitHandlerListener(commitHandlerListener);
}
+
+ @Override
+ public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
+ L listener) {
+ return rpcs.registerRouteChangeListener(listener);
+ }
}
import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.api.model.SchemaServiceListener;
+import org.opendaylight.controller.sal.dom.broker.impl.SchemaContextProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.*;
public class SchemaServiceImpl implements //
+ SchemaContextProvider, //
SchemaService, //
ServiceTrackerCustomizer<SchemaServiceListener, SchemaServiceListener>, //
AutoCloseable {
listenerTracker.open();
}
+
+ @Override
+ public SchemaContext getSchemaContext() {
+ return getGlobalContext();
+ }
+
public SchemaContext getGlobalContext() {
return getSchemaContextSnapshot();
}
+++ /dev/null
-package org.opendaylight.controller.sal.dom.broker.impl
-
-import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
-import org.opendaylight.yangtools.concepts.Identifiable
-import org.opendaylight.yangtools.yang.common.QName
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import static com.google.common.base.Preconditions.*;
-import java.util.Map
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Set
-import java.util.Collections
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
-import org.slf4j.LoggerFactory
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry
-import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-
-class RpcRouterImpl implements RpcRouter, Identifiable<String> {
-
- static val log = LoggerFactory.getLogger(RpcRouterImpl)
-
- Map<QName, RpcRegistration> implementations = new ConcurrentHashMap();
-
- @Property
- val Set<QName> supportedRpcs = Collections.unmodifiableSet(implementations.keySet);
-
- private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
-
- @Property
- val String identifier;
-
- new(String name) {
- _identifier = name;
- }
-
- override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
- checkNotNull(rpcType, "Rpc Type should not be null");
- checkNotNull(implementation, "Implementation should not be null.");
- val reg = new RoutedRpcRegistrationImpl(rpcType, implementation, this);
- implementations.put(rpcType, reg)
-
- for (listener : rpcRegistrationListeners.listeners) {
- try {
- listener.instance.onRpcImplementationAdded(rpcType);
- } catch (Exception e) {
- log.error("Unhandled exception during invoking listener", e);
- }
- }
-
- return reg;
- }
-
- override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
- checkNotNull(rpcType, "Rpc Type should not be null");
- checkNotNull(implementation, "Implementation should not be null.");
- checkState(!implementations.containsKey(rpcType), "Provider for supplied rpc is already registered.");
- val reg = new RpcRegistrationImpl(rpcType, implementation, this);
- implementations.put(rpcType, reg)
-
- for (listener : rpcRegistrationListeners.listeners) {
- try {
- listener.instance.onRpcImplementationAdded(rpcType);
- } catch (Exception e) {
- log.error("Unhandled exception during invoking listener", e);
- }
- }
-
- return reg;
-
- }
-
- override invokeRpc(QName rpc, CompositeNode input) {
- checkNotNull(rpc, "Rpc Type should not be null");
-
- val impl = implementations.get(rpc);
- checkState(impl !== null, "Provider for supplied rpc is not registered.");
-
- return impl.instance.invokeRpc(rpc, input);
- }
-
- def remove(RpcRegistrationImpl impl) {
- val existing = implementations.get(impl.type);
- if (existing == impl) {
- implementations.remove(impl.type);
- }
- for (listener : rpcRegistrationListeners.listeners) {
- try {
- listener.instance.onRpcImplementationRemoved(impl.type);
- } catch (Exception e) {
- log.error("Unhandled exception during invoking listener", e);
- }
- }
- }
-
- override addRpcRegistrationListener(RpcRegistrationListener listener) {
- rpcRegistrationListeners.register(listener);
- }
-
-}
-
-class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation> implements RpcRegistration {
-
- @Property
- val QName type;
-
- @Property
- var RpcRouterImpl router;
-
- new(QName type, RpcImplementation instance, RpcRouterImpl router) {
- super(instance)
- _type = type
- _router = router
- }
-
- override protected removeRegistration() {
- router.remove(this);
- }
-}
-class RoutedRpcRegistrationImpl extends RpcRegistrationImpl implements RoutedRpcRegistration {
-
-
- new(QName type, RpcImplementation instance, RpcRouterImpl router) {
- super(type,instance,router)
- }
-
- override protected removeRegistration() {
- router.remove(this);
- }
- override registerPath(QName context, InstanceIdentifier path) {
- //
-
- }
-
- override unregisterPath(QName context, InstanceIdentifier path) {
- //
- }
-}
--- /dev/null
+package org.opendaylight.controller.sal.dom.broker.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
+
+ private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
+ "2013-07-09", "context-reference");
+ private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
+ private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
+
+
+ private final String identifier;
+ private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
+ private RpcImplementation defaultImplementation;
+ private SchemaContextProvider schemaProvider;
+
+ public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
+ super();
+ this.identifier = identifier;
+ this.schemaProvider = schemaProvider;
+ }
+
+ public RpcImplementation getDefaultImplementation() {
+ return defaultImplementation;
+ }
+
+ public void setDefaultImplementation(RpcImplementation defaultImplementation) {
+ this.defaultImplementation = defaultImplementation;
+ }
+
+ public SchemaContextProvider getSchemaProvider() {
+ return schemaProvider;
+ }
+
+ public void setSchemaProvider(SchemaContextProvider schemaProvider) {
+ this.schemaProvider = schemaProvider;
+ }
+
+ @Override
+ public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+ checkArgument(rpcType != null, "RPC Type should not be null");
+ checkArgument(implementation != null, "RPC Implementatoin should not be null");
+ return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
+ }
+
+ private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
+ RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
+ if (potential != null) {
+ return potential;
+ }
+ synchronized (implementations) {
+ potential = getRoutedRpcRouter(rpcType);
+ if (potential != null) {
+ return potential;
+ }
+ RpcDefinition definition = findRpcDefinition(rpcType);
+ RoutingStrategy strategy = getRoutingStrategy(definition);
+ checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
+ potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
+ implementations.put(rpcType, potential);
+ return potential;
+ }
+ }
+
+ private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
+ RpcImplementation potential = implementations.get(rpcType);
+ if (potential != null) {
+ checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
+ return (RoutedRpcSelector) potential;
+ }
+ return null;
+
+ }
+
+ @Override
+ public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
+ throws IllegalArgumentException {
+ checkArgument(rpcType != null, "RPC Type should not be null");
+ checkArgument(implementation != null, "RPC Implementatoin should not be null");
+ checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
+ RpcDefinition definition = findRpcDefinition(rpcType);
+ checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
+ GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
+ return reg;
+ }
+
+ private boolean isRoutedRpc(RpcDefinition definition) {
+ return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
+ }
+
+ @Override
+ public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
+ return rpcRegistrationListeners.register(listener);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return ImmutableSet.copyOf(implementations.keySet());
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ return findRpcImplemention(rpc).invokeRpc(rpc, input);
+ }
+
+ private RpcImplementation findRpcImplemention(QName rpc) {
+ checkArgument(rpc != null, "Rpc name should not be null");
+ RpcImplementation potentialImpl = implementations.get(rpc);
+ if (potentialImpl != null) {
+ return potentialImpl;
+ }
+ potentialImpl = defaultImplementation;
+ checkState(potentialImpl != null, "Implementation is not available.");
+ return potentialImpl;
+ }
+
+ private boolean hasRpcImplementation(QName rpc) {
+ return implementations.containsKey(rpc);
+ }
+
+ private RpcDefinition findRpcDefinition(QName rpcType) {
+ checkArgument(rpcType != null, "Rpc name must be supplied.");
+ checkState(schemaProvider != null, "Schema Provider is not available.");
+ SchemaContext ctx = schemaProvider.getSchemaContext();
+ checkState(ctx != null, "YANG Schema Context is not available.");
+ Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
+ checkState(module != null, "YANG Module is not available.");
+ return findRpcDefinition(rpcType, module.getRpcs());
+ }
+
+ static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
+ checkState(rpcs != null, "Rpc schema is not available.");
+ for (RpcDefinition rpc : rpcs) {
+ if (rpcType.equals(rpc.getQName())) {
+ return rpc;
+ }
+ }
+ throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
+ }
+
+ private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
+ ContainerSchemaNode input = rpc.getInput();
+ if (input != null) {
+ for (DataSchemaNode schemaNode : input.getChildNodes()) {
+ Optional<QName> context = getRoutingContext(schemaNode);
+ if (context.isPresent()) {
+ return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
+ }
+ }
+ }
+ return createGlobalStrategy(rpc);
+ }
+
+ private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
+ return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
+ }
+
+ private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
+ for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
+ if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
+ return Optional.fromNullable(extension.getQName());
+ }
+ ;
+ }
+ return Optional.absent();
+ }
+
+ private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
+ GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
+ return ret;
+ }
+
+ private static abstract class RoutingStrategy implements Identifiable<QName> {
+
+ private final QName identifier;
+
+ public RoutingStrategy(QName identifier) {
+ super();
+ this.identifier = identifier;
+ }
+
+ @Override
+ public QName getIdentifier() {
+ return identifier;
+ }
+ }
+
+ private static class GlobalRpcStrategy extends RoutingStrategy {
+
+ public GlobalRpcStrategy(QName identifier) {
+ super(identifier);
+ }
+ }
+
+ private static class RoutedRpcStrategy extends RoutingStrategy {
+
+ private final QName context;
+ private final QName leaf;
+
+ public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
+ super(identifier);
+ this.context = ctx;
+ this.leaf = leaf;
+ }
+
+ public QName getContext() {
+ return context;
+ }
+
+ public QName getLeaf() {
+ return leaf;
+ }
+ }
+
+ private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
+
+ private final RoutedRpcStrategy strategy;
+ private final Set<QName> supportedRpcs;
+ private RpcImplementation defaultDelegate;
+ private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
+ private SchemaAwareRpcBroker router;
+
+ public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
+ super();
+ this.strategy = strategy;
+ supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
+ this.router = router;
+ }
+
+ @Override
+ public QName getIdentifier() {
+ return strategy.getIdentifier();
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return supportedRpcs;
+ }
+
+ public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+ return new RoutedRpcRegImpl(rpcType, implementation, this);
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
+ checkArgument(inputContainer != null, "Rpc payload must contain input element");
+ SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
+ checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
+ Object route = routeContainer.getValue();
+ RpcImplementation potential = null;
+ if (route != null) {
+ RoutedRpcRegImpl potentialReg = implementations.get(route);
+ if (potentialReg != null) {
+ potential = potentialReg.getInstance();
+ }
+ }
+ if (potential == null) {
+ potential = defaultDelegate;
+ }
+ checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
+ return potential.invokeRpc(rpc, input);
+ }
+
+ public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
+ //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
+ RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
+ if (previous == null) {
+ router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
+ }
+
+ }
+
+ public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
+ boolean removed = implementations.remove(path, routedRpcRegImpl);
+ if (removed) {
+ router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
+ }
+ }
+ }
+
+ private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
+ RpcRegistration {
+ private final QName type;
+ private SchemaAwareRpcBroker router;
+
+ public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
+ super(instance);
+ this.type = type;
+ this.router = router;
+ }
+
+ @Override
+ public QName getType() {
+ return type;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ if (router != null) {
+ router.remove(this);
+ router = null;
+ }
+ }
+ }
+
+ private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
+ RoutedRpcRegistration {
+
+ private final QName type;
+ private RoutedRpcSelector router;
+
+ public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
+ super(implementation);
+ this.type = rpcType;
+ router = routedRpcSelector;
+ }
+
+ @Override
+ public void registerPath(QName context, InstanceIdentifier path) {
+ router.addPath(context, path, this);
+ }
+
+ @Override
+ public void unregisterPath(QName context, InstanceIdentifier path) {
+ router.removePath(context, path, this);
+ }
+
+ @Override
+ protected void removeRegistration() {
+
+ }
+
+ @Override
+ public QName getType() {
+ return type;
+ }
+
+ }
+
+ private void remove(GlobalRpcRegistration registration) {
+ implementations.remove(registration.getType(), registration);
+ }
+
+ private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
+ RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
+ RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
+ for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
+ try {
+ routeListener.getInstance().onRouteChange(change);
+ } catch (Exception e) {
+ LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
+
+ }
+ }
+
+ }
+
+
+
+ private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
+ RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
+ RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
+ for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
+ try {
+ routeListener.getInstance().onRouteChange(change);
+ } catch (Exception e) {
+ LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
+ }
+ }
+ }
+
+ @Override
+ public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
+ L listener) {
+ return routeChangeListeners.registerWithType(listener);
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.dom.broker.impl;
+
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import com.google.common.base.Optional;
+
+public interface SchemaContextProvider {
+
+ SchemaContext getSchemaContext();
+
+}
@Override
public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
-
- ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener);
}