-
/*\r
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.\r
*\r
*/\r
package org.opendaylight.controller.sal.core.impl;\r
\r
+import java.util.Collection;\r
+import java.util.Collections;\r
import java.util.HashMap;\r
import java.util.HashSet;\r
import java.util.Map;\r
import java.util.Set;\r
+import java.util.concurrent.Callable;\r
+import java.util.concurrent.ExecutorService;\r
import java.util.concurrent.Future;\r
-\r
import org.opendaylight.controller.sal.core.api.Broker;\r
import org.opendaylight.controller.sal.core.api.BrokerService;\r
import org.opendaylight.controller.sal.core.api.Consumer;\r
import org.opendaylight.controller.sal.core.api.Provider;\r
+import org.opendaylight.controller.sal.core.api.RpcImplementation;\r
import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
-import org.opendaylight.controller.yang.common.QName;
-import org.opendaylight.controller.yang.common.RpcResult;
-import org.opendaylight.controller.yang.data.api.CompositeNode;
+import org.opendaylight.controller.yang.common.QName;\r
+import org.opendaylight.controller.yang.common.RpcResult;\r
+import org.opendaylight.controller.yang.data.api.CompositeNode;\r
import org.slf4j.Logger;\r
import org.slf4j.LoggerFactory;\r
\r
-\r
public class BrokerImpl implements Broker {\r
private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);\r
\r
- private Set<ConsumerSessionImpl> sessions = new HashSet<ConsumerSessionImpl>();\r
- private Set<ProviderSessionImpl> providerSessions = new HashSet<ProviderSessionImpl>();\r
- // private ExecutorService executor;\r
- private Set<BrokerModule> modules = new HashSet<BrokerModule>();\r
+ // Broker Generic Context\r
+ private Set<ConsumerSessionImpl> sessions = Collections\r
+ .synchronizedSet(new HashSet<ConsumerSessionImpl>());\r
+ private Set<ProviderSessionImpl> providerSessions = Collections\r
+ .synchronizedSet(new HashSet<ProviderSessionImpl>());\r
+ private Set<BrokerModule> modules = Collections\r
+ .synchronizedSet(new HashSet<BrokerModule>());\r
+ private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections\r
+ .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());\r
+\r
+ // RPC Context\r
+ private Map<QName, RpcImplementation> rpcImpls = Collections\r
+ .synchronizedMap(new HashMap<QName, RpcImplementation>());\r
\r
- private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = new HashMap<Class<? extends BrokerService>, BrokerModule>();\r
+ // Implementation specific\r
+ private ExecutorService executor;\r
\r
@Override\r
public ConsumerSession registerConsumer(Consumer consumer) {\r
checkPredicates(consumer);\r
log.info("Registering consumer " + consumer);\r
-\r
ConsumerSessionImpl session = newSessionFor(consumer);\r
consumer.onSessionInitiated(session);\r
-\r
sessions.add(session);\r
-\r
return session;\r
-\r
}\r
\r
@Override\r
\r
ProviderSessionImpl session = newSessionFor(provider);\r
provider.onSessionInitiated(session);\r
-\r
providerSessions.add(session);\r
return session;\r
}\r
\r
+ public void addModule(BrokerModule module) {\r
+ log.info("Registering broker module " + module);\r
+ if (modules.contains(module)) {\r
+ log.error("Module already registered");\r
+ throw new IllegalArgumentException("Module already exists.");\r
+ }\r
+ \r
+ Set<Class<? extends BrokerService>> provServices = module\r
+ .getProvidedServices();\r
+ for (Class<? extends BrokerService> serviceType : provServices) {\r
+ log.info(" Registering session service implementation: "\r
+ + serviceType.getCanonicalName());\r
+ serviceProviders.put(serviceType, module);\r
+ }\r
+ }\r
+\r
public <T extends BrokerService> T serviceFor(Class<T> service,\r
ConsumerSessionImpl session) {\r
BrokerModule prov = serviceProviders.get(service);\r
return prov.getServiceForSession(service, session);\r
}\r
\r
- public Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,\r
+ // RPC Functionality\r
+ \r
+ private void addRpcImplementation(QName rpcType,\r
+ RpcImplementation implementation) {\r
+ synchronized (rpcImpls) {\r
+ if (rpcImpls.get(rpcType) != null) {\r
+ throw new IllegalStateException("Implementation for rpc "\r
+ + rpcType + " is already registered.");\r
+ }\r
+ rpcImpls.put(rpcType, implementation);\r
+ }\r
+ // TODO Add notification for availability of Rpc Implementation\r
+ }\r
+\r
+ private void removeRpcImplementation(QName rpcType,\r
+ RpcImplementation implToRemove) {\r
+ synchronized (rpcImpls) {\r
+ if (implToRemove == rpcImpls.get(rpcType)) {\r
+ rpcImpls.remove(rpcType);\r
+ }\r
+ }\r
+ // TODO Add notification for removal of Rpc Implementation\r
+ }\r
+\r
+ private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,\r
CompositeNode input) {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
+ RpcImplementation impl = rpcImpls.get(rpc);\r
+ // if()\r
+\r
+ Callable<RpcResult<CompositeNode>> call = callableFor(impl,\r
+ rpc, input);\r
+ Future<RpcResult<CompositeNode>> result = executor.submit(call);\r
+\r
+ return result;\r
}\r
+ \r
+ // Validation\r
\r
private void checkPredicates(Provider prov) {\r
if (prov == null)\r
}\r
}\r
\r
+ // Private Factory methods\r
+ \r
private ConsumerSessionImpl newSessionFor(Consumer cons) {\r
- return new ConsumerSessionImpl(this, cons);\r
+ return new ConsumerSessionImpl(cons);\r
}\r
\r
private ProviderSessionImpl newSessionFor(Provider provider) {\r
- return new ProviderSessionImpl(this, provider);\r
+ return new ProviderSessionImpl(provider);\r
}\r
\r
- public void addModule(BrokerModule module) {\r
- log.info("Registering broker module " + module);\r
- if (modules.contains(module)) {\r
- log.error("Module already registered");\r
- throw new IllegalArgumentException("Module already exists.");\r
+ private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {\r
+ sessions.remove(consumerSessionImpl);\r
+ providerSessions.remove(consumerSessionImpl);\r
+ }\r
+\r
+ private static Callable<RpcResult<CompositeNode>> callableFor(\r
+ final RpcImplementation implemenation, final QName rpc,\r
+ final CompositeNode input) {\r
+\r
+ return new Callable<RpcResult<CompositeNode>>() {\r
+\r
+ @Override\r
+ public RpcResult<CompositeNode> call() throws Exception {\r
+ return implemenation.invokeRpc(rpc, input);\r
+ }\r
+ };\r
+ }\r
+ \r
+ private class ConsumerSessionImpl implements ConsumerSession {\r
+\r
+ private final Consumer consumer;\r
+\r
+ private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections\r
+ .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());\r
+ private boolean closed = false;\r
+\r
+ public Consumer getConsumer() {\r
+ return consumer;\r
}\r
\r
- Set<Class<? extends BrokerService>> provServices = module\r
- .getProvidedServices();\r
- for (Class<? extends BrokerService> serviceType : provServices) {\r
- log.info(" Registering session service implementation: "\r
- + serviceType.getCanonicalName());\r
- serviceProviders.put(serviceType, module);\r
+ public ConsumerSessionImpl(Consumer consumer) {\r
+ this.consumer = consumer;\r
+ }\r
+\r
+ @Override\r
+ public Future<RpcResult<CompositeNode>> rpc(QName rpc,\r
+ CompositeNode input) {\r
+ return BrokerImpl.this.invokeRpc(rpc, input);\r
+ }\r
+\r
+ @Override\r
+ public <T extends BrokerService> T getService(Class<T> service) {\r
+ BrokerService potential = instantiatedServices.get(service);\r
+ if (potential != null) {\r
+ @SuppressWarnings("unchecked")\r
+ T ret = (T) potential;\r
+ return ret;\r
+ }\r
+ T ret = BrokerImpl.this.serviceFor(service, this);\r
+ if (ret != null) {\r
+ instantiatedServices.put(service, ret);\r
+ }\r
+ return ret;\r
+ }\r
+\r
+ @Override\r
+ public void close() {\r
+ Collection<BrokerService> toStop = instantiatedServices.values();\r
+ this.closed = true;\r
+ for (BrokerService brokerService : toStop) {\r
+ brokerService.closeSession();\r
+ }\r
+ BrokerImpl.this.consumerSessionClosed(this);\r
+ }\r
+\r
+ @Override\r
+ public boolean isClosed() {\r
+ return closed;\r
}\r
- }\r
\r
- public void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {\r
- sessions.remove(consumerSessionImpl);\r
- providerSessions.remove(consumerSessionImpl);\r
}\r
\r
+ private class ProviderSessionImpl extends ConsumerSessionImpl implements\r
+ ProviderSession {\r
+\r
+ private Provider provider;\r
+ private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());\r
+\r
+ public ProviderSessionImpl(Provider provider) {\r
+ super(null);\r
+ this.provider = provider;\r
+ }\r
+\r
+ @Override\r
+ public void addRpcImplementation(QName rpcType,\r
+ RpcImplementation implementation)\r
+ throws IllegalArgumentException {\r
+ if (rpcType == null) {\r
+ throw new IllegalArgumentException("rpcType must not be null");\r
+ }\r
+ if (implementation == null) {\r
+ throw new IllegalArgumentException(\r
+ "Implementation must not be null");\r
+ }\r
+ BrokerImpl.this.addRpcImplementation(rpcType, implementation);\r
+ sessionRpcImpls.put(rpcType, implementation);\r
+ }\r
+\r
+ @Override\r
+ public void removeRpcImplementation(QName rpcType,\r
+ RpcImplementation implToRemove) throws IllegalArgumentException {\r
+ RpcImplementation localImpl = rpcImpls.get(rpcType);\r
+ if (localImpl != implToRemove) {\r
+ throw new IllegalStateException(\r
+ "Implementation was not registered in this session");\r
+ }\r
+\r
+ BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);\r
+ sessionRpcImpls.remove(rpcType);\r
+ }\r
+\r
+ public Provider getProvider() {\r
+ return this.provider;\r
+ }\r
+\r
+ }\r
}\r
-