import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
import org.opendaylight.controller.sal.binding.api.data.DataProviderService
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
+import org.opendaylight.controller.sal.binding.spi.RpcRouter
+import java.util.concurrent.ConcurrentHashMap
+import static com.google.common.base.Preconditions.*
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
+import org.opendaylight.yangtools.yang.binding.BaseIdentity
+import com.google.common.collect.Multimap
+import com.google.common.collect.HashMultimap
+import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
+import java.util.concurrent.Executors
+import java.util.Collections
+import org.opendaylight.yangtools.yang.binding.DataObject
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.Callable
+import java.util.WeakHashMap
+import javax.annotation.concurrent.GuardedBy
-class BindingAwareBrokerImpl implements BindingAwareBroker {
+class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable {
private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
- private val clsPool = ClassPool.getDefault()
- private var RuntimeCodeGenerator generator;
- private Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new HashMap();
- private var NotificationBrokerImpl notifyBroker
- private var DataBrokerImpl dataBroker
- private var ServiceRegistration<NotificationProviderService> notifyBrokerRegistration
+ private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
+
+ private static val clsPool = ClassPool.getDefault()
+ public static var RuntimeCodeGenerator generator;
+
+ /**
+ * Map of all Managed Direct Proxies
+ *
+ */
+ private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
+
+ /**
+ *
+ * Map of all available Rpc Routers
+ *
+ *
+ */
+ private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new WeakHashMap();
+
+ @Property
+ private var NotificationProviderService notifyBroker
+
+ @Property
+ private var DataProviderService dataBroker
@Property
var BundleContext brokerBundleContext
+ ServiceRegistration<NotificationProviderService> notifyProviderRegistration
+
+ ServiceRegistration<NotificationService> notifyConsumerRegistration
+
+ ServiceRegistration<DataProviderService> dataProviderRegistration
+
+ ServiceRegistration<DataBrokerService> dataConsumerRegistration
+
+ private val proxyGenerationLock = new ReentrantLock;
+
+ private val routerGenerationLock = new ReentrantLock;
+
+ public new(BundleContext bundleContext) {
+ _brokerBundleContext = bundleContext;
+ }
+
def start() {
+ log.info("Starting MD-SAL: Binding Aware Broker");
initGenerator();
+ val executor = Executors.newCachedThreadPool;
+
// Initialization of notificationBroker
- notifyBroker = new NotificationBrokerImpl(null);
- dataBroker = new DataBrokerImpl();
- val brokerProperties = newProperties();
- notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
- brokerProperties)
- brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
- brokerBundleContext.registerService(DataProviderService,dataBroker,brokerProperties)
- brokerBundleContext.registerService(DataBrokerService,dataBroker,brokerProperties)
-
+ log.info("Starting MD-SAL: Binding Aware Notification Broker");
+
+ log.info("Starting MD-SAL: Binding Aware Data Broker");
+
+ log.info("Starting MD-SAL: Binding Aware Data Broker");
+ log.info("MD-SAL: Binding Aware Broker Started");
}
def initGenerator() {
*
* If proxy class does not exist for supplied service class it will be generated automatically.
*/
- def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
-
+ private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
var RpcProxyContext existing = null
- if((existing = managedProxies.get(service)) != null) {
+
+ if ((existing = managedProxies.get(service)) != null) {
return existing.proxy
}
- val proxyClass = generator.generateDirectProxy(service)
- val rpcProxyCtx = new RpcProxyContext(proxyClass)
- val properties = new Hashtable<String, String>()
- rpcProxyCtx.proxy = proxyClass.newInstance as RpcService
+ return withLock(proxyGenerationLock) [ |
+ val maybeProxy = managedProxies.get(service);
+ if (maybeProxy !== null) {
+ return maybeProxy.proxy;
+ }
+
+
+ val proxyInstance = generator.getDirectProxyFor(service)
+ val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
+ val properties = new Hashtable<String, String>()
+ rpcProxyCtx.proxy = proxyInstance as RpcService
+ properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
+ rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
+ managedProxies.put(service, rpcProxyCtx)
+ return rpcProxyCtx.proxy
+ ]
+ }
- properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
- rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
- managedProxies.put(service, rpcProxyCtx)
- return rpcProxyCtx.proxy
+ private static def <T> T withLock(ReentrantLock lock, Callable<T> method) {
+ try {
+ lock.lock();
+ val ret = method.call;
+ return ret;
+ } finally {
+ lock.unlock();
+ }
}
/**
def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
Hashtable<String, String> properties) {
val proxy = getManagedDirectProxy(type)
- if(proxy.delegate != null) {
- throw new IllegalStateException("Service " + type + "is already registered");
- }
+ checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
+
val osgiReg = context.bundleContext.registerService(type, service, properties);
proxy.delegate = service;
- return new RpcServiceRegistrationImpl<T>(type, service, osgiReg);
+ return new RpcServiceRegistrationImpl<T>(type, service, osgiReg, this);
}
- def <T extends RpcService> RpcRegistration<T> registerMountedRpcImplementation(Class<T> tyoe, T service, InstanceIdentifier<?> identifier,
- OsgiProviderContext context, Hashtable<String, String> properties) {
- throw new UnsupportedOperationException("TODO: auto-generated method stub")
+ def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
+ OsgiProviderContext context) {
+ val router = resolveRpcRouter(type);
+ checkState(router !== null)
+ return new RoutedRpcRegistrationImpl<T>(service, router, this)
}
- def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
- Hashtable<String, String> properties) {
- throw new UnsupportedOperationException("TODO: auto-generated method stub")
+ private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
+
+ val router = rpcRouters.get(type);
+ if (router !== null) {
+ return router as RpcRouter<T>;
+ }
+
+ // We created Router
+ return withLock(routerGenerationLock) [ |
+ val maybeRouter = rpcRouters.get(type);
+ if (maybeRouter !== null) {
+ return maybeRouter as RpcRouter<T>;
+ }
+
+ val newRouter = generator.getRouterFor(type);
+ checkState(newRouter !== null);
+ rpcRouters.put(type, newRouter);
+ // We create / update Direct Proxy for router
+ val proxy = getManagedDirectProxy(type);
+ proxy.delegate = newRouter.invocationProxy
+ return newRouter;
+ ]
+
+ }
+
+ protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
+ Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+
+ val router = registration.router;
+ val paths = registration.registeredPaths;
+
+ val routingTable = router.getRoutingTable(context)
+ checkState(routingTable != null);
+
+ // Updating internal structure of registration
+ routingTable.updateRoute(path, registration.instance)
+
+ // Update routing table / send announce to message bus
+ val success = paths.put(context, path);
+ }
+
+ protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
+ Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+
+ val router = registration.router;
+ val paths = registration.registeredPaths;
+
+ val routingTable = router.getRoutingTable(context)
+ checkState(routingTable != null);
+
+ // Updating internal structure of registration
+ val target = routingTable.getRoute(path)
+ checkState(target === registration.instance)
+ routingTable.removeRoute(path)
+ checkState(paths.remove(context, path));
+ }
+
+ protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
+
+ val router = registration.router;
+ val paths = registration.registeredPaths;
+
+ for (ctxMap : registration.registeredPaths.entries) {
+ val context = ctxMap.key
+ val routingTable = router.getRoutingTable(context)
+ val path = ctxMap.value
+ routingTable.removeRoute(path)
+ }
+ }
+
+ protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
+
+ val type = registration.serviceType;
+
+ val proxy = managedProxies.get(type);
+ if (proxy.proxy.delegate === registration.instance) {
+ proxy.proxy.delegate = null;
+ }
+ }
+
+ def createDelegate(Class<? extends RpcService> type) {
+ getManagedDirectProxy(type);
+ }
+
+ def getRpcRouters() {
+ return Collections.unmodifiableMap(rpcRouters);
+ }
+
+ override close() {
+ dataConsumerRegistration.unregister()
+ dataProviderRegistration.unregister()
+ notifyConsumerRegistration.unregister()
+ notifyProviderRegistration.unregister()
+ }
+
+}
+
+class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
+
+ @Property
+ private val BindingAwareBrokerImpl broker;
+
+ @Property
+ private val RpcRouter<T> router;
+
+ @Property
+ private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
+
+ private var closed = false;
+
+ new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
+ super(instance)
+ _router = backingRouter;
+ _broker = broker;
+ }
+
+ override protected removeRegistration() {
+ closed = true
+ broker.unregisterRoutedRpcService(this)
+ }
+
+ override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
+ registerPath(context, instance);
+ }
+
+ override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
+ unregisterPath(context, instance);
+ }
+
+ override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+ checkClosed()
+ broker.registerPath(this, context, path);
+ }
+
+ override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+ checkClosed()
+ broker.unregisterPath(this, context, path);
+ }
+
+ override getServiceType() {
+ return router.serviceType;
+ }
+
+ private def checkClosed() {
+ if (closed)
+ throw new IllegalStateException("Registration was closed.");
+ }
+
+}
+
+class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
+
+ val ServiceRegistration<T> osgiRegistration;
+ private var BindingAwareBrokerImpl broker;
+
+ @Property
+ val Class<T> serviceType;
+
+ public new(Class<T> type, T service, ServiceRegistration<T> osgiReg, BindingAwareBrokerImpl broker) {
+ super(service);
+ this._serviceType = type;
+ this.osgiRegistration = osgiReg;
+ this.broker = broker;
+ }
+
+ override protected removeRegistration() {
+ broker.unregisterRpcService(this);
+ broker = null;
}
}