2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.core.impl;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import org.opendaylight.controller.sal.core.api.Broker;
20 import org.opendaylight.controller.sal.core.api.BrokerService;
21 import org.opendaylight.controller.sal.core.api.Consumer;
22 import org.opendaylight.controller.sal.core.api.Provider;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 public class BrokerImpl implements Broker {
32 private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
34 // Broker Generic Context
35 private Set<ConsumerSessionImpl> sessions = Collections
36 .synchronizedSet(new HashSet<ConsumerSessionImpl>());
37 private Set<ProviderSessionImpl> providerSessions = Collections
38 .synchronizedSet(new HashSet<ProviderSessionImpl>());
39 private Set<BrokerModule> modules = Collections
40 .synchronizedSet(new HashSet<BrokerModule>());
41 private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
42 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
45 private Map<QName, RpcImplementation> rpcImpls = Collections
46 .synchronizedMap(new HashMap<QName, RpcImplementation>());
48 // Implementation specific
49 private ExecutorService executor;
52 public ConsumerSession registerConsumer(Consumer consumer) {
53 checkPredicates(consumer);
54 log.info("Registering consumer " + consumer);
55 ConsumerSessionImpl session = newSessionFor(consumer);
56 consumer.onSessionInitiated(session);
57 sessions.add(session);
62 public ProviderSession registerProvider(Provider provider) {
63 checkPredicates(provider);
65 ProviderSessionImpl session = newSessionFor(provider);
66 provider.onSessionInitiated(session);
67 providerSessions.add(session);
71 public void addModule(BrokerModule module) {
72 log.info("Registering broker module " + module);
73 if (modules.contains(module)) {
74 log.error("Module already registered");
75 throw new IllegalArgumentException("Module already exists.");
78 Set<Class<? extends BrokerService>> provServices = module
79 .getProvidedServices();
80 for (Class<? extends BrokerService> serviceType : provServices) {
81 log.info(" Registering session service implementation: "
82 + serviceType.getCanonicalName());
83 serviceProviders.put(serviceType, module);
87 public <T extends BrokerService> T serviceFor(Class<T> service,
88 ConsumerSessionImpl session) {
89 BrokerModule prov = serviceProviders.get(service);
91 log.warn("Service " + service.toString() + " is not supported");
94 return prov.getServiceForSession(service, session);
99 private void addRpcImplementation(QName rpcType,
100 RpcImplementation implementation) {
101 synchronized (rpcImpls) {
102 if (rpcImpls.get(rpcType) != null) {
103 throw new IllegalStateException("Implementation for rpc "
104 + rpcType + " is already registered.");
106 rpcImpls.put(rpcType, implementation);
108 // TODO Add notification for availability of Rpc Implementation
111 private void removeRpcImplementation(QName rpcType,
112 RpcImplementation implToRemove) {
113 synchronized (rpcImpls) {
114 if (implToRemove == rpcImpls.get(rpcType)) {
115 rpcImpls.remove(rpcType);
118 // TODO Add notification for removal of Rpc Implementation
121 private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
122 CompositeNode input) {
123 RpcImplementation impl = rpcImpls.get(rpc);
126 Callable<RpcResult<CompositeNode>> call = callableFor(impl,
128 Future<RpcResult<CompositeNode>> result = executor.submit(call);
135 private void checkPredicates(Provider prov) {
137 throw new IllegalArgumentException("Provider should not be null.");
138 for (ProviderSessionImpl session : providerSessions) {
139 if (prov.equals(session.getProvider()))
140 throw new IllegalStateException("Provider already registered");
145 private void checkPredicates(Consumer cons) {
147 throw new IllegalArgumentException("Consumer should not be null.");
148 for (ConsumerSessionImpl session : sessions) {
149 if (cons.equals(session.getConsumer()))
150 throw new IllegalStateException("Consumer already registered");
154 // Private Factory methods
156 private ConsumerSessionImpl newSessionFor(Consumer cons) {
157 return new ConsumerSessionImpl(cons);
160 private ProviderSessionImpl newSessionFor(Provider provider) {
161 return new ProviderSessionImpl(provider);
164 private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
165 sessions.remove(consumerSessionImpl);
166 providerSessions.remove(consumerSessionImpl);
169 private static Callable<RpcResult<CompositeNode>> callableFor(
170 final RpcImplementation implemenation, final QName rpc,
171 final CompositeNode input) {
173 return new Callable<RpcResult<CompositeNode>>() {
176 public RpcResult<CompositeNode> call() throws Exception {
177 return implemenation.invokeRpc(rpc, input);
182 private class ConsumerSessionImpl implements ConsumerSession {
184 private final Consumer consumer;
186 private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
187 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
188 private boolean closed = false;
190 public Consumer getConsumer() {
194 public ConsumerSessionImpl(Consumer consumer) {
195 this.consumer = consumer;
199 public Future<RpcResult<CompositeNode>> rpc(QName rpc,
200 CompositeNode input) {
201 return BrokerImpl.this.invokeRpc(rpc, input);
205 public <T extends BrokerService> T getService(Class<T> service) {
206 BrokerService potential = instantiatedServices.get(service);
207 if (potential != null) {
208 @SuppressWarnings("unchecked")
209 T ret = (T) potential;
212 T ret = BrokerImpl.this.serviceFor(service, this);
214 instantiatedServices.put(service, ret);
220 public void close() {
221 Collection<BrokerService> toStop = instantiatedServices.values();
223 for (BrokerService brokerService : toStop) {
224 brokerService.closeSession();
226 BrokerImpl.this.consumerSessionClosed(this);
230 public boolean isClosed() {
236 private class ProviderSessionImpl extends ConsumerSessionImpl implements
239 private Provider provider;
240 private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
242 public ProviderSessionImpl(Provider provider) {
244 this.provider = provider;
248 public void addRpcImplementation(QName rpcType,
249 RpcImplementation implementation)
250 throws IllegalArgumentException {
251 if (rpcType == null) {
252 throw new IllegalArgumentException("rpcType must not be null");
254 if (implementation == null) {
255 throw new IllegalArgumentException(
256 "Implementation must not be null");
258 BrokerImpl.this.addRpcImplementation(rpcType, implementation);
259 sessionRpcImpls.put(rpcType, implementation);
263 public void removeRpcImplementation(QName rpcType,
264 RpcImplementation implToRemove) throws IllegalArgumentException {
265 RpcImplementation localImpl = rpcImpls.get(rpcType);
266 if (localImpl != implToRemove) {
267 throw new IllegalStateException(
268 "Implementation was not registered in this session");
271 BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
272 sessionRpcImpls.remove(rpcType);
275 public Provider getProvider() {
276 return this.provider;