Merge "Add container context debug osgi cli command to Switch Manager"
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-broker-impl / src / main / java / org / opendaylight / controller / sal / core / impl / BrokerImpl.java
1 /*\r
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 package org.opendaylight.controller.sal.core.impl;\r
9 \r
10 import java.util.Collection;\r
11 import java.util.Collections;\r
12 import java.util.HashMap;\r
13 import java.util.HashSet;\r
14 import java.util.Map;\r
15 import java.util.Set;\r
16 import java.util.concurrent.Callable;\r
17 import java.util.concurrent.ExecutorService;\r
18 import java.util.concurrent.Future;\r
19 import org.opendaylight.controller.sal.core.api.Broker;\r
20 import org.opendaylight.controller.sal.core.api.BrokerService;\r
21 import org.opendaylight.controller.sal.core.api.Consumer;\r
22 import org.opendaylight.controller.sal.core.api.Provider;\r
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;\r
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
25 import org.opendaylight.controller.yang.common.QName;\r
26 import org.opendaylight.controller.yang.common.RpcResult;\r
27 import org.opendaylight.controller.yang.data.api.CompositeNode;\r
28 import org.slf4j.Logger;\r
29 import org.slf4j.LoggerFactory;\r
30 \r
31 public class BrokerImpl implements Broker {\r
32     private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);\r
33 \r
34     // Broker Generic Context\r
35     private Set<ConsumerSessionImpl> sessions = Collections\r
36             .synchronizedSet(new HashSet<ConsumerSessionImpl>());\r
37     private Set<ProviderSessionImpl> providerSessions = Collections\r
38             .synchronizedSet(new HashSet<ProviderSessionImpl>());\r
39     private Set<BrokerModule> modules = Collections\r
40             .synchronizedSet(new HashSet<BrokerModule>());\r
41     private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections\r
42             .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());\r
43 \r
44     // RPC Context\r
45     private Map<QName, RpcImplementation> rpcImpls = Collections\r
46             .synchronizedMap(new HashMap<QName, RpcImplementation>());\r
47 \r
48     // Implementation specific\r
49     private ExecutorService executor;\r
50 \r
51     @Override\r
52     public ConsumerSession registerConsumer(Consumer consumer) {\r
53         checkPredicates(consumer);\r
54         log.info("Registering consumer " + consumer);\r
55         ConsumerSessionImpl session = newSessionFor(consumer);\r
56         consumer.onSessionInitiated(session);\r
57         sessions.add(session);\r
58         return session;\r
59     }\r
60 \r
61     @Override\r
62     public ProviderSession registerProvider(Provider provider) {\r
63         checkPredicates(provider);\r
64 \r
65         ProviderSessionImpl session = newSessionFor(provider);\r
66         provider.onSessionInitiated(session);\r
67         providerSessions.add(session);\r
68         return session;\r
69     }\r
70 \r
71     public void addModule(BrokerModule module) {\r
72         log.info("Registering broker module " + module);\r
73         if (modules.contains(module)) {\r
74             log.error("Module already registered");\r
75             throw new IllegalArgumentException("Module already exists.");\r
76         }\r
77     \r
78         Set<Class<? extends BrokerService>> provServices = module\r
79                 .getProvidedServices();\r
80         for (Class<? extends BrokerService> serviceType : provServices) {\r
81             log.info("  Registering session service implementation: "\r
82                     + serviceType.getCanonicalName());\r
83             serviceProviders.put(serviceType, module);\r
84         }\r
85     }\r
86 \r
87     public <T extends BrokerService> T serviceFor(Class<T> service,\r
88             ConsumerSessionImpl session) {\r
89         BrokerModule prov = serviceProviders.get(service);\r
90         if (prov == null) {\r
91             log.warn("Service " + service.toString() + " is not supported");\r
92             return null;\r
93         }\r
94         return prov.getServiceForSession(service, session);\r
95     }\r
96 \r
97     // RPC Functionality\r
98     \r
99     private void addRpcImplementation(QName rpcType,\r
100             RpcImplementation implementation) {\r
101         synchronized (rpcImpls) {\r
102             if (rpcImpls.get(rpcType) != null) {\r
103                 throw new IllegalStateException("Implementation for rpc "\r
104                         + rpcType + " is already registered.");\r
105             }\r
106             rpcImpls.put(rpcType, implementation);\r
107         }\r
108         // TODO Add notification for availability of Rpc Implementation\r
109     }\r
110 \r
111     private void removeRpcImplementation(QName rpcType,\r
112             RpcImplementation implToRemove) {\r
113         synchronized (rpcImpls) {\r
114             if (implToRemove == rpcImpls.get(rpcType)) {\r
115                 rpcImpls.remove(rpcType);\r
116             }\r
117         }\r
118         // TODO Add notification for removal of Rpc Implementation\r
119     }\r
120 \r
121     private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,\r
122             CompositeNode input) {\r
123         RpcImplementation impl = rpcImpls.get(rpc);\r
124         // if()\r
125 \r
126         Callable<RpcResult<CompositeNode>> call = callableFor(impl,\r
127                 rpc, input);\r
128         Future<RpcResult<CompositeNode>> result = executor.submit(call);\r
129 \r
130         return result;\r
131     }\r
132     \r
133     // Validation\r
134 \r
135     private void checkPredicates(Provider prov) {\r
136         if (prov == null)\r
137             throw new IllegalArgumentException("Provider should not be null.");\r
138         for (ProviderSessionImpl session : providerSessions) {\r
139             if (prov.equals(session.getProvider()))\r
140                 throw new IllegalStateException("Provider already registered");\r
141         }\r
142 \r
143     }\r
144 \r
145     private void checkPredicates(Consumer cons) {\r
146         if (cons == null)\r
147             throw new IllegalArgumentException("Consumer should not be null.");\r
148         for (ConsumerSessionImpl session : sessions) {\r
149             if (cons.equals(session.getConsumer()))\r
150                 throw new IllegalStateException("Consumer already registered");\r
151         }\r
152     }\r
153 \r
154     // Private Factory methods\r
155     \r
156     private ConsumerSessionImpl newSessionFor(Consumer cons) {\r
157         return new ConsumerSessionImpl(cons);\r
158     }\r
159 \r
160     private ProviderSessionImpl newSessionFor(Provider provider) {\r
161         return new ProviderSessionImpl(provider);\r
162     }\r
163 \r
164     private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {\r
165         sessions.remove(consumerSessionImpl);\r
166         providerSessions.remove(consumerSessionImpl);\r
167     }\r
168 \r
169     private static Callable<RpcResult<CompositeNode>> callableFor(\r
170             final RpcImplementation implemenation, final QName rpc,\r
171             final CompositeNode input) {\r
172 \r
173         return new Callable<RpcResult<CompositeNode>>() {\r
174 \r
175             @Override\r
176             public RpcResult<CompositeNode> call() throws Exception {\r
177                 return implemenation.invokeRpc(rpc, input);\r
178             }\r
179         };\r
180     }\r
181     \r
182     private class ConsumerSessionImpl implements ConsumerSession {\r
183 \r
184         private final Consumer consumer;\r
185 \r
186         private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections\r
187                 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());\r
188         private boolean closed = false;\r
189 \r
190         public Consumer getConsumer() {\r
191             return consumer;\r
192         }\r
193 \r
194         public ConsumerSessionImpl(Consumer consumer) {\r
195             this.consumer = consumer;\r
196         }\r
197 \r
198         @Override\r
199         public Future<RpcResult<CompositeNode>> rpc(QName rpc,\r
200                 CompositeNode input) {\r
201             return BrokerImpl.this.invokeRpc(rpc, input);\r
202         }\r
203 \r
204         @Override\r
205         public <T extends BrokerService> T getService(Class<T> service) {\r
206             BrokerService potential = instantiatedServices.get(service);\r
207             if (potential != null) {\r
208                 @SuppressWarnings("unchecked")\r
209                 T ret = (T) potential;\r
210                 return ret;\r
211             }\r
212             T ret = BrokerImpl.this.serviceFor(service, this);\r
213             if (ret != null) {\r
214                 instantiatedServices.put(service, ret);\r
215             }\r
216             return ret;\r
217         }\r
218 \r
219         @Override\r
220         public void close() {\r
221             Collection<BrokerService> toStop = instantiatedServices.values();\r
222             this.closed = true;\r
223             for (BrokerService brokerService : toStop) {\r
224                 brokerService.closeSession();\r
225             }\r
226             BrokerImpl.this.consumerSessionClosed(this);\r
227         }\r
228 \r
229         @Override\r
230         public boolean isClosed() {\r
231             return closed;\r
232         }\r
233 \r
234     }\r
235 \r
236     private class ProviderSessionImpl extends ConsumerSessionImpl implements\r
237             ProviderSession {\r
238 \r
239         private Provider provider;\r
240         private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());\r
241 \r
242         public ProviderSessionImpl(Provider provider) {\r
243             super(null);\r
244             this.provider = provider;\r
245         }\r
246 \r
247         @Override\r
248         public void addRpcImplementation(QName rpcType,\r
249                 RpcImplementation implementation)\r
250                 throws IllegalArgumentException {\r
251             if (rpcType == null) {\r
252                 throw new IllegalArgumentException("rpcType must not be null");\r
253             }\r
254             if (implementation == null) {\r
255                 throw new IllegalArgumentException(\r
256                         "Implementation must not be null");\r
257             }\r
258             BrokerImpl.this.addRpcImplementation(rpcType, implementation);\r
259             sessionRpcImpls.put(rpcType, implementation);\r
260         }\r
261 \r
262         @Override\r
263         public void removeRpcImplementation(QName rpcType,\r
264                 RpcImplementation implToRemove) throws IllegalArgumentException {\r
265             RpcImplementation localImpl = rpcImpls.get(rpcType);\r
266             if (localImpl != implToRemove) {\r
267                 throw new IllegalStateException(\r
268                         "Implementation was not registered in this session");\r
269             }\r
270 \r
271             BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);\r
272             sessionRpcImpls.remove(rpcType);\r
273         }\r
274 \r
275         public Provider getProvider() {\r
276             return this.provider;\r
277         }\r
278 \r
279     }\r
280 }\r