Merge "Devices dashlet show port total numbers, modal shows collapsible list Replaced...
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-broker-impl / src / main / java / org / opendaylight / controller / sal / core / impl / BrokerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.core.impl;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import org.opendaylight.controller.sal.core.api.Broker;
20 import org.opendaylight.controller.sal.core.api.BrokerService;
21 import org.opendaylight.controller.sal.core.api.Consumer;
22 import org.opendaylight.controller.sal.core.api.Provider;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 public class BrokerImpl implements Broker {
32     private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
33
34     // Broker Generic Context
35     private Set<ConsumerSessionImpl> sessions = Collections
36             .synchronizedSet(new HashSet<ConsumerSessionImpl>());
37     private Set<ProviderSessionImpl> providerSessions = Collections
38             .synchronizedSet(new HashSet<ProviderSessionImpl>());
39     private Set<BrokerModule> modules = Collections
40             .synchronizedSet(new HashSet<BrokerModule>());
41     private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
42             .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
43
44     // RPC Context
45     private Map<QName, RpcImplementation> rpcImpls = Collections
46             .synchronizedMap(new HashMap<QName, RpcImplementation>());
47
48     // Implementation specific
49     private ExecutorService executor;
50
51     @Override
52     public ConsumerSession registerConsumer(Consumer consumer) {
53         checkPredicates(consumer);
54         log.info("Registering consumer " + consumer);
55         ConsumerSessionImpl session = newSessionFor(consumer);
56         consumer.onSessionInitiated(session);
57         sessions.add(session);
58         return session;
59     }
60
61     @Override
62     public ProviderSession registerProvider(Provider provider) {
63         checkPredicates(provider);
64
65         ProviderSessionImpl session = newSessionFor(provider);
66         provider.onSessionInitiated(session);
67         providerSessions.add(session);
68         return session;
69     }
70
71     public void addModule(BrokerModule module) {
72         log.info("Registering broker module " + module);
73         if (modules.contains(module)) {
74             log.error("Module already registered");
75             throw new IllegalArgumentException("Module already exists.");
76         }
77     
78         Set<Class<? extends BrokerService>> provServices = module
79                 .getProvidedServices();
80         for (Class<? extends BrokerService> serviceType : provServices) {
81             log.info("  Registering session service implementation: "
82                     + serviceType.getCanonicalName());
83             serviceProviders.put(serviceType, module);
84         }
85     }
86
87     public <T extends BrokerService> T serviceFor(Class<T> service,
88             ConsumerSessionImpl session) {
89         BrokerModule prov = serviceProviders.get(service);
90         if (prov == null) {
91             log.warn("Service " + service.toString() + " is not supported");
92             return null;
93         }
94         return prov.getServiceForSession(service, session);
95     }
96
97     // RPC Functionality
98     
99     private void addRpcImplementation(QName rpcType,
100             RpcImplementation implementation) {
101         synchronized (rpcImpls) {
102             if (rpcImpls.get(rpcType) != null) {
103                 throw new IllegalStateException("Implementation for rpc "
104                         + rpcType + " is already registered.");
105             }
106             rpcImpls.put(rpcType, implementation);
107         }
108         // TODO Add notification for availability of Rpc Implementation
109     }
110
111     private void removeRpcImplementation(QName rpcType,
112             RpcImplementation implToRemove) {
113         synchronized (rpcImpls) {
114             if (implToRemove == rpcImpls.get(rpcType)) {
115                 rpcImpls.remove(rpcType);
116             }
117         }
118         // TODO Add notification for removal of Rpc Implementation
119     }
120
121     private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
122             CompositeNode input) {
123         RpcImplementation impl = rpcImpls.get(rpc);
124         // if()
125
126         Callable<RpcResult<CompositeNode>> call = callableFor(impl,
127                 rpc, input);
128         Future<RpcResult<CompositeNode>> result = executor.submit(call);
129
130         return result;
131     }
132     
133     // Validation
134
135     private void checkPredicates(Provider prov) {
136         if (prov == null)
137             throw new IllegalArgumentException("Provider should not be null.");
138         for (ProviderSessionImpl session : providerSessions) {
139             if (prov.equals(session.getProvider()))
140                 throw new IllegalStateException("Provider already registered");
141         }
142
143     }
144
145     private void checkPredicates(Consumer cons) {
146         if (cons == null)
147             throw new IllegalArgumentException("Consumer should not be null.");
148         for (ConsumerSessionImpl session : sessions) {
149             if (cons.equals(session.getConsumer()))
150                 throw new IllegalStateException("Consumer already registered");
151         }
152     }
153
154     // Private Factory methods
155     
156     private ConsumerSessionImpl newSessionFor(Consumer cons) {
157         return new ConsumerSessionImpl(cons);
158     }
159
160     private ProviderSessionImpl newSessionFor(Provider provider) {
161         return new ProviderSessionImpl(provider);
162     }
163
164     private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
165         sessions.remove(consumerSessionImpl);
166         providerSessions.remove(consumerSessionImpl);
167     }
168
169     private static Callable<RpcResult<CompositeNode>> callableFor(
170             final RpcImplementation implemenation, final QName rpc,
171             final CompositeNode input) {
172
173         return new Callable<RpcResult<CompositeNode>>() {
174
175             @Override
176             public RpcResult<CompositeNode> call() throws Exception {
177                 return implemenation.invokeRpc(rpc, input);
178             }
179         };
180     }
181     
182     private class ConsumerSessionImpl implements ConsumerSession {
183
184         private final Consumer consumer;
185
186         private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
187                 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
188         private boolean closed = false;
189
190         public Consumer getConsumer() {
191             return consumer;
192         }
193
194         public ConsumerSessionImpl(Consumer consumer) {
195             this.consumer = consumer;
196         }
197
198         @Override
199         public Future<RpcResult<CompositeNode>> rpc(QName rpc,
200                 CompositeNode input) {
201             return BrokerImpl.this.invokeRpc(rpc, input);
202         }
203
204         @Override
205         public <T extends BrokerService> T getService(Class<T> service) {
206             BrokerService potential = instantiatedServices.get(service);
207             if (potential != null) {
208                 @SuppressWarnings("unchecked")
209                 T ret = (T) potential;
210                 return ret;
211             }
212             T ret = BrokerImpl.this.serviceFor(service, this);
213             if (ret != null) {
214                 instantiatedServices.put(service, ret);
215             }
216             return ret;
217         }
218
219         @Override
220         public void close() {
221             Collection<BrokerService> toStop = instantiatedServices.values();
222             this.closed = true;
223             for (BrokerService brokerService : toStop) {
224                 brokerService.closeSession();
225             }
226             BrokerImpl.this.consumerSessionClosed(this);
227         }
228
229         @Override
230         public boolean isClosed() {
231             return closed;
232         }
233
234     }
235
236     private class ProviderSessionImpl extends ConsumerSessionImpl implements
237             ProviderSession {
238
239         private Provider provider;
240         private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
241
242         public ProviderSessionImpl(Provider provider) {
243             super(null);
244             this.provider = provider;
245         }
246
247         @Override
248         public void addRpcImplementation(QName rpcType,
249                 RpcImplementation implementation)
250                 throws IllegalArgumentException {
251             if (rpcType == null) {
252                 throw new IllegalArgumentException("rpcType must not be null");
253             }
254             if (implementation == null) {
255                 throw new IllegalArgumentException(
256                         "Implementation must not be null");
257             }
258             BrokerImpl.this.addRpcImplementation(rpcType, implementation);
259             sessionRpcImpls.put(rpcType, implementation);
260         }
261
262         @Override
263         public void removeRpcImplementation(QName rpcType,
264                 RpcImplementation implToRemove) throws IllegalArgumentException {
265             RpcImplementation localImpl = rpcImpls.get(rpcType);
266             if (localImpl != implToRemove) {
267                 throw new IllegalStateException(
268                         "Implementation was not registered in this session");
269             }
270
271             BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
272             sessionRpcImpls.remove(rpcType);
273         }
274
275         public Provider getProvider() {
276             return this.provider;
277         }
278
279     }
280 }