/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
\r
8 package org.opendaylight.controller.sal.core.impl;
\r
10 import java.util.Collection;
\r
11 import java.util.Collections;
\r
12 import java.util.HashMap;
\r
13 import java.util.HashSet;
\r
14 import java.util.Map;
\r
15 import java.util.Set;
\r
16 import java.util.concurrent.Callable;
\r
17 import java.util.concurrent.ExecutorService;
\r
18 import java.util.concurrent.Future;
\r
19 import org.opendaylight.controller.sal.core.api.Broker;
\r
20 import org.opendaylight.controller.sal.core.api.BrokerService;
\r
21 import org.opendaylight.controller.sal.core.api.Consumer;
\r
22 import org.opendaylight.controller.sal.core.api.Provider;
\r
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
\r
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;
\r
25 import org.opendaylight.controller.yang.common.QName;
\r
26 import org.opendaylight.controller.yang.common.RpcResult;
\r
27 import org.opendaylight.controller.yang.data.api.CompositeNode;
\r
28 import org.slf4j.Logger;
\r
29 import org.slf4j.LoggerFactory;
\r
31 public class BrokerImpl implements Broker {
\r
32 private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
\r
34 // Broker Generic Context
\r
35 private Set<ConsumerSessionImpl> sessions = Collections
\r
36 .synchronizedSet(new HashSet<ConsumerSessionImpl>());
\r
37 private Set<ProviderSessionImpl> providerSessions = Collections
\r
38 .synchronizedSet(new HashSet<ProviderSessionImpl>());
\r
39 private Set<BrokerModule> modules = Collections
\r
40 .synchronizedSet(new HashSet<BrokerModule>());
\r
41 private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
\r
42 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
\r
45 private Map<QName, RpcImplementation> rpcImpls = Collections
\r
46 .synchronizedMap(new HashMap<QName, RpcImplementation>());
\r
48 // Implementation specific
\r
49 private ExecutorService executor;
\r
52 public ConsumerSession registerConsumer(Consumer consumer) {
\r
53 checkPredicates(consumer);
\r
54 log.info("Registering consumer " + consumer);
\r
55 ConsumerSessionImpl session = newSessionFor(consumer);
\r
56 consumer.onSessionInitiated(session);
\r
57 sessions.add(session);
\r
62 public ProviderSession registerProvider(Provider provider) {
\r
63 checkPredicates(provider);
\r
65 ProviderSessionImpl session = newSessionFor(provider);
\r
66 provider.onSessionInitiated(session);
\r
67 providerSessions.add(session);
\r
71 public void addModule(BrokerModule module) {
\r
72 log.info("Registering broker module " + module);
\r
73 if (modules.contains(module)) {
\r
74 log.error("Module already registered");
\r
75 throw new IllegalArgumentException("Module already exists.");
\r
78 Set<Class<? extends BrokerService>> provServices = module
\r
79 .getProvidedServices();
\r
80 for (Class<? extends BrokerService> serviceType : provServices) {
\r
81 log.info(" Registering session service implementation: "
\r
82 + serviceType.getCanonicalName());
\r
83 serviceProviders.put(serviceType, module);
\r
87 public <T extends BrokerService> T serviceFor(Class<T> service,
\r
88 ConsumerSessionImpl session) {
\r
89 BrokerModule prov = serviceProviders.get(service);
\r
91 log.warn("Service " + service.toString() + " is not supported");
\r
94 return prov.getServiceForSession(service, session);
\r
97 // RPC Functionality
\r
99 private void addRpcImplementation(QName rpcType,
\r
100 RpcImplementation implementation) {
\r
101 synchronized (rpcImpls) {
\r
102 if (rpcImpls.get(rpcType) != null) {
\r
103 throw new IllegalStateException("Implementation for rpc "
\r
104 + rpcType + " is already registered.");
\r
106 rpcImpls.put(rpcType, implementation);
\r
108 // TODO Add notification for availability of Rpc Implementation
\r
111 private void removeRpcImplementation(QName rpcType,
\r
112 RpcImplementation implToRemove) {
\r
113 synchronized (rpcImpls) {
\r
114 if (implToRemove == rpcImpls.get(rpcType)) {
\r
115 rpcImpls.remove(rpcType);
\r
118 // TODO Add notification for removal of Rpc Implementation
\r
121 private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
\r
122 CompositeNode input) {
\r
123 RpcImplementation impl = rpcImpls.get(rpc);
\r
126 Callable<RpcResult<CompositeNode>> call = callableFor(impl,
\r
128 Future<RpcResult<CompositeNode>> result = executor.submit(call);
\r
135 private void checkPredicates(Provider prov) {
\r
137 throw new IllegalArgumentException("Provider should not be null.");
\r
138 for (ProviderSessionImpl session : providerSessions) {
\r
139 if (prov.equals(session.getProvider()))
\r
140 throw new IllegalStateException("Provider already registered");
\r
145 private void checkPredicates(Consumer cons) {
\r
147 throw new IllegalArgumentException("Consumer should not be null.");
\r
148 for (ConsumerSessionImpl session : sessions) {
\r
149 if (cons.equals(session.getConsumer()))
\r
150 throw new IllegalStateException("Consumer already registered");
\r
154 // Private Factory methods
\r
156 private ConsumerSessionImpl newSessionFor(Consumer cons) {
\r
157 return new ConsumerSessionImpl(cons);
\r
160 private ProviderSessionImpl newSessionFor(Provider provider) {
\r
161 return new ProviderSessionImpl(provider);
\r
164 private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
\r
165 sessions.remove(consumerSessionImpl);
\r
166 providerSessions.remove(consumerSessionImpl);
\r
169 private static Callable<RpcResult<CompositeNode>> callableFor(
\r
170 final RpcImplementation implemenation, final QName rpc,
\r
171 final CompositeNode input) {
\r
173 return new Callable<RpcResult<CompositeNode>>() {
\r
176 public RpcResult<CompositeNode> call() throws Exception {
\r
177 return implemenation.invokeRpc(rpc, input);
\r
182 private class ConsumerSessionImpl implements ConsumerSession {
\r
184 private final Consumer consumer;
\r
186 private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
\r
187 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
\r
188 private boolean closed = false;
\r
190 public Consumer getConsumer() {
\r
194 public ConsumerSessionImpl(Consumer consumer) {
\r
195 this.consumer = consumer;
\r
199 public Future<RpcResult<CompositeNode>> rpc(QName rpc,
\r
200 CompositeNode input) {
\r
201 return BrokerImpl.this.invokeRpc(rpc, input);
\r
205 public <T extends BrokerService> T getService(Class<T> service) {
\r
206 BrokerService potential = instantiatedServices.get(service);
\r
207 if (potential != null) {
\r
208 @SuppressWarnings("unchecked")
\r
209 T ret = (T) potential;
\r
212 T ret = BrokerImpl.this.serviceFor(service, this);
\r
214 instantiatedServices.put(service, ret);
\r
220 public void close() {
\r
221 Collection<BrokerService> toStop = instantiatedServices.values();
\r
222 this.closed = true;
\r
223 for (BrokerService brokerService : toStop) {
\r
224 brokerService.closeSession();
\r
226 BrokerImpl.this.consumerSessionClosed(this);
\r
230 public boolean isClosed() {
\r
236 private class ProviderSessionImpl extends ConsumerSessionImpl implements
\r
239 private Provider provider;
\r
240 private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
\r
242 public ProviderSessionImpl(Provider provider) {
\r
244 this.provider = provider;
\r
248 public void addRpcImplementation(QName rpcType,
\r
249 RpcImplementation implementation)
\r
250 throws IllegalArgumentException {
\r
251 if (rpcType == null) {
\r
252 throw new IllegalArgumentException("rpcType must not be null");
\r
254 if (implementation == null) {
\r
255 throw new IllegalArgumentException(
\r
256 "Implementation must not be null");
\r
258 BrokerImpl.this.addRpcImplementation(rpcType, implementation);
\r
259 sessionRpcImpls.put(rpcType, implementation);
\r
263 public void removeRpcImplementation(QName rpcType,
\r
264 RpcImplementation implToRemove) throws IllegalArgumentException {
\r
265 RpcImplementation localImpl = rpcImpls.get(rpcType);
\r
266 if (localImpl != implToRemove) {
\r
267 throw new IllegalStateException(
\r
268 "Implementation was not registered in this session");
\r
271 BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
\r
272 sessionRpcImpls.remove(rpcType);
\r
275 public Provider getProvider() {
\r
276 return this.provider;
\r