-/*\r
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-package org.opendaylight.controller.sal.core.impl;\r
-\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.HashMap;\r
-import java.util.HashSet;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.concurrent.Callable;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.Future;\r
-import org.opendaylight.controller.sal.core.api.Broker;\r
-import org.opendaylight.controller.sal.core.api.BrokerService;\r
-import org.opendaylight.controller.sal.core.api.Consumer;\r
-import org.opendaylight.controller.sal.core.api.Provider;\r
-import org.opendaylight.controller.sal.core.api.RpcImplementation;\r
-import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
-import org.opendaylight.controller.yang.common.QName;\r
-import org.opendaylight.controller.yang.common.RpcResult;\r
-import org.opendaylight.controller.yang.data.api.CompositeNode;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-public class BrokerImpl implements Broker {\r
- private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);\r
-\r
- // Broker Generic Context\r
- private Set<ConsumerSessionImpl> sessions = Collections\r
- .synchronizedSet(new HashSet<ConsumerSessionImpl>());\r
- private Set<ProviderSessionImpl> providerSessions = Collections\r
- .synchronizedSet(new HashSet<ProviderSessionImpl>());\r
- private Set<BrokerModule> modules = Collections\r
- .synchronizedSet(new HashSet<BrokerModule>());\r
- private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections\r
- .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());\r
-\r
- // RPC Context\r
- private Map<QName, RpcImplementation> rpcImpls = Collections\r
- .synchronizedMap(new HashMap<QName, RpcImplementation>());\r
-\r
- // Implementation specific\r
- private ExecutorService executor;\r
-\r
- @Override\r
- public ConsumerSession registerConsumer(Consumer consumer) {\r
- checkPredicates(consumer);\r
- log.info("Registering consumer " + consumer);\r
- ConsumerSessionImpl session = newSessionFor(consumer);\r
- consumer.onSessionInitiated(session);\r
- sessions.add(session);\r
- return session;\r
- }\r
-\r
- @Override\r
- public ProviderSession registerProvider(Provider provider) {\r
- checkPredicates(provider);\r
-\r
- ProviderSessionImpl session = newSessionFor(provider);\r
- provider.onSessionInitiated(session);\r
- providerSessions.add(session);\r
- return session;\r
- }\r
-\r
- public void addModule(BrokerModule module) {\r
- log.info("Registering broker module " + module);\r
- if (modules.contains(module)) {\r
- log.error("Module already registered");\r
- throw new IllegalArgumentException("Module already exists.");\r
- }\r
- \r
- Set<Class<? extends BrokerService>> provServices = module\r
- .getProvidedServices();\r
- for (Class<? extends BrokerService> serviceType : provServices) {\r
- log.info(" Registering session service implementation: "\r
- + serviceType.getCanonicalName());\r
- serviceProviders.put(serviceType, module);\r
- }\r
- }\r
-\r
- public <T extends BrokerService> T serviceFor(Class<T> service,\r
- ConsumerSessionImpl session) {\r
- BrokerModule prov = serviceProviders.get(service);\r
- if (prov == null) {\r
- log.warn("Service " + service.toString() + " is not supported");\r
- return null;\r
- }\r
- return prov.getServiceForSession(service, session);\r
- }\r
-\r
- // RPC Functionality\r
- \r
- private void addRpcImplementation(QName rpcType,\r
- RpcImplementation implementation) {\r
- synchronized (rpcImpls) {\r
- if (rpcImpls.get(rpcType) != null) {\r
- throw new IllegalStateException("Implementation for rpc "\r
- + rpcType + " is already registered.");\r
- }\r
- rpcImpls.put(rpcType, implementation);\r
- }\r
- // TODO Add notification for availability of Rpc Implementation\r
- }\r
-\r
- private void removeRpcImplementation(QName rpcType,\r
- RpcImplementation implToRemove) {\r
- synchronized (rpcImpls) {\r
- if (implToRemove == rpcImpls.get(rpcType)) {\r
- rpcImpls.remove(rpcType);\r
- }\r
- }\r
- // TODO Add notification for removal of Rpc Implementation\r
- }\r
-\r
- private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,\r
- CompositeNode input) {\r
- RpcImplementation impl = rpcImpls.get(rpc);\r
- // if()\r
-\r
- Callable<RpcResult<CompositeNode>> call = callableFor(impl,\r
- rpc, input);\r
- Future<RpcResult<CompositeNode>> result = executor.submit(call);\r
-\r
- return result;\r
- }\r
- \r
- // Validation\r
-\r
- private void checkPredicates(Provider prov) {\r
- if (prov == null)\r
- throw new IllegalArgumentException("Provider should not be null.");\r
- for (ProviderSessionImpl session : providerSessions) {\r
- if (prov.equals(session.getProvider()))\r
- throw new IllegalStateException("Provider already registered");\r
- }\r
-\r
- }\r
-\r
- private void checkPredicates(Consumer cons) {\r
- if (cons == null)\r
- throw new IllegalArgumentException("Consumer should not be null.");\r
- for (ConsumerSessionImpl session : sessions) {\r
- if (cons.equals(session.getConsumer()))\r
- throw new IllegalStateException("Consumer already registered");\r
- }\r
- }\r
-\r
- // Private Factory methods\r
- \r
- private ConsumerSessionImpl newSessionFor(Consumer cons) {\r
- return new ConsumerSessionImpl(cons);\r
- }\r
-\r
- private ProviderSessionImpl newSessionFor(Provider provider) {\r
- return new ProviderSessionImpl(provider);\r
- }\r
-\r
- private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {\r
- sessions.remove(consumerSessionImpl);\r
- providerSessions.remove(consumerSessionImpl);\r
- }\r
-\r
- private static Callable<RpcResult<CompositeNode>> callableFor(\r
- final RpcImplementation implemenation, final QName rpc,\r
- final CompositeNode input) {\r
-\r
- return new Callable<RpcResult<CompositeNode>>() {\r
-\r
- @Override\r
- public RpcResult<CompositeNode> call() throws Exception {\r
- return implemenation.invokeRpc(rpc, input);\r
- }\r
- };\r
- }\r
- \r
- private class ConsumerSessionImpl implements ConsumerSession {\r
-\r
- private final Consumer consumer;\r
-\r
- private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections\r
- .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());\r
- private boolean closed = false;\r
-\r
- public Consumer getConsumer() {\r
- return consumer;\r
- }\r
-\r
- public ConsumerSessionImpl(Consumer consumer) {\r
- this.consumer = consumer;\r
- }\r
-\r
- @Override\r
- public Future<RpcResult<CompositeNode>> rpc(QName rpc,\r
- CompositeNode input) {\r
- return BrokerImpl.this.invokeRpc(rpc, input);\r
- }\r
-\r
- @Override\r
- public <T extends BrokerService> T getService(Class<T> service) {\r
- BrokerService potential = instantiatedServices.get(service);\r
- if (potential != null) {\r
- @SuppressWarnings("unchecked")\r
- T ret = (T) potential;\r
- return ret;\r
- }\r
- T ret = BrokerImpl.this.serviceFor(service, this);\r
- if (ret != null) {\r
- instantiatedServices.put(service, ret);\r
- }\r
- return ret;\r
- }\r
-\r
- @Override\r
- public void close() {\r
- Collection<BrokerService> toStop = instantiatedServices.values();\r
- this.closed = true;\r
- for (BrokerService brokerService : toStop) {\r
- brokerService.closeSession();\r
- }\r
- BrokerImpl.this.consumerSessionClosed(this);\r
- }\r
-\r
- @Override\r
- public boolean isClosed() {\r
- return closed;\r
- }\r
-\r
- }\r
-\r
- private class ProviderSessionImpl extends ConsumerSessionImpl implements\r
- ProviderSession {\r
-\r
- private Provider provider;\r
- private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());\r
-\r
- public ProviderSessionImpl(Provider provider) {\r
- super(null);\r
- this.provider = provider;\r
- }\r
-\r
- @Override\r
- public void addRpcImplementation(QName rpcType,\r
- RpcImplementation implementation)\r
- throws IllegalArgumentException {\r
- if (rpcType == null) {\r
- throw new IllegalArgumentException("rpcType must not be null");\r
- }\r
- if (implementation == null) {\r
- throw new IllegalArgumentException(\r
- "Implementation must not be null");\r
- }\r
- BrokerImpl.this.addRpcImplementation(rpcType, implementation);\r
- sessionRpcImpls.put(rpcType, implementation);\r
- }\r
-\r
- @Override\r
- public void removeRpcImplementation(QName rpcType,\r
- RpcImplementation implToRemove) throws IllegalArgumentException {\r
- RpcImplementation localImpl = rpcImpls.get(rpcType);\r
- if (localImpl != implToRemove) {\r
- throw new IllegalStateException(\r
- "Implementation was not registered in this session");\r
- }\r
-\r
- BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);\r
- sessionRpcImpls.remove(rpcType);\r
- }\r
-\r
- public Provider getProvider() {\r
- return this.provider;\r
- }\r
-\r
- }\r
-}\r
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.BrokerService;
+import org.opendaylight.controller.sal.core.api.Consumer;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.spi.BrokerModule;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerImpl implements Broker {
+ private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
+
+ // Broker Generic Context
+ private Set<ConsumerSessionImpl> sessions = Collections
+ .synchronizedSet(new HashSet<ConsumerSessionImpl>());
+ private Set<ProviderSessionImpl> providerSessions = Collections
+ .synchronizedSet(new HashSet<ProviderSessionImpl>());
+ private Set<BrokerModule> modules = Collections
+ .synchronizedSet(new HashSet<BrokerModule>());
+ private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
+ .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
+
+ // RPC Context
+ private Map<QName, RpcImplementation> rpcImpls = Collections
+ .synchronizedMap(new HashMap<QName, RpcImplementation>());
+
+ // Implementation specific
+ private ExecutorService executor;
+
+ @Override
+ public ConsumerSession registerConsumer(Consumer consumer) {
+ checkPredicates(consumer);
+ log.info("Registering consumer " + consumer);
+ ConsumerSessionImpl session = newSessionFor(consumer);
+ consumer.onSessionInitiated(session);
+ sessions.add(session);
+ return session;
+ }
+
+ @Override
+ public ProviderSession registerProvider(Provider provider) {
+ checkPredicates(provider);
+
+ ProviderSessionImpl session = newSessionFor(provider);
+ provider.onSessionInitiated(session);
+ providerSessions.add(session);
+ return session;
+ }
+
+ public void addModule(BrokerModule module) {
+ log.info("Registering broker module " + module);
+ if (modules.contains(module)) {
+ log.error("Module already registered");
+ throw new IllegalArgumentException("Module already exists.");
+ }
+
+ Set<Class<? extends BrokerService>> provServices = module
+ .getProvidedServices();
+ for (Class<? extends BrokerService> serviceType : provServices) {
+ log.info(" Registering session service implementation: "
+ + serviceType.getCanonicalName());
+ serviceProviders.put(serviceType, module);
+ }
+ }
+
+ public <T extends BrokerService> T serviceFor(Class<T> service,
+ ConsumerSessionImpl session) {
+ BrokerModule prov = serviceProviders.get(service);
+ if (prov == null) {
+ log.warn("Service " + service.toString() + " is not supported");
+ return null;
+ }
+ return prov.getServiceForSession(service, session);
+ }
+
+ // RPC Functionality
+
+ private void addRpcImplementation(QName rpcType,
+ RpcImplementation implementation) {
+ synchronized (rpcImpls) {
+ if (rpcImpls.get(rpcType) != null) {
+ throw new IllegalStateException("Implementation for rpc "
+ + rpcType + " is already registered.");
+ }
+ rpcImpls.put(rpcType, implementation);
+ }
+ // TODO Add notification for availability of Rpc Implementation
+ }
+
+ private void removeRpcImplementation(QName rpcType,
+ RpcImplementation implToRemove) {
+ synchronized (rpcImpls) {
+ if (implToRemove == rpcImpls.get(rpcType)) {
+ rpcImpls.remove(rpcType);
+ }
+ }
+ // TODO Add notification for removal of Rpc Implementation
+ }
+
+ private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
+ CompositeNode input) {
+ RpcImplementation impl = rpcImpls.get(rpc);
+ // if()
+
+ Callable<RpcResult<CompositeNode>> call = callableFor(impl,
+ rpc, input);
+ Future<RpcResult<CompositeNode>> result = executor.submit(call);
+
+ return result;
+ }
+
+ // Validation
+
+ private void checkPredicates(Provider prov) {
+ if (prov == null)
+ throw new IllegalArgumentException("Provider should not be null.");
+ for (ProviderSessionImpl session : providerSessions) {
+ if (prov.equals(session.getProvider()))
+ throw new IllegalStateException("Provider already registered");
+ }
+
+ }
+
+ private void checkPredicates(Consumer cons) {
+ if (cons == null)
+ throw new IllegalArgumentException("Consumer should not be null.");
+ for (ConsumerSessionImpl session : sessions) {
+ if (cons.equals(session.getConsumer()))
+ throw new IllegalStateException("Consumer already registered");
+ }
+ }
+
+ // Private Factory methods
+
+ private ConsumerSessionImpl newSessionFor(Consumer cons) {
+ return new ConsumerSessionImpl(cons);
+ }
+
+ private ProviderSessionImpl newSessionFor(Provider provider) {
+ return new ProviderSessionImpl(provider);
+ }
+
+ private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
+ sessions.remove(consumerSessionImpl);
+ providerSessions.remove(consumerSessionImpl);
+ }
+
+ private static Callable<RpcResult<CompositeNode>> callableFor(
+ final RpcImplementation implemenation, final QName rpc,
+ final CompositeNode input) {
+
+ return new Callable<RpcResult<CompositeNode>>() {
+
+ @Override
+ public RpcResult<CompositeNode> call() throws Exception {
+ return implemenation.invokeRpc(rpc, input);
+ }
+ };
+ }
+
+ private class ConsumerSessionImpl implements ConsumerSession {
+
+ private final Consumer consumer;
+
+ private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
+ .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
+ private boolean closed = false;
+
+ public Consumer getConsumer() {
+ return consumer;
+ }
+
+ public ConsumerSessionImpl(Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public Future<RpcResult<CompositeNode>> rpc(QName rpc,
+ CompositeNode input) {
+ return BrokerImpl.this.invokeRpc(rpc, input);
+ }
+
+ @Override
+ public <T extends BrokerService> T getService(Class<T> service) {
+ BrokerService potential = instantiatedServices.get(service);
+ if (potential != null) {
+ @SuppressWarnings("unchecked")
+ T ret = (T) potential;
+ return ret;
+ }
+ T ret = BrokerImpl.this.serviceFor(service, this);
+ if (ret != null) {
+ instantiatedServices.put(service, ret);
+ }
+ return ret;
+ }
+
+ @Override
+ public void close() {
+ Collection<BrokerService> toStop = instantiatedServices.values();
+ this.closed = true;
+ for (BrokerService brokerService : toStop) {
+ brokerService.closeSession();
+ }
+ BrokerImpl.this.consumerSessionClosed(this);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ }
+
+ private class ProviderSessionImpl extends ConsumerSessionImpl implements
+ ProviderSession {
+
+ private Provider provider;
+ private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
+
+ public ProviderSessionImpl(Provider provider) {
+ super(null);
+ this.provider = provider;
+ }
+
+ @Override
+ public void addRpcImplementation(QName rpcType,
+ RpcImplementation implementation)
+ throws IllegalArgumentException {
+ if (rpcType == null) {
+ throw new IllegalArgumentException("rpcType must not be null");
+ }
+ if (implementation == null) {
+ throw new IllegalArgumentException(
+ "Implementation must not be null");
+ }
+ BrokerImpl.this.addRpcImplementation(rpcType, implementation);
+ sessionRpcImpls.put(rpcType, implementation);
+ }
+
+ @Override
+ public void removeRpcImplementation(QName rpcType,
+ RpcImplementation implToRemove) throws IllegalArgumentException {
+ RpcImplementation localImpl = rpcImpls.get(rpcType);
+ if (localImpl != implToRemove) {
+ throw new IllegalStateException(
+ "Implementation was not registered in this session");
+ }
+
+ BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
+ sessionRpcImpls.remove(rpcType);
+ }
+
+ public Provider getProvider() {
+ return this.provider;
+ }
+
+ }
+}