Merge "Leafref and identityref types to Json"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
index bd7f25c02ffb866376515165e5adbe18eee34273..9381a5a070e67e7c82226c5b920b06f9a952cc67 100644 (file)
@@ -40,14 +40,23 @@ import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
 import org.opendaylight.yangtools.yang.binding.BaseIdentity
 import com.google.common.collect.Multimap
 import com.google.common.collect.HashMultimap
-import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
+import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
 import java.util.concurrent.Executors
-
-class BindingAwareBrokerImpl implements BindingAwareBroker {
+import java.util.Collections
+import org.opendaylight.yangtools.yang.binding.DataObject
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.Callable
+import java.util.WeakHashMap
+import javax.annotation.concurrent.GuardedBy
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry
+
+class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry, AutoCloseable {
     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
 
-    private val clsPool = ClassPool.getDefault()
-    private var RuntimeCodeGenerator generator;
+    private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
+
+    private static val clsPool = ClassPool.getDefault()
+    public static var RuntimeCodeGenerator generator;
 
     /**
      * Map of all Managed Direct Proxies
@@ -61,30 +70,46 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
      * 
      * 
      */
-    private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
+    private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new WeakHashMap();
 
-    private var NotificationBrokerImpl notifyBroker
-    private var DataBrokerImpl dataBroker
-    private var ServiceRegistration<NotificationProviderService> notifyBrokerRegistration
+    @Property
+    private var NotificationProviderService notifyBroker
+
+    @Property
+    private var DataProviderService dataBroker
 
     @Property
     var BundleContext brokerBundleContext
 
+    ServiceRegistration<NotificationProviderService> notifyProviderRegistration
+
+    ServiceRegistration<NotificationService> notifyConsumerRegistration
+
+    ServiceRegistration<DataProviderService> dataProviderRegistration
+
+    ServiceRegistration<DataBrokerService> dataConsumerRegistration
+
+    private val proxyGenerationLock = new ReentrantLock;
+
+    private val routerGenerationLock = new ReentrantLock;
+
+    public new(BundleContext bundleContext) {
+        _brokerBundleContext = bundleContext;
+    }
+
     def start() {
+        log.info("Starting MD-SAL: Binding Aware Broker");
         initGenerator();
 
         val executor = Executors.newCachedThreadPool;
+
         // Initialization of notificationBroker
-        notifyBroker = new NotificationBrokerImpl(executor);
-        notifyBroker.invokerFactory = generator.invokerFactory;
-        dataBroker = new DataBrokerImpl();
-        val brokerProperties = newProperties();
-        notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
-            brokerProperties)
-        brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
-        brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
-        brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
+        log.info("Starting MD-SAL: Binding Aware Notification Broker");
 
+        log.info("Starting MD-SAL: Binding Aware Data Broker");
+
+        log.info("Starting MD-SAL: Binding Aware Data Broker");
+        log.info("MD-SAL: Binding Aware Broker Started");
     }
 
     def initGenerator() {
@@ -127,47 +152,67 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
      * If proxy class does not exist for supplied service class it will be generated automatically.
      */
     private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
-
         var RpcProxyContext existing = null
+
         if ((existing = managedProxies.get(service)) != null) {
             return existing.proxy
         }
-        val proxyInstance = generator.getDirectProxyFor(service)
-        val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
-        val properties = new Hashtable<String, String>()
-        rpcProxyCtx.proxy = proxyInstance as RpcService
-
-        properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
-        rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
-        managedProxies.put(service, rpcProxyCtx)
-        return rpcProxyCtx.proxy
+        return withLock(proxyGenerationLock) [ |
+            val maybeProxy = managedProxies.get(service);
+            if (maybeProxy !== null) {
+                return maybeProxy.proxy;
+            }
+            
+            
+            val proxyInstance = generator.getDirectProxyFor(service)
+            val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
+            val properties = new Hashtable<String, String>()
+            rpcProxyCtx.proxy = proxyInstance as RpcService
+            properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
+            rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
+            managedProxies.put(service, rpcProxyCtx)
+            return rpcProxyCtx.proxy
+        ]
+    }
+
+    private static def <T> T withLock(ReentrantLock lock, Callable<T> method) {
+        try {
+            lock.lock();
+            val ret = method.call;
+            return ret;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
      * Registers RPC Implementation
      * 
      */
-    def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
-        Hashtable<String, String> properties) {
+    override <T extends RpcService> addRpcImplementation(Class<T> type, T service) {
+        checkNotNull(type, "Service type should not be null")
+        checkNotNull(service, "Service type should not be null")
+        
         val proxy = getManagedDirectProxy(type)
-        checkState(proxy.delegate === null, "The Service for type {} is already registered", type)
+        checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
 
-        val osgiReg = context.bundleContext.registerService(type, service, properties);
         proxy.delegate = service;
-        return new RpcServiceRegistrationImpl<T>(type, service, osgiReg);
+        return new RpcServiceRegistrationImpl<T>(type, service, this);
     }
 
-    def <T extends RpcService> RpcRegistration<T> registerMountedRpcImplementation(Class<T> type, T service,
-        InstanceIdentifier<?> identifier, OsgiProviderContext context) {
-        throw new UnsupportedOperationException("TODO: auto-generated method stub")
-    }
-
-    def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
-        OsgiProviderContext context) {
+    override <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T service) {
+        checkNotNull(type, "Service type should not be null")
+        checkNotNull(service, "Service type should not be null")
+        
         val router = resolveRpcRouter(type);
         checkState(router !== null)
         return new RoutedRpcRegistrationImpl<T>(service, router, this)
     }
+    
+    override <T extends RpcService> getRpcService(Class<T> service) {
+        checkNotNull(service, "Service should not be null");
+        return getManagedDirectProxy(service) as T;
+    }
 
     private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
 
@@ -177,14 +222,20 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
         }
 
         // We created Router
-        val newRouter = generator.getRouterFor(type);
-        checkState(newRouter !== null);
-        rpcRouters.put(type, newRouter);
-
-        // We create / update Direct Proxy for router
-        val proxy = getManagedDirectProxy(type);
-        proxy.delegate = newRouter.invocationProxy
-        return newRouter;
+        return withLock(routerGenerationLock) [ |
+            val maybeRouter = rpcRouters.get(type);
+            if (maybeRouter !== null) {
+                return maybeRouter as RpcRouter<T>;
+            }
+            
+            val newRouter = generator.getRouterFor(type);
+            checkState(newRouter !== null);
+            rpcRouters.put(type, newRouter);
+            // We create / update Direct Proxy for router
+            val proxy = getManagedDirectProxy(type);
+            proxy.delegate = newRouter.invocationProxy
+            return newRouter;
+        ]
 
     }
 
@@ -199,8 +250,8 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
 
         // Updating internal structure of registration
         routingTable.updateRoute(path, registration.instance)
+
         // Update routing table / send announce to message bus
-        
         val success = paths.put(context, path);
     }
 
@@ -230,14 +281,34 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
             val routingTable = router.getRoutingTable(context)
             val path = ctxMap.value
             routingTable.removeRoute(path)
+        }
+    }
+
+    protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
 
+        val type = registration.serviceType;
+
+        val proxy = managedProxies.get(type);
+        if (proxy.proxy.delegate === registration.instance) {
+            proxy.proxy.delegate = null;
         }
     }
-    
+
     def createDelegate(Class<? extends RpcService> type) {
         getManagedDirectProxy(type);
     }
-    
+
+    def getRpcRouters() {
+        return Collections.unmodifiableMap(rpcRouters);
+    }
+
+    override close() {
+        dataConsumerRegistration.unregister()
+        dataProviderRegistration.unregister()
+        notifyConsumerRegistration.unregister()
+        notifyProviderRegistration.unregister()
+    }
+
 }
 
 class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
@@ -272,10 +343,6 @@ class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegi
         unregisterPath(context, instance);
     }
 
-    override getService() {
-        return instance;
-    }
-
     override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
         checkClosed()
         broker.registerPath(this, context, path);
@@ -286,9 +353,33 @@ class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegi
         broker.unregisterPath(this, context, path);
     }
 
+    override getServiceType() {
+        return router.serviceType;
+    }
+
     private def checkClosed() {
         if (closed)
             throw new IllegalStateException("Registration was closed.");
     }
 
 }
+
+class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
+
+    private var BindingAwareBrokerImpl broker;
+
+    @Property
+    val Class<T> serviceType;
+
+    public new(Class<T> type, T service, BindingAwareBrokerImpl broker) {
+        super(service);
+        this._serviceType = type;
+        this.broker = broker;
+    }
+
+    override protected removeRegistration() {
+        broker.unregisterRpcService(this);
+        broker = null;
+    }
+
+}