/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.sal.dom.broker;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import org.opendaylight.controller.sal.core.api.Broker;
20 import org.opendaylight.controller.sal.core.api.BrokerService;
21 import org.opendaylight.controller.sal.core.api.Consumer;
22 import org.opendaylight.controller.sal.core.api.Provider;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.osgi.framework.BundleContext;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 public class BrokerImpl implements Broker {
33 private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
35 // Broker Generic Context
36 private Set<ConsumerSessionImpl> sessions = Collections
37 .synchronizedSet(new HashSet<ConsumerSessionImpl>());
38 private Set<ProviderSessionImpl> providerSessions = Collections
39 .synchronizedSet(new HashSet<ProviderSessionImpl>());
40 private Set<BrokerModule> modules = Collections
41 .synchronizedSet(new HashSet<BrokerModule>());
42 private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
43 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
46 private Map<QName, RpcImplementation> rpcImpls = Collections
47 .synchronizedMap(new HashMap<QName, RpcImplementation>());
49 // Implementation specific
50 private ExecutorService executor;
52 private BundleContext bundleContext;
55 public ConsumerSession registerConsumer(Consumer consumer,BundleContext ctx) {
56 checkPredicates(consumer);
57 log.info("Registering consumer " + consumer);
58 ConsumerSessionImpl session = newSessionFor(consumer,ctx);
59 consumer.onSessionInitiated(session);
60 sessions.add(session);
65 public ProviderSession registerProvider(Provider provider,BundleContext ctx) {
66 checkPredicates(provider);
68 ProviderSessionImpl session = newSessionFor(provider,ctx);
69 provider.onSessionInitiated(session);
70 providerSessions.add(session);
74 public void addModule(BrokerModule module) {
75 log.info("Registering broker module " + module);
76 if (modules.contains(module)) {
77 log.error("Module already registered");
78 throw new IllegalArgumentException("Module already exists.");
81 Set<Class<? extends BrokerService>> provServices = module
82 .getProvidedServices();
83 for (Class<? extends BrokerService> serviceType : provServices) {
84 log.info(" Registering session service implementation: "
85 + serviceType.getCanonicalName());
86 serviceProviders.put(serviceType, module);
90 public <T extends BrokerService> T serviceFor(Class<T> service,
91 ConsumerSessionImpl session) {
92 BrokerModule prov = serviceProviders.get(service);
94 log.warn("Service " + service.toString() + " is not supported");
97 return prov.getServiceForSession(service, session);
102 private void addRpcImplementation(QName rpcType,
103 RpcImplementation implementation) {
104 synchronized (rpcImpls) {
105 if (rpcImpls.get(rpcType) != null) {
106 throw new IllegalStateException("Implementation for rpc "
107 + rpcType + " is already registered.");
109 rpcImpls.put(rpcType, implementation);
111 // TODO Add notification for availability of Rpc Implementation
114 private void removeRpcImplementation(QName rpcType,
115 RpcImplementation implToRemove) {
116 synchronized (rpcImpls) {
117 if (implToRemove == rpcImpls.get(rpcType)) {
118 rpcImpls.remove(rpcType);
121 // TODO Add notification for removal of Rpc Implementation
124 private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
125 CompositeNode input) {
126 RpcImplementation impl = rpcImpls.get(rpc);
129 Callable<RpcResult<CompositeNode>> call = callableFor(impl,
131 Future<RpcResult<CompositeNode>> result = executor.submit(call);
138 private void checkPredicates(Provider prov) {
140 throw new IllegalArgumentException("Provider should not be null.");
141 for (ProviderSessionImpl session : providerSessions) {
142 if (prov.equals(session.getProvider()))
143 throw new IllegalStateException("Provider already registered");
148 private void checkPredicates(Consumer cons) {
150 throw new IllegalArgumentException("Consumer should not be null.");
151 for (ConsumerSessionImpl session : sessions) {
152 if (cons.equals(session.getConsumer()))
153 throw new IllegalStateException("Consumer already registered");
// Private factory methods
159 private ConsumerSessionImpl newSessionFor(Consumer provider, BundleContext ctx) {
160 return new ConsumerSessionImpl(provider,ctx);
163 private ProviderSessionImpl newSessionFor(Provider provider, BundleContext ctx) {
164 return new ProviderSessionImpl(provider,ctx);
167 private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
168 sessions.remove(consumerSessionImpl);
169 providerSessions.remove(consumerSessionImpl);
172 private static Callable<RpcResult<CompositeNode>> callableFor(
173 final RpcImplementation implemenation, final QName rpc,
174 final CompositeNode input) {
176 return new Callable<RpcResult<CompositeNode>>() {
179 public RpcResult<CompositeNode> call() throws Exception {
180 return implemenation.invokeRpc(rpc, input);
185 private class ConsumerSessionImpl implements ConsumerSession {
187 private final Consumer consumer;
189 private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
190 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
191 private boolean closed = false;
193 private BundleContext context;
195 public Consumer getConsumer() {
199 public ConsumerSessionImpl(Consumer consumer, BundleContext ctx) {
200 this.consumer = consumer;
205 public Future<RpcResult<CompositeNode>> rpc(QName rpc,
206 CompositeNode input) {
207 return BrokerImpl.this.invokeRpc(rpc, input);
211 public <T extends BrokerService> T getService(Class<T> service) {
212 BrokerService potential = instantiatedServices.get(service);
213 if (potential != null) {
214 @SuppressWarnings("unchecked")
215 T ret = (T) potential;
218 T ret = BrokerImpl.this.serviceFor(service, this);
220 instantiatedServices.put(service, ret);
226 public void close() {
227 Collection<BrokerService> toStop = instantiatedServices.values();
229 for (BrokerService brokerService : toStop) {
230 brokerService.closeSession();
232 BrokerImpl.this.consumerSessionClosed(this);
236 public boolean isClosed() {
242 private class ProviderSessionImpl extends ConsumerSessionImpl implements
245 private Provider provider;
246 private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
248 public ProviderSessionImpl(Provider provider, BundleContext ctx) {
250 this.provider = provider;
254 public void addRpcImplementation(QName rpcType,
255 RpcImplementation implementation)
256 throws IllegalArgumentException {
257 if (rpcType == null) {
258 throw new IllegalArgumentException("rpcType must not be null");
260 if (implementation == null) {
261 throw new IllegalArgumentException(
262 "Implementation must not be null");
264 BrokerImpl.this.addRpcImplementation(rpcType, implementation);
265 sessionRpcImpls.put(rpcType, implementation);
269 public void removeRpcImplementation(QName rpcType,
270 RpcImplementation implToRemove) throws IllegalArgumentException {
271 RpcImplementation localImpl = rpcImpls.get(rpcType);
272 if (localImpl != implToRemove) {
273 throw new IllegalStateException(
274 "Implementation was not registered in this session");
277 BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
278 sessionRpcImpls.remove(rpcType);
281 public Provider getProvider() {
282 return this.provider;
287 public void setBundleContext(BundleContext context) {
288 this.bundleContext = context;