Added support for binding-independent RPCs
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-broker-impl / src / main / java / org / opendaylight / controller / sal / core / impl / BrokerImpl.java
index 84bc0569504272704d3ea68605d42f6c13de0afe..b8a0b97eab9abb81b851eeda349ec6156709442a 100644 (file)
@@ -1,4 +1,3 @@
-
 /*\r
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
  *\r
@@ -8,46 +7,55 @@
  */\r
 package org.opendaylight.controller.sal.core.impl;\r
 \r
+import java.util.Collection;\r
+import java.util.Collections;\r
 import java.util.HashMap;\r
 import java.util.HashSet;\r
 import java.util.Map;\r
 import java.util.Set;\r
+import java.util.concurrent.Callable;\r
+import java.util.concurrent.ExecutorService;\r
 import java.util.concurrent.Future;\r
-\r
 import org.opendaylight.controller.sal.core.api.Broker;\r
 import org.opendaylight.controller.sal.core.api.BrokerService;\r
 import org.opendaylight.controller.sal.core.api.Consumer;\r
 import org.opendaylight.controller.sal.core.api.Provider;\r
+import org.opendaylight.controller.sal.core.api.RpcImplementation;\r
 import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
-import org.opendaylight.controller.yang.common.QName;
-import org.opendaylight.controller.yang.common.RpcResult;
-import org.opendaylight.controller.yang.data.api.CompositeNode;
+import org.opendaylight.controller.yang.common.QName;\r
+import org.opendaylight.controller.yang.common.RpcResult;\r
+import org.opendaylight.controller.yang.data.api.CompositeNode;\r
 import org.slf4j.Logger;\r
 import org.slf4j.LoggerFactory;\r
 \r
-\r
 public class BrokerImpl implements Broker {\r
     private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);\r
 \r
-    private Set<ConsumerSessionImpl> sessions = new HashSet<ConsumerSessionImpl>();\r
-    private Set<ProviderSessionImpl> providerSessions = new HashSet<ProviderSessionImpl>();\r
-    // private ExecutorService executor;\r
-    private Set<BrokerModule> modules = new HashSet<BrokerModule>();\r
+    // Broker Generic Context\r
+    private Set<ConsumerSessionImpl> sessions = Collections\r
+            .synchronizedSet(new HashSet<ConsumerSessionImpl>());\r
+    private Set<ProviderSessionImpl> providerSessions = Collections\r
+            .synchronizedSet(new HashSet<ProviderSessionImpl>());\r
+    private Set<BrokerModule> modules = Collections\r
+            .synchronizedSet(new HashSet<BrokerModule>());\r
+    private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections\r
+            .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());\r
+\r
+    // RPC Context\r
+    private Map<QName, RpcImplementation> rpcImpls = Collections\r
+            .synchronizedMap(new HashMap<QName, RpcImplementation>());\r
 \r
-    private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = new HashMap<Class<? extends BrokerService>, BrokerModule>();\r
+    // Implementation specific\r
+    private ExecutorService executor;\r
 \r
     @Override\r
     public ConsumerSession registerConsumer(Consumer consumer) {\r
         checkPredicates(consumer);\r
         log.info("Registering consumer " + consumer);\r
-\r
         ConsumerSessionImpl session = newSessionFor(consumer);\r
         consumer.onSessionInitiated(session);\r
-\r
         sessions.add(session);\r
-\r
         return session;\r
-\r
     }\r
 \r
     @Override\r
@@ -56,11 +64,26 @@ public class BrokerImpl implements Broker {
 \r
         ProviderSessionImpl session = newSessionFor(provider);\r
         provider.onSessionInitiated(session);\r
-\r
         providerSessions.add(session);\r
         return session;\r
     }\r
 \r
+    public void addModule(BrokerModule module) {\r
+        log.info("Registering broker module " + module);\r
+        if (modules.contains(module)) {\r
+            log.error("Module already registered");\r
+            throw new IllegalArgumentException("Module already exists.");\r
+        }\r
+    \r
+        Set<Class<? extends BrokerService>> provServices = module\r
+                .getProvidedServices();\r
+        for (Class<? extends BrokerService> serviceType : provServices) {\r
+            log.info("  Registering session service implementation: "\r
+                    + serviceType.getCanonicalName());\r
+            serviceProviders.put(serviceType, module);\r
+        }\r
+    }\r
+\r
     public <T extends BrokerService> T serviceFor(Class<T> service,\r
             ConsumerSessionImpl session) {\r
         BrokerModule prov = serviceProviders.get(service);\r
@@ -71,11 +94,43 @@ public class BrokerImpl implements Broker {
         return prov.getServiceForSession(service, session);\r
     }\r
 \r
-    public Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,\r
+    // RPC Functionality\r
+    \r
+    private void addRpcImplementation(QName rpcType,\r
+            RpcImplementation implementation) {\r
+        synchronized (rpcImpls) {\r
+            if (rpcImpls.get(rpcType) != null) {\r
+                throw new IllegalStateException("Implementation for rpc "\r
+                        + rpcType + " is already registered.");\r
+            }\r
+            rpcImpls.put(rpcType, implementation);\r
+        }\r
+        // TODO Add notification for availability of Rpc Implementation\r
+    }\r
+\r
+    private void removeRpcImplementation(QName rpcType,\r
+            RpcImplementation implToRemove) {\r
+        synchronized (rpcImpls) {\r
+            if (implToRemove == rpcImpls.get(rpcType)) {\r
+                rpcImpls.remove(rpcType);\r
+            }\r
+        }\r
+        // TODO Add notification for removal of Rpc Implementation\r
+    }\r
+\r
+    private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,\r
             CompositeNode input) {\r
-        // TODO Implement this method\r
-        throw new UnsupportedOperationException("Not implemented");\r
+        RpcImplementation impl = rpcImpls.get(rpc);\r
+        // if()\r
+\r
+        Callable<RpcResult<CompositeNode>> call = callableFor(impl,\r
+                rpc, input);\r
+        Future<RpcResult<CompositeNode>> result = executor.submit(call);\r
+\r
+        return result;\r
     }\r
+    \r
+    // Validation\r
 \r
     private void checkPredicates(Provider prov) {\r
         if (prov == null)\r
@@ -96,34 +151,130 @@ public class BrokerImpl implements Broker {
         }\r
     }\r
 \r
+    // Private Factory methods\r
+    \r
     private ConsumerSessionImpl newSessionFor(Consumer cons) {\r
-        return new ConsumerSessionImpl(this, cons);\r
+        return new ConsumerSessionImpl(cons);\r
     }\r
 \r
     private ProviderSessionImpl newSessionFor(Provider provider) {\r
-        return new ProviderSessionImpl(this, provider);\r
+        return new ProviderSessionImpl(provider);\r
     }\r
 \r
-    public void addModule(BrokerModule module) {\r
-        log.info("Registering broker module " + module);\r
-        if (modules.contains(module)) {\r
-            log.error("Module already registered");\r
-            throw new IllegalArgumentException("Module already exists.");\r
+    private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {\r
+        sessions.remove(consumerSessionImpl);\r
+        providerSessions.remove(consumerSessionImpl);\r
+    }\r
+\r
+    private static Callable<RpcResult<CompositeNode>> callableFor(\r
+            final RpcImplementation implemenation, final QName rpc,\r
+            final CompositeNode input) {\r
+\r
+        return new Callable<RpcResult<CompositeNode>>() {\r
+\r
+            @Override\r
+            public RpcResult<CompositeNode> call() throws Exception {\r
+                return implemenation.invokeRpc(rpc, input);\r
+            }\r
+        };\r
+    }\r
+    \r
+    private class ConsumerSessionImpl implements ConsumerSession {\r
+\r
+        private final Consumer consumer;\r
+\r
+        private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections\r
+                .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());\r
+        private boolean closed = false;\r
+\r
+        public Consumer getConsumer() {\r
+            return consumer;\r
         }\r
 \r
-        Set<Class<? extends BrokerService>> provServices = module\r
-                .getProvidedServices();\r
-        for (Class<? extends BrokerService> serviceType : provServices) {\r
-            log.info("  Registering session service implementation: "\r
-                    + serviceType.getCanonicalName());\r
-            serviceProviders.put(serviceType, module);\r
+        public ConsumerSessionImpl(Consumer consumer) {\r
+            this.consumer = consumer;\r
+        }\r
+\r
+        @Override\r
+        public Future<RpcResult<CompositeNode>> rpc(QName rpc,\r
+                CompositeNode input) {\r
+            return BrokerImpl.this.invokeRpc(rpc, input);\r
+        }\r
+\r
+        @Override\r
+        public <T extends BrokerService> T getService(Class<T> service) {\r
+            BrokerService potential = instantiatedServices.get(service);\r
+            if (potential != null) {\r
+                @SuppressWarnings("unchecked")\r
+                T ret = (T) potential;\r
+                return ret;\r
+            }\r
+            T ret = BrokerImpl.this.serviceFor(service, this);\r
+            if (ret != null) {\r
+                instantiatedServices.put(service, ret);\r
+            }\r
+            return ret;\r
+        }\r
+\r
+        @Override\r
+        public void close() {\r
+            Collection<BrokerService> toStop = instantiatedServices.values();\r
+            this.closed = true;\r
+            for (BrokerService brokerService : toStop) {\r
+                brokerService.closeSession();\r
+            }\r
+            BrokerImpl.this.consumerSessionClosed(this);\r
+        }\r
+\r
+        @Override\r
+        public boolean isClosed() {\r
+            return closed;\r
         }\r
-    }\r
 \r
-    public void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {\r
-        sessions.remove(consumerSessionImpl);\r
-        providerSessions.remove(consumerSessionImpl);\r
     }\r
 \r
+    private class ProviderSessionImpl extends ConsumerSessionImpl implements\r
+            ProviderSession {\r
+\r
+        private Provider provider;\r
+        private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());\r
+\r
+        public ProviderSessionImpl(Provider provider) {\r
+            super(null);\r
+            this.provider = provider;\r
+        }\r
+\r
+        @Override\r
+        public void addRpcImplementation(QName rpcType,\r
+                RpcImplementation implementation)\r
+                throws IllegalArgumentException {\r
+            if (rpcType == null) {\r
+                throw new IllegalArgumentException("rpcType must not be null");\r
+            }\r
+            if (implementation == null) {\r
+                throw new IllegalArgumentException(\r
+                        "Implementation must not be null");\r
+            }\r
+            BrokerImpl.this.addRpcImplementation(rpcType, implementation);\r
+            sessionRpcImpls.put(rpcType, implementation);\r
+        }\r
+\r
+        @Override\r
+        public void removeRpcImplementation(QName rpcType,\r
+                RpcImplementation implToRemove) throws IllegalArgumentException {\r
+            RpcImplementation localImpl = rpcImpls.get(rpcType);\r
+            if (localImpl != implToRemove) {\r
+                throw new IllegalStateException(\r
+                        "Implementation was not registered in this session");\r
+            }\r
+\r
+            BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);\r
+            sessionRpcImpls.remove(rpcType);\r
+        }\r
+\r
+        public Provider getProvider() {\r
+            return this.provider;\r
+        }\r
+\r
+    }\r
 }\r
-