Bug 1010: Implement restconf error responses
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
index bdd4491d016ac747bc17d8a7b10860e7f17c4eed..0ed14c1027905f22fff667201d5a3294ed6afc56 100644 (file)
@@ -7,51 +7,47 @@
  */
 package org.opendaylight.controller.sal.dom.broker;
 
+import com.google.common.util.concurrent.ListenableFuture
 import java.util.Collections
-import java.util.HashMap
 import java.util.HashSet
-import java.util.Map
 import java.util.Set
-import java.util.concurrent.Callable
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.sal.core.api.Broker
-import org.opendaylight.controller.sal.core.api.BrokerService
 import org.opendaylight.controller.sal.core.api.Consumer
 import org.opendaylight.controller.sal.core.api.Provider
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.sal.core.spi.BrokerModule
 import org.opendaylight.yangtools.yang.common.QName
 import org.opendaylight.yangtools.yang.common.RpcResult
 import org.opendaylight.yangtools.yang.data.api.CompositeNode
 import org.osgi.framework.BundleContext
 import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
 
-public class BrokerImpl implements Broker {
+public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     private static val log = LoggerFactory.getLogger(BrokerImpl);
 
     // Broker Generic Context
     private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
         new HashSet<ProviderContextImpl>());
-    private val Set<BrokerModule> modules = Collections.synchronizedSet(new HashSet<BrokerModule>());
-    private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
-        synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
 
-    // RPC Context
-    private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
-        new HashMap<QName, RpcImplementation>());
+    @Property
+    private var BundleContext bundleContext;
 
-    // Implementation specific
     @Property
-    private var ExecutorService executor = Executors.newFixedThreadPool(5);
+    private var AutoCloseable deactivator;
+
     @Property
-    private var BundleContext bundleContext;
+    private var RpcRouter router;
 
     override registerConsumer(Consumer consumer, BundleContext ctx) {
         checkPredicates(consumer);
-        log.info("Registering consumer " + consumer);
+        log.trace("Registering consumer " + consumer);
         val session = newSessionFor(consumer, ctx);
         consumer.onSessionInitiated(session);
         sessions.add(session);
@@ -67,65 +63,26 @@ public class BrokerImpl implements Broker {
         return session;
     }
 
-    public def addModule(BrokerModule module) {
-        log.info("Registering broker module " + module);
-        if(modules.contains(module)) {
-            log.error("Module already registered");
-            throw new IllegalArgumentException("Module already exists.");
-        }
-
-        val provServices = module.getProvidedServices();
-        for (Class<? extends BrokerService> serviceType : provServices) {
-            log.info("  Registering session service implementation: " + serviceType.getCanonicalName());
-            serviceProviders.put(serviceType, module);
-        }
-    }
-
-    public def <T extends BrokerService> T serviceFor(Class<T> service, ConsumerContextImpl session) {
-        val prov = serviceProviders.get(service);
-        if(prov == null) {
-            log.warn("Service " + service.toString() + " is not supported");
-            return null;
-        }
-        return prov.getServiceForSession(service, session);
-    }
-
-    // RPC Functionality
-    protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
-        if(rpcImpls.get(rpcType) != null) {
-            throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
-        }
-
-        //TODO: Add notification for availability of Rpc Implementation
-        rpcImpls.put(rpcType, implementation);
-    }
-
-    protected def void removeRpcImplementation(QName rpcType) {
-        rpcImpls.remove(rpcType);
-    }
-
-    protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-        val impl = rpcImpls.get(rpc);
-        val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
-        return result;
+    protected def ListenableFuture<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
+        return router.invokeRpc(rpc, input);
     }
 
     // Validation
     private def void checkPredicates(Provider prov) {
-        if(prov == null)
+        if (prov == null)
             throw new IllegalArgumentException("Provider should not be null.");
         for (ProviderContextImpl session : providerSessions) {
-            if(prov.equals(session.getProvider()))
+            if (prov.equals(session.getProvider()))
                 throw new IllegalStateException("Provider already registered");
         }
 
     }
 
     private def void checkPredicates(Consumer cons) {
-        if(cons == null)
+        if (cons == null)
             throw new IllegalArgumentException("Consumer should not be null.");
         for (ConsumerContextImpl session : sessions) {
-            if(cons.equals(session.getConsumer()))
+            if (cons.equals(session.getConsumer()))
                 throw new IllegalStateException("Consumer already registered");
         }
     }
@@ -147,4 +104,37 @@ public class BrokerImpl implements Broker {
         sessions.remove(consumerContextImpl);
         providerSessions.remove(consumerContextImpl);
     }
+
+    override close() throws Exception {
+        deactivator?.close();
+    }
+
+    override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
+        router.addRpcImplementation(rpcType,implementation);
+    }
+
+    override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+        router.addRoutedRpcImplementation(rpcType,implementation);
+    }
+
+    override setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+        router.setRoutedRpcDefaultDelegate(defaultImplementation);
+    }
+
+    override addRpcRegistrationListener(RpcRegistrationListener listener) {
+        return router.addRpcRegistrationListener(listener);
+    }
+
+    override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
+        return router.registerRouteChangeListener(listener);
+    }
+
+    override getSupportedRpcs() {
+        return router.getSupportedRpcs();
+    }
+
+    override invokeRpc(QName rpc, CompositeNode input) {
+        return router.invokeRpc(rpc,input)
+    }
+
 }