import java.util.concurrent.Executors
import java.util.Collections
import org.opendaylight.yangtools.yang.binding.DataObject
-import org.opendaylight.controller.sal.binding.impl.connect.dom.ConnectorActivator
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.Callable
+import java.util.WeakHashMap
+import javax.annotation.concurrent.GuardedBy
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry
-class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable {
+class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry, AutoCloseable {
private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
-
private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
- private val clsPool = ClassPool.getDefault()
- private var RuntimeCodeGenerator generator;
-
+ private static val clsPool = ClassPool.getDefault()
+ public static var RuntimeCodeGenerator generator;
/**
* Map of all Managed Direct Proxies
*
*
*/
- private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
+ private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new WeakHashMap();
@Property
- private var NotificationBrokerImpl notifyBroker
-
+ private var NotificationProviderService notifyBroker
+
@Property
- private var DataBrokerImpl dataBroker
-
+ private var DataProviderService dataBroker
+
@Property
var BundleContext brokerBundleContext
-
+
ServiceRegistration<NotificationProviderService> notifyProviderRegistration
-
+
ServiceRegistration<NotificationService> notifyConsumerRegistration
-
+
ServiceRegistration<DataProviderService> dataProviderRegistration
-
+
ServiceRegistration<DataBrokerService> dataConsumerRegistration
-
- ConnectorActivator connectorActivator
-
-
+
+ private val proxyGenerationLock = new ReentrantLock;
+
+ private val routerGenerationLock = new ReentrantLock;
+
public new(BundleContext bundleContext) {
_brokerBundleContext = bundleContext;
}
initGenerator();
val executor = Executors.newCachedThreadPool;
+
// Initialization of notificationBroker
log.info("Starting MD-SAL: Binding Aware Notification Broker");
- notifyBroker = new NotificationBrokerImpl(executor);
- notifyBroker.invokerFactory = generator.invokerFactory;
log.info("Starting MD-SAL: Binding Aware Data Broker");
- dataBroker = new DataBrokerImpl();
- dataBroker.executor = executor;
- val brokerProperties = newProperties();
-
-
log.info("Starting MD-SAL: Binding Aware Data Broker");
- notifyProviderRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
- brokerProperties)
- notifyConsumerRegistration = brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
- dataProviderRegistration = brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
- dataConsumerRegistration = brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
-
- connectorActivator = new ConnectorActivator(dataBroker,brokerBundleContext);
- connectorActivator.start();
log.info("MD-SAL: Binding Aware Broker Started");
}
* If proxy class does not exist for supplied service class it will be generated automatically.
*/
private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
-
var RpcProxyContext existing = null
+
if ((existing = managedProxies.get(service)) != null) {
return existing.proxy
}
- val proxyInstance = generator.getDirectProxyFor(service)
- val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
- val properties = new Hashtable<String, String>()
- rpcProxyCtx.proxy = proxyInstance as RpcService
-
- properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
- rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
- managedProxies.put(service, rpcProxyCtx)
- return rpcProxyCtx.proxy
+ return withLock(proxyGenerationLock) [ |
+ val maybeProxy = managedProxies.get(service);
+ if (maybeProxy !== null) {
+ return maybeProxy.proxy;
+ }
+
+
+ val proxyInstance = generator.getDirectProxyFor(service)
+ val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
+ val properties = new Hashtable<String, String>()
+ rpcProxyCtx.proxy = proxyInstance as RpcService
+ properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
+ rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
+ managedProxies.put(service, rpcProxyCtx)
+ return rpcProxyCtx.proxy
+ ]
+ }
+
+ private static def <T> T withLock(ReentrantLock lock, Callable<T> method) {
+ try {
+ lock.lock();
+ val ret = method.call;
+ return ret;
+ } finally {
+ lock.unlock();
+ }
}
/**
* Registers RPC Implementation
*
*/
- def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
- Hashtable<String, String> properties) {
+ override <T extends RpcService> addRpcImplementation(Class<T> type, T service) {
+ checkNotNull(type, "Service type should not be null")
+ checkNotNull(service, "Service type should not be null")
+
val proxy = getManagedDirectProxy(type)
checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
- val osgiReg = context.bundleContext.registerService(type, service, properties);
proxy.delegate = service;
- return new RpcServiceRegistrationImpl<T>(type, service, osgiReg,this);
+ return new RpcServiceRegistrationImpl<T>(type, service, this);
}
- def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
- OsgiProviderContext context) {
+ override <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T service) {
+ checkNotNull(type, "Service type should not be null")
+ checkNotNull(service, "Service type should not be null")
+
val router = resolveRpcRouter(type);
checkState(router !== null)
return new RoutedRpcRegistrationImpl<T>(service, router, this)
}
+
+ override <T extends RpcService> getRpcService(Class<T> service) {
+ checkNotNull(service, "Service should not be null");
+ return getManagedDirectProxy(service) as T;
+ }
private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
}
// We created Router
- val newRouter = generator.getRouterFor(type);
- checkState(newRouter !== null);
- rpcRouters.put(type, newRouter);
-
- // We create / update Direct Proxy for router
- val proxy = getManagedDirectProxy(type);
- proxy.delegate = newRouter.invocationProxy
- return newRouter;
+ return withLock(routerGenerationLock) [ |
+ val maybeRouter = rpcRouters.get(type);
+ if (maybeRouter !== null) {
+ return maybeRouter as RpcRouter<T>;
+ }
+
+ val newRouter = generator.getRouterFor(type);
+ checkState(newRouter !== null);
+ rpcRouters.put(type, newRouter);
+ // We create / update Direct Proxy for router
+ val proxy = getManagedDirectProxy(type);
+ proxy.delegate = newRouter.invocationProxy
+ return newRouter;
+ ]
}
// Updating internal structure of registration
routingTable.updateRoute(path, registration.instance)
+
// Update routing table / send announce to message bus
-
val success = paths.put(context, path);
}
routingTable.removeRoute(path)
}
}
-
+
protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
val type = registration.serviceType;
-
+
val proxy = managedProxies.get(type);
- if(proxy.proxy.delegate === registration.instance) {
+ if (proxy.proxy.delegate === registration.instance) {
proxy.proxy.delegate = null;
}
}
-
+
def createDelegate(Class<? extends RpcService> type) {
getManagedDirectProxy(type);
}
-
+
def getRpcRouters() {
return Collections.unmodifiableMap(rpcRouters);
}
-
+
override close() {
dataConsumerRegistration.unregister()
dataProviderRegistration.unregister()
notifyConsumerRegistration.unregister()
notifyProviderRegistration.unregister()
}
-
+
}
class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
checkClosed()
broker.unregisterPath(this, context, path);
}
-
+
override getServiceType() {
return router.serviceType;
}
}
}
-class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
- val ServiceRegistration<T> osgiRegistration;
+class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
+
private var BindingAwareBrokerImpl broker;
-
+
@Property
val Class<T> serviceType;
- public new(Class<T> type, T service, ServiceRegistration<T> osgiReg,BindingAwareBrokerImpl broker) {
+ public new(Class<T> type, T service, BindingAwareBrokerImpl broker) {
super(service);
this._serviceType = type;
- this.osgiRegistration = osgiReg;
- this.broker= broker;
+ this.broker = broker;
}
override protected removeRegistration() {
broker.unregisterRpcService(this);
broker = null;
}
-
+
}