Unified Two Phase Commit implementation, fixed BA to BI connection
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
index 298a74ece5f71982ae7bcbb48f205e4039c44046..31d5d0126fc8115546d1d1b7709671e946b2e026 100644 (file)
@@ -28,28 +28,99 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo
 
 import org.slf4j.LoggerFactory
 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
+import org.opendaylight.controller.sal.binding.spi.RpcRouter
+import java.util.concurrent.ConcurrentHashMap
+import static com.google.common.base.Preconditions.*
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
+import org.opendaylight.yangtools.yang.binding.BaseIdentity
+import com.google.common.collect.Multimap
+import com.google.common.collect.HashMultimap
+import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
+import java.util.concurrent.Executors
+import java.util.Collections
+import org.opendaylight.yangtools.yang.binding.DataObject
+import org.opendaylight.controller.sal.binding.impl.connect.dom.ConnectorActivator
 
-class BindingAwareBrokerImpl implements BindingAwareBroker {
+class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable {
     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
-    
+
+
+    private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
+
     private val clsPool = ClassPool.getDefault()
     private var RuntimeCodeGenerator generator;
-    private Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new HashMap();
+    
+
+    /**
+     * Map of all Managed Direct Proxies
+     * 
+     */
+    private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
+
+    /**
+     * 
+     * Map of all available Rpc Routers
+     * 
+     * 
+     */
+    private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
+
+    @Property
     private var NotificationBrokerImpl notifyBroker
-    private var ServiceRegistration<NotificationProviderService> notifyBrokerRegistration
+    
+    @Property
+    private var DataBrokerImpl dataBroker
     
     @Property
     var BundleContext brokerBundleContext
+    
+    ServiceRegistration<NotificationProviderService> notifyProviderRegistration
+    
+    ServiceRegistration<NotificationService> notifyConsumerRegistration
+    
+    ServiceRegistration<DataProviderService> dataProviderRegistration
+    
+    ServiceRegistration<DataBrokerService> dataConsumerRegistration
+    
+    ConnectorActivator connectorActivator
+   
+    
+    public new(BundleContext bundleContext) {
+        _brokerBundleContext = bundleContext;
+    }
 
     def start() {
+        log.info("Starting MD-SAL: Binding Aware Broker");
         initGenerator();
 
+        val executor = Executors.newCachedThreadPool;
         // Initialization of notificationBroker
-        notifyBroker = new NotificationBrokerImpl(null);
+        log.info("Starting MD-SAL: Binding Aware Notification Broker");
+        notifyBroker = new NotificationBrokerImpl(executor);
+        notifyBroker.invokerFactory = generator.invokerFactory;
+
+        log.info("Starting MD-SAL: Binding Aware Data Broker");
+        dataBroker = new DataBrokerImpl();
+        dataBroker.executor = executor;
+
         val brokerProperties = newProperties();
-        notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
+        
+        
+        log.info("Starting MD-SAL: Binding Aware Data Broker");
+        notifyProviderRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
             brokerProperties)
-        brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
+        notifyConsumerRegistration = brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
+        dataProviderRegistration = brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
+        dataConsumerRegistration = brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
+
+        connectorActivator = new ConnectorActivator(dataBroker,brokerBundleContext);
+        connectorActivator.start();
+        log.info("MD-SAL: Binding Aware Broker Started");
     }
 
     def initGenerator() {
@@ -91,22 +162,23 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
      * 
      * If proxy class does not exist for supplied service class it will be generated automatically.
      */
-    def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
-        
+    private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
+
         var RpcProxyContext existing = null
         if ((existing = managedProxies.get(service)) != null) {
             return existing.proxy
         }
-        val proxyClass = generator.generateDirectProxy(service)
-        val rpcProxyCtx = new RpcProxyContext(proxyClass)
+        val proxyInstance = generator.getDirectProxyFor(service)
+        val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
         val properties = new Hashtable<String, String>()
-        rpcProxyCtx.proxy = proxyClass.newInstance as RpcService
+        rpcProxyCtx.proxy = proxyInstance as RpcService
 
         properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
         rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
         managedProxies.put(service, rpcProxyCtx)
         return rpcProxyCtx.proxy
     }
+
     /**
      * Registers RPC Implementation
      * 
@@ -114,11 +186,181 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
     def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
         Hashtable<String, String> properties) {
         val proxy = getManagedDirectProxy(type)
-        if(proxy.delegate != null) {
-            throw new IllegalStateException("Service " + type + "is already registered");
-        }
+        checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
+
         val osgiReg = context.bundleContext.registerService(type, service, properties);
         proxy.delegate = service;
-        return new RpcServiceRegistrationImpl<T>(type, service, osgiReg);
+        return new RpcServiceRegistrationImpl<T>(type, service, osgiReg,this);
+    }
+
+    def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
+        OsgiProviderContext context) {
+        val router = resolveRpcRouter(type);
+        checkState(router !== null)
+        return new RoutedRpcRegistrationImpl<T>(service, router, this)
+    }
+
+    private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
+
+        val router = rpcRouters.get(type);
+        if (router !== null) {
+            return router as RpcRouter<T>;
+        }
+
+        // We created Router
+        val newRouter = generator.getRouterFor(type);
+        checkState(newRouter !== null);
+        rpcRouters.put(type, newRouter);
+
+        // We create / update Direct Proxy for router
+        val proxy = getManagedDirectProxy(type);
+        proxy.delegate = newRouter.invocationProxy
+        return newRouter;
+
+    }
+
+    protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
+        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+
+        val router = registration.router;
+        val paths = registration.registeredPaths;
+
+        val routingTable = router.getRoutingTable(context)
+        checkState(routingTable != null);
+
+        // Updating internal structure of registration
+        routingTable.updateRoute(path, registration.instance)
+        // Update routing table / send announce to message bus
+        
+        val success = paths.put(context, path);
+    }
+
+    protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
+        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+
+        val router = registration.router;
+        val paths = registration.registeredPaths;
+
+        val routingTable = router.getRoutingTable(context)
+        checkState(routingTable != null);
+
+        // Updating internal structure of registration
+        val target = routingTable.getRoute(path)
+        checkState(target === registration.instance)
+        routingTable.removeRoute(path)
+        checkState(paths.remove(context, path));
+    }
+
+    protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
+
+        val router = registration.router;
+        val paths = registration.registeredPaths;
+
+        for (ctxMap : registration.registeredPaths.entries) {
+            val context = ctxMap.key
+            val routingTable = router.getRoutingTable(context)
+            val path = ctxMap.value
+            routingTable.removeRoute(path)
+        }
+    }
+    
+    protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
+
+        val type = registration.serviceType;
+        
+        val proxy = managedProxies.get(type);
+        if(proxy.proxy.delegate === registration.instance) {
+            proxy.proxy.delegate = null;
+        }
+    }
+    
+    def createDelegate(Class<? extends RpcService> type) {
+        getManagedDirectProxy(type);
+    }
+    
+    def getRpcRouters() {
+        return Collections.unmodifiableMap(rpcRouters);
+    }
+    
+    override close() {
+        dataConsumerRegistration.unregister()
+        dataProviderRegistration.unregister()
+        notifyConsumerRegistration.unregister()
+        notifyProviderRegistration.unregister()
+    }
+    
+}
+
+class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
+
+    @Property
+    private val BindingAwareBrokerImpl broker;
+
+    @Property
+    private val RpcRouter<T> router;
+
+    @Property
+    private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
+
+    private var closed = false;
+
+    new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
+        super(instance)
+        _router = backingRouter;
+        _broker = broker;
+    }
+
+    override protected removeRegistration() {
+        closed = true
+        broker.unregisterRoutedRpcService(this)
+    }
+
+    override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
+        registerPath(context, instance);
     }
+
+    override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
+        unregisterPath(context, instance);
+    }
+
+    override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+        checkClosed()
+        broker.registerPath(this, context, path);
+    }
+
+    override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+        checkClosed()
+        broker.unregisterPath(this, context, path);
+    }
+    
+    override getServiceType() {
+        return router.serviceType;
+    }
+
+    private def checkClosed() {
+        if (closed)
+            throw new IllegalStateException("Registration was closed.");
+    }
+
+}
+class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T>  {
+
+    val ServiceRegistration<T> osgiRegistration;
+    private var BindingAwareBrokerImpl broker;
+    
+    @Property
+    val Class<T> serviceType;
+
+    public new(Class<T> type, T service, ServiceRegistration<T> osgiReg,BindingAwareBrokerImpl broker) {
+        super(service);
+        this._serviceType = type;
+        this.osgiRegistration = osgiReg;
+        this.broker= broker;
+    }
+
+    override protected removeRegistration() {
+        broker.unregisterRpcService(this);
+        broker = null;
+    }
+    
 }