Merge "Enhance debug capabilities"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import org.opendaylight.controller.sal.core.api.Broker;
20 import org.opendaylight.controller.sal.core.api.BrokerService;
21 import org.opendaylight.controller.sal.core.api.Consumer;
22 import org.opendaylight.controller.sal.core.api.Provider;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.osgi.framework.BundleContext;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public class BrokerImpl implements Broker {
33     private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
34
35     // Broker Generic Context
36     private Set<ConsumerSessionImpl> sessions = Collections
37             .synchronizedSet(new HashSet<ConsumerSessionImpl>());
38     private Set<ProviderSessionImpl> providerSessions = Collections
39             .synchronizedSet(new HashSet<ProviderSessionImpl>());
40     private Set<BrokerModule> modules = Collections
41             .synchronizedSet(new HashSet<BrokerModule>());
42     private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
43             .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
44
45     // RPC Context
46     private Map<QName, RpcImplementation> rpcImpls = Collections
47             .synchronizedMap(new HashMap<QName, RpcImplementation>());
48
49     // Implementation specific
50     private ExecutorService executor;
51
52     private BundleContext bundleContext;
53
54     @Override
55     public ConsumerSession registerConsumer(Consumer consumer,BundleContext ctx) {
56         checkPredicates(consumer);
57         log.info("Registering consumer " + consumer);
58         ConsumerSessionImpl session = newSessionFor(consumer,ctx);
59         consumer.onSessionInitiated(session);
60         sessions.add(session);
61         return session;
62     }
63
64     @Override
65     public ProviderSession registerProvider(Provider provider,BundleContext ctx) {
66         checkPredicates(provider);
67
68         ProviderSessionImpl session = newSessionFor(provider,ctx);
69         provider.onSessionInitiated(session);
70         providerSessions.add(session);
71         return session;
72     }
73
74     public void addModule(BrokerModule module) {
75         log.info("Registering broker module " + module);
76         if (modules.contains(module)) {
77             log.error("Module already registered");
78             throw new IllegalArgumentException("Module already exists.");
79         }
80     
81         Set<Class<? extends BrokerService>> provServices = module
82                 .getProvidedServices();
83         for (Class<? extends BrokerService> serviceType : provServices) {
84             log.info("  Registering session service implementation: "
85                     + serviceType.getCanonicalName());
86             serviceProviders.put(serviceType, module);
87         }
88     }
89
90     public <T extends BrokerService> T serviceFor(Class<T> service,
91             ConsumerSessionImpl session) {
92         BrokerModule prov = serviceProviders.get(service);
93         if (prov == null) {
94             log.warn("Service " + service.toString() + " is not supported");
95             return null;
96         }
97         return prov.getServiceForSession(service, session);
98     }
99
100     // RPC Functionality
101     
102     private void addRpcImplementation(QName rpcType,
103             RpcImplementation implementation) {
104         synchronized (rpcImpls) {
105             if (rpcImpls.get(rpcType) != null) {
106                 throw new IllegalStateException("Implementation for rpc "
107                         + rpcType + " is already registered.");
108             }
109             rpcImpls.put(rpcType, implementation);
110         }
111         // TODO Add notification for availability of Rpc Implementation
112     }
113
114     private void removeRpcImplementation(QName rpcType,
115             RpcImplementation implToRemove) {
116         synchronized (rpcImpls) {
117             if (implToRemove == rpcImpls.get(rpcType)) {
118                 rpcImpls.remove(rpcType);
119             }
120         }
121         // TODO Add notification for removal of Rpc Implementation
122     }
123
124     private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
125             CompositeNode input) {
126         RpcImplementation impl = rpcImpls.get(rpc);
127         // if()
128
129         Callable<RpcResult<CompositeNode>> call = callableFor(impl,
130                 rpc, input);
131         Future<RpcResult<CompositeNode>> result = executor.submit(call);
132
133         return result;
134     }
135     
136     // Validation
137
138     private void checkPredicates(Provider prov) {
139         if (prov == null)
140             throw new IllegalArgumentException("Provider should not be null.");
141         for (ProviderSessionImpl session : providerSessions) {
142             if (prov.equals(session.getProvider()))
143                 throw new IllegalStateException("Provider already registered");
144         }
145
146     }
147
148     private void checkPredicates(Consumer cons) {
149         if (cons == null)
150             throw new IllegalArgumentException("Consumer should not be null.");
151         for (ConsumerSessionImpl session : sessions) {
152             if (cons.equals(session.getConsumer()))
153                 throw new IllegalStateException("Consumer already registered");
154         }
155     }
156
157     // Private Factory methods
158     
159     private ConsumerSessionImpl newSessionFor(Consumer provider, BundleContext ctx) {
160         return new ConsumerSessionImpl(provider,ctx);
161     }
162
163     private ProviderSessionImpl newSessionFor(Provider provider, BundleContext ctx) {
164         return new ProviderSessionImpl(provider,ctx);
165     }
166
167     private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
168         sessions.remove(consumerSessionImpl);
169         providerSessions.remove(consumerSessionImpl);
170     }
171
172     private static Callable<RpcResult<CompositeNode>> callableFor(
173             final RpcImplementation implemenation, final QName rpc,
174             final CompositeNode input) {
175
176         return new Callable<RpcResult<CompositeNode>>() {
177
178             @Override
179             public RpcResult<CompositeNode> call() throws Exception {
180                 return implemenation.invokeRpc(rpc, input);
181             }
182         };
183     }
184     
185     private class ConsumerSessionImpl implements ConsumerSession {
186
187         private final Consumer consumer;
188
189         private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
190                 .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
191         private boolean closed = false;
192
193         private BundleContext context;
194
195         public Consumer getConsumer() {
196             return consumer;
197         }
198
199         public ConsumerSessionImpl(Consumer consumer, BundleContext ctx) {
200             this.consumer = consumer;
201             this.context = ctx;
202         }
203
204         @Override
205         public Future<RpcResult<CompositeNode>> rpc(QName rpc,
206                 CompositeNode input) {
207             return BrokerImpl.this.invokeRpc(rpc, input);
208         }
209
210         @Override
211         public <T extends BrokerService> T getService(Class<T> service) {
212             BrokerService potential = instantiatedServices.get(service);
213             if (potential != null) {
214                 @SuppressWarnings("unchecked")
215                 T ret = (T) potential;
216                 return ret;
217             }
218             T ret = BrokerImpl.this.serviceFor(service, this);
219             if (ret != null) {
220                 instantiatedServices.put(service, ret);
221             }
222             return ret;
223         }
224
225         @Override
226         public void close() {
227             Collection<BrokerService> toStop = instantiatedServices.values();
228             this.closed = true;
229             for (BrokerService brokerService : toStop) {
230                 brokerService.closeSession();
231             }
232             BrokerImpl.this.consumerSessionClosed(this);
233         }
234
235         @Override
236         public boolean isClosed() {
237             return closed;
238         }
239
240     }
241
242     private class ProviderSessionImpl extends ConsumerSessionImpl implements
243             ProviderSession {
244
245         private Provider provider;
246         private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
247
248         public ProviderSessionImpl(Provider provider, BundleContext ctx) {
249             super(null,ctx);
250             this.provider = provider;
251         }
252
253         @Override
254         public void addRpcImplementation(QName rpcType,
255                 RpcImplementation implementation)
256                 throws IllegalArgumentException {
257             if (rpcType == null) {
258                 throw new IllegalArgumentException("rpcType must not be null");
259             }
260             if (implementation == null) {
261                 throw new IllegalArgumentException(
262                         "Implementation must not be null");
263             }
264             BrokerImpl.this.addRpcImplementation(rpcType, implementation);
265             sessionRpcImpls.put(rpcType, implementation);
266         }
267
268         @Override
269         public void removeRpcImplementation(QName rpcType,
270                 RpcImplementation implToRemove) throws IllegalArgumentException {
271             RpcImplementation localImpl = rpcImpls.get(rpcType);
272             if (localImpl != implToRemove) {
273                 throw new IllegalStateException(
274                         "Implementation was not registered in this session");
275             }
276
277             BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
278             sessionRpcImpls.remove(rpcType);
279         }
280
281         public Provider getProvider() {
282             return this.provider;
283         }
284
285     }
286
287     public void setBundleContext(BundleContext context) {
288         this.bundleContext = context;
289     }
290 }