Added support for binding-independent RPCs
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-broker-impl / src / main / java / org / opendaylight / controller / sal / core / impl / DataBrokerModule.java
1 /*\r
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 package org.opendaylight.controller.sal.core.impl;\r
9 \r
10 import java.util.ArrayList;\r
11 import java.util.Collections;\r
12 import java.util.HashSet;\r
13 import java.util.List;\r
14 import java.util.Map;\r
15 import java.util.Set;\r
16 import java.util.concurrent.ExecutorService;\r
17 import java.util.concurrent.Future;\r
18 \r
19 import org.opendaylight.controller.sal.common.DataStoreIdentifier;\r
20 import org.opendaylight.controller.sal.common.util.Rpcs;\r
21 import org.opendaylight.controller.sal.core.api.BrokerService;\r
22 import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;\r
23 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
24 import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;\r
25 import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;\r
26 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;\r
27 import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;\r
28 import org.opendaylight.controller.sal.core.api.data.DataProviderService;\r
29 import org.opendaylight.controller.sal.core.api.data.DataValidator;\r
30 import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;\r
31 import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;\r
32 import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
33 import org.opendaylight.controller.yang.common.RpcError;\r
34 import org.opendaylight.controller.yang.common.RpcResult;\r
35 import org.opendaylight.controller.yang.data.api.CompositeNode;\r
36 import org.opendaylight.controller.yang.data.api.CompositeNodeModification;\r
37 import org.slf4j.Logger;\r
38 import org.slf4j.LoggerFactory;\r
39 \r
40 import com.google.common.collect.ImmutableSet;\r
41 \r
42 public class DataBrokerModule implements BrokerModule {\r
43 \r
44     private static final Logger log = LoggerFactory\r
45             .getLogger(DataBrokerModule.class);\r
46 \r
47     private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet\r
48             .of((Class<? extends ProviderFunctionality>) DataValidator.class,\r
49                     DataRefresher.class, DataCommitHandler.class);\r
50 \r
51     private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet\r
52             .of((Class<? extends BrokerService>) DataBrokerService.class,\r
53                     DataProviderService.class);\r
54 \r
55     private Map<DataStoreIdentifier, StoreContext> storeContext;\r
56 \r
57     private ExecutorService executor;\r
58     \r
59     private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();\r
60 \r
61     @Override\r
62     public Set<Class<? extends BrokerService>> getProvidedServices() {\r
63         return PROVIDED_SESSION_SERVICES;\r
64     }\r
65 \r
66     @Override\r
67     public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {\r
68         return SUPPORTED_PROVIDER_FUNCTIONALITY;\r
69     }\r
70 \r
71     @Override\r
72     public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {\r
73         return Collections.emptySet();\r
74     }\r
75 \r
76     @Override\r
77     public <T extends BrokerService> T getServiceForSession(Class<T> service,\r
78             ConsumerSession session) {\r
79         if (DataProviderService.class.equals(service)\r
80                 && session instanceof ProviderSession) {\r
81             @SuppressWarnings("unchecked")\r
82             T ret = (T) newDataProviderService(session);\r
83             return ret;\r
84         } else if (DataBrokerService.class.equals(service)) {\r
85 \r
86             @SuppressWarnings("unchecked")\r
87             T ret = (T) newDataConsumerService(session);\r
88             return ret;\r
89         }\r
90 \r
91         throw new IllegalArgumentException(\r
92                 "The requested session-specific service is not provided by this module.");\r
93     }\r
94 \r
95     private DataProviderService newDataProviderService(ConsumerSession session) {\r
96         return new DataProviderSession();\r
97     }\r
98 \r
99     private DataBrokerService newDataConsumerService(ConsumerSession session) {\r
100         return new DataConsumerSession();\r
101     }\r
102 \r
103     private StoreContext context(DataStoreIdentifier store) {\r
104         return storeContext.get(store);\r
105     }\r
106 \r
107     private static class StoreContext {\r
108         private Set<DataCommitHandler> commitHandlers = Collections\r
109                 .synchronizedSet(new HashSet<DataCommitHandler>());\r
110         private Set<DataValidator> validators = Collections\r
111                 .synchronizedSet(new HashSet<DataValidator>());\r
112         private Set<DataRefresher> refreshers = Collections\r
113                 .synchronizedSet(new HashSet<DataRefresher>());\r
114     }\r
115 \r
116     private class DataConsumerSession implements DataBrokerService {\r
117 \r
118         @Override\r
119         public CompositeNode getData(DataStoreIdentifier store) {\r
120             // TODO Implement this method\r
121             throw new UnsupportedOperationException("Not implemented");\r
122         }\r
123 \r
124         @Override\r
125         public CompositeNode getData(DataStoreIdentifier store,\r
126                 CompositeNode filter) {\r
127             // TODO Implement this method\r
128             throw new UnsupportedOperationException("Not implemented");\r
129         }\r
130 \r
131         @Override\r
132         public CompositeNode getCandidateData(DataStoreIdentifier store) {\r
133             // TODO Implement this method\r
134             throw new UnsupportedOperationException("Not implemented");\r
135         }\r
136 \r
137         @Override\r
138         public CompositeNode getCandidateData(DataStoreIdentifier store,\r
139                 CompositeNode filter) {\r
140             // TODO Implement this method\r
141             throw new UnsupportedOperationException("Not implemented");\r
142         }\r
143 \r
144         @Override\r
145         public RpcResult<CompositeNode> editCandidateData(\r
146                 DataStoreIdentifier store, CompositeNodeModification changeSet) {\r
147             // TODO Implement this method\r
148             throw new UnsupportedOperationException("Not implemented");\r
149         }\r
150 \r
151         @Override\r
152         public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {\r
153             // TODO Implement this method\r
154             throw new UnsupportedOperationException("Not implemented");\r
155         }\r
156 \r
157         @Override\r
158         public void closeSession() {\r
159             // TODO Implement this method\r
160             throw new UnsupportedOperationException("Not implemented");\r
161         }\r
162 \r
163         @Override\r
164         public Set<DataStoreIdentifier> getDataStores() {\r
165             // TODO Auto-generated method stub\r
166             return null;\r
167         }\r
168 \r
169     }\r
170 \r
171     private class DataProviderSession extends DataConsumerSession implements\r
172             DataProviderService {\r
173 \r
174         private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();\r
175         private Set<DataValidator> providerValidators = new HashSet<DataValidator>();\r
176         private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();\r
177 \r
178         @Override\r
179         public void addValidator(DataStoreIdentifier store,\r
180                 DataValidator validator) {\r
181             if (validator == null)\r
182                 throw new IllegalArgumentException(\r
183                         "Validator should not be null");\r
184 \r
185             providerValidators.add(validator);\r
186             context(store).validators.add(validator);\r
187         }\r
188 \r
189         @Override\r
190         public void removeValidator(DataStoreIdentifier store,\r
191                 DataValidator validator) {\r
192             if (validator == null)\r
193                 throw new IllegalArgumentException(\r
194                         "Validator should not be null");\r
195 \r
196             providerValidators.remove(validator);\r
197             context(store).validators.remove(validator);\r
198         }\r
199 \r
200         @Override\r
201         public void addCommitHandler(DataStoreIdentifier store,\r
202                 DataCommitHandler provider) {\r
203             if (provider == null)\r
204                 throw new IllegalArgumentException(\r
205                         "CommitHandler should not be null");\r
206 \r
207             providerCommitHandlers.add(provider);\r
208             context(store).commitHandlers.add(provider);\r
209         }\r
210 \r
211         @Override\r
212         public void removeCommitHandler(DataStoreIdentifier store,\r
213                 DataCommitHandler provider) {\r
214             if (provider == null)\r
215                 throw new IllegalArgumentException(\r
216                         "CommitHandler should not be null");\r
217 \r
218             providerCommitHandlers.remove(provider);\r
219             context(store).commitHandlers.remove(provider);\r
220         }\r
221 \r
222         @Override\r
223         public void addRefresher(DataStoreIdentifier store,\r
224                 DataRefresher provider) {\r
225             if (provider == null)\r
226                 throw new IllegalArgumentException(\r
227                         "Refresher should not be null");\r
228 \r
229             providerRefreshers.add(provider);\r
230             context(store).refreshers.add(provider);\r
231         }\r
232 \r
233         @Override\r
234         public void removeRefresher(DataStoreIdentifier store,\r
235                 DataRefresher provider) {\r
236             if (provider == null)\r
237                 throw new IllegalArgumentException(\r
238                         "Refresher should not be null");\r
239 \r
240             providerRefreshers.remove(provider);\r
241             context(store).refreshers.remove(provider);\r
242         }\r
243 \r
244     }\r
245 \r
246     private class SequentialCommitHandlerCoordinator implements\r
247             DataCommitHandler {\r
248 \r
249         @Override\r
250         public RpcResult<CommitTransaction> requestCommit(\r
251                 DataStoreIdentifier store) {\r
252             List<RpcError> errors = new ArrayList<RpcError>();\r
253             Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();\r
254             boolean successful = true;\r
255 \r
256             for (DataCommitHandler commitHandler : context(store).commitHandlers) {\r
257                 try {\r
258                     RpcResult<CommitTransaction> partialResult = commitHandler\r
259                             .requestCommit(store);\r
260                     successful = partialResult.isSuccessful() & successful;\r
261                     if (partialResult.isSuccessful()) {\r
262                         transactions.add(partialResult.getResult());\r
263                     }\r
264 \r
265                     errors.addAll(partialResult.getErrors());\r
266                 } catch (Exception e) {\r
267                     log.error("Uncaught exception prevented commit request."\r
268                             + e.getMessage(), e);\r
269                     successful = false;\r
270                     // FIXME: Add RPC Error with exception.\r
271                 }\r
272                 if (successful == false)\r
273                     break;\r
274             }\r
275             CommitTransaction transaction = new SequentialCommitTransaction(\r
276                     store, transactions);\r
277             return Rpcs.getRpcResult(successful, transaction, errors);\r
278         }\r
279 \r
280         @Override\r
281         public Set<DataStoreIdentifier> getSupportedDataStores() {\r
282             return Collections.emptySet();\r
283         }\r
284     }\r
285 \r
286     private class SequentialCommitTransaction implements CommitTransaction {\r
287 \r
288         final Set<CommitTransaction> transactions;\r
289         final DataStoreIdentifier store;\r
290 \r
291         public SequentialCommitTransaction(DataStoreIdentifier s,\r
292                 Set<CommitTransaction> t) {\r
293             transactions = t;\r
294             store = s;\r
295         }\r
296 \r
297         @Override\r
298         public RpcResult<Void> finish() {\r
299             List<RpcError> errors = new ArrayList<RpcError>();\r
300             boolean successful = true;\r
301 \r
302             for (CommitTransaction commitHandler : transactions) {\r
303                 try {\r
304                     RpcResult<Void> partialResult = commitHandler.finish();\r
305                     successful = partialResult.isSuccessful() & successful;\r
306                     errors.addAll(partialResult.getErrors());\r
307                 } catch (Exception e) {\r
308                     log.error(\r
309                             "Uncaught exception prevented finishing of commit."\r
310                                     + e.getMessage(), e);\r
311                     successful = false;\r
312                     // FIXME: Add RPC Error with exception.\r
313                 }\r
314                 if (successful == false)\r
315                     break;\r
316             }\r
317 \r
318             return Rpcs.getRpcResult(successful, null, errors);\r
319         }\r
320 \r
321         @Override\r
322         public RpcResult<Void> rollback() {\r
323             List<RpcError> errors = new ArrayList<RpcError>();\r
324             boolean successful = true;\r
325 \r
326             for (CommitTransaction commitHandler : transactions) {\r
327                 try {\r
328                     RpcResult<Void> partialResult = commitHandler.rollback();\r
329                     successful = partialResult.isSuccessful() & successful;\r
330                     errors.addAll(partialResult.getErrors());\r
331                 } catch (Exception e) {\r
332                     log.error(\r
333                             "Uncaught exception prevented rollback of commit."\r
334                                     + e.getMessage(), e);\r
335                     successful = false;\r
336                     // FIXME: Add RPC Error with exception.\r
337                 }\r
338                 if (successful == false)\r
339                     break;\r
340             }\r
341 \r
342             return Rpcs.getRpcResult(successful, null, errors);\r
343         }\r
344 \r
345         @Override\r
346         public DataStoreIdentifier getDataStore() {\r
347             return this.store;\r
348         }\r
349 \r
350         @Override\r
351         public DataCommitHandler getHandler() {\r
352             return coordinator;\r
353         }\r
354     }\r
355 \r
356     private class ValidationCoordinator implements DataValidator {\r
357 \r
358         private final DataStoreIdentifier store;\r
359 \r
360         ValidationCoordinator(DataStoreIdentifier store) {\r
361             this.store = store;\r
362         }\r
363 \r
364         @Override\r
365         public RpcResult<Void> validate(CompositeNode toValidate) {\r
366             List<RpcError> errors = new ArrayList<RpcError>();\r
367             boolean successful = true;\r
368 \r
369             for (DataValidator validator : context(store).validators) {\r
370                 try {\r
371                     RpcResult<Void> partialResult = validator\r
372                             .validate(toValidate);\r
373                     successful = partialResult.isSuccessful() & successful;\r
374                     errors.addAll(partialResult.getErrors());\r
375                 } catch (Exception e) {\r
376                     log.error(\r
377                             "Uncaught exception prevented validation."\r
378                                     + e.getMessage(), e);\r
379                     successful = false;\r
380                     // FIXME: Add RPC Error with exception.\r
381                 }\r
382                 if (successful == false)\r
383                     break;\r
384             }\r
385 \r
386             return Rpcs.getRpcResult(successful, null, errors);\r
387         }\r
388 \r
389         @Override\r
390         public Set<DataStoreIdentifier> getSupportedDataStores() {\r
391             return Collections.emptySet();\r
392         }\r
393 \r
394     }\r
395 \r
396     private class DataRefreshCoordinator implements DataRefresher {\r
397 \r
398         private final DataStoreIdentifier store;\r
399 \r
400         DataRefreshCoordinator(DataStoreIdentifier store) {\r
401             this.store = store;\r
402         }\r
403 \r
404         @Override\r
405         public void refreshData() {\r
406 \r
407             for (DataRefresher refresher : context(store).refreshers) {\r
408                 try {\r
409                     refresher.refreshData();\r
410                 } catch (Exception e) {\r
411                     log.error(\r
412                             "Uncaught exception during refresh of data: "\r
413                                     + e.getMessage(), e);\r
414                 }\r
415 \r
416             }\r
417         }\r
418     }\r
419 }\r