7fa20d7bea652a44b8a70b4198cb5227a292df47
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / DataBrokerModule.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.opendaylight.controller.sal.common.DataStoreIdentifier;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.BrokerService;
import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;
import org.opendaylight.controller.sal.core.api.data.DataProviderService;
import org.opendaylight.controller.sal.core.api.data.DataValidator;
import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;
import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;
import org.opendaylight.controller.sal.core.spi.BrokerModule;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.MutableCompositeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableSet;
41
42 public class DataBrokerModule implements BrokerModule {
43
44     private static final Logger log = LoggerFactory
45             .getLogger(DataBrokerModule.class);
46
47     private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet
48             .of((Class<? extends ProviderFunctionality>) DataValidator.class,
49                     DataRefresher.class, DataCommitHandler.class);
50
51     private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet
52             .of((Class<? extends BrokerService>) DataBrokerService.class,
53                     DataProviderService.class);
54
55     private Map<DataStoreIdentifier, StoreContext> storeContext;
56
57     private ExecutorService executor;
58     
59     private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();
60
61     @Override
62     public Set<Class<? extends BrokerService>> getProvidedServices() {
63         return PROVIDED_SESSION_SERVICES;
64     }
65
66     @Override
67     public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {
68         return SUPPORTED_PROVIDER_FUNCTIONALITY;
69     }
70
71     @Override
72     public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {
73         return Collections.emptySet();
74     }
75
76     @Override
77     public <T extends BrokerService> T getServiceForSession(Class<T> service,
78             ConsumerSession session) {
79         if (DataProviderService.class.equals(service)
80                 && session instanceof ProviderSession) {
81             @SuppressWarnings("unchecked")
82             T ret = (T) newDataProviderService(session);
83             return ret;
84         } else if (DataBrokerService.class.equals(service)) {
85
86             @SuppressWarnings("unchecked")
87             T ret = (T) newDataConsumerService(session);
88             return ret;
89         }
90
91         throw new IllegalArgumentException(
92                 "The requested session-specific service is not provided by this module.");
93     }
94
95     private DataProviderService newDataProviderService(ConsumerSession session) {
96         return new DataProviderSession();
97     }
98
99     private DataBrokerService newDataConsumerService(ConsumerSession session) {
100         return new DataConsumerSession();
101     }
102
103     private StoreContext context(DataStoreIdentifier store) {
104         return storeContext.get(store);
105     }
106
107     private static class StoreContext {
108         private Set<DataCommitHandler> commitHandlers = Collections
109                 .synchronizedSet(new HashSet<DataCommitHandler>());
110         private Set<DataValidator> validators = Collections
111                 .synchronizedSet(new HashSet<DataValidator>());
112         private Set<DataRefresher> refreshers = Collections
113                 .synchronizedSet(new HashSet<DataRefresher>());
114     }
115
116     private class DataConsumerSession implements DataBrokerService {
117
118         @Override
119         public CompositeNode getData(DataStoreIdentifier store) {
120             // TODO Implement this method
121             throw new UnsupportedOperationException("Not implemented");
122         }
123
124         @Override
125         public CompositeNode getData(DataStoreIdentifier store,
126                 CompositeNode filter) {
127             // TODO Implement this method
128             throw new UnsupportedOperationException("Not implemented");
129         }
130
131         @Override
132         public CompositeNode getCandidateData(DataStoreIdentifier store) {
133             // TODO Implement this method
134             throw new UnsupportedOperationException("Not implemented");
135         }
136
137         @Override
138         public CompositeNode getCandidateData(DataStoreIdentifier store,
139                 CompositeNode filter) {
140             // TODO Implement this method
141             throw new UnsupportedOperationException("Not implemented");
142         }
143
144         @Override
145         public RpcResult<CompositeNode> editCandidateData(
146                 DataStoreIdentifier store, MutableCompositeNode changeSet) {
147             // TODO Implement this method
148             throw new UnsupportedOperationException("Not implemented");
149         }
150
151         @Override
152         public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {
153             // TODO Implement this method
154             throw new UnsupportedOperationException("Not implemented");
155         }
156
157         @Override
158         public void closeSession() {
159             // TODO Implement this method
160             throw new UnsupportedOperationException("Not implemented");
161         }
162
163         @Override
164         public Set<DataStoreIdentifier> getDataStores() {
165             // TODO Auto-generated method stub
166             return null;
167         }
168
169     }
170
171     private class DataProviderSession extends DataConsumerSession implements
172             DataProviderService {
173
174         private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();
175         private Set<DataValidator> providerValidators = new HashSet<DataValidator>();
176         private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();
177
178         @Override
179         public void addValidator(DataStoreIdentifier store,
180                 DataValidator validator) {
181             if (validator == null)
182                 throw new IllegalArgumentException(
183                         "Validator should not be null");
184
185             providerValidators.add(validator);
186             context(store).validators.add(validator);
187         }
188
189         @Override
190         public void removeValidator(DataStoreIdentifier store,
191                 DataValidator validator) {
192             if (validator == null)
193                 throw new IllegalArgumentException(
194                         "Validator should not be null");
195
196             providerValidators.remove(validator);
197             context(store).validators.remove(validator);
198         }
199
200         @Override
201         public void addCommitHandler(DataStoreIdentifier store,
202                 DataCommitHandler provider) {
203             if (provider == null)
204                 throw new IllegalArgumentException(
205                         "CommitHandler should not be null");
206
207             providerCommitHandlers.add(provider);
208             context(store).commitHandlers.add(provider);
209         }
210
211         @Override
212         public void removeCommitHandler(DataStoreIdentifier store,
213                 DataCommitHandler provider) {
214             if (provider == null)
215                 throw new IllegalArgumentException(
216                         "CommitHandler should not be null");
217
218             providerCommitHandlers.remove(provider);
219             context(store).commitHandlers.remove(provider);
220         }
221
222         @Override
223         public void addRefresher(DataStoreIdentifier store,
224                 DataRefresher provider) {
225             if (provider == null)
226                 throw new IllegalArgumentException(
227                         "Refresher should not be null");
228
229             providerRefreshers.add(provider);
230             context(store).refreshers.add(provider);
231         }
232
233         @Override
234         public void removeRefresher(DataStoreIdentifier store,
235                 DataRefresher provider) {
236             if (provider == null)
237                 throw new IllegalArgumentException(
238                         "Refresher should not be null");
239
240             providerRefreshers.remove(provider);
241             context(store).refreshers.remove(provider);
242         }
243
244     }
245
246     private class SequentialCommitHandlerCoordinator implements
247             DataCommitHandler {
248
249         @Override
250         public RpcResult<CommitTransaction> requestCommit(
251                 DataStoreIdentifier store) {
252             List<RpcError> errors = new ArrayList<RpcError>();
253             Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();
254             boolean successful = true;
255
256             for (DataCommitHandler commitHandler : context(store).commitHandlers) {
257                 try {
258                     RpcResult<CommitTransaction> partialResult = commitHandler
259                             .requestCommit(store);
260                     successful = partialResult.isSuccessful() & successful;
261                     if (partialResult.isSuccessful()) {
262                         transactions.add(partialResult.getResult());
263                     }
264
265                     errors.addAll(partialResult.getErrors());
266                 } catch (Exception e) {
267                     log.error("Uncaught exception prevented commit request."
268                             + e.getMessage(), e);
269                     successful = false;
270                     // FIXME: Add RPC Error with exception.
271                 }
272                 if (successful == false)
273                     break;
274             }
275             CommitTransaction transaction = new SequentialCommitTransaction(
276                     store, transactions);
277             return Rpcs.getRpcResult(successful, transaction, errors);
278         }
279
280         @Override
281         public Set<DataStoreIdentifier> getSupportedDataStores() {
282             return Collections.emptySet();
283         }
284     }
285
286     private class SequentialCommitTransaction implements CommitTransaction {
287
288         final Set<CommitTransaction> transactions;
289         final DataStoreIdentifier store;
290
291         public SequentialCommitTransaction(DataStoreIdentifier s,
292                 Set<CommitTransaction> t) {
293             transactions = t;
294             store = s;
295         }
296
297         @Override
298         public RpcResult<Void> finish() {
299             List<RpcError> errors = new ArrayList<RpcError>();
300             boolean successful = true;
301
302             for (CommitTransaction commitHandler : transactions) {
303                 try {
304                     RpcResult<Void> partialResult = commitHandler.finish();
305                     successful = partialResult.isSuccessful() & successful;
306                     errors.addAll(partialResult.getErrors());
307                 } catch (Exception e) {
308                     log.error(
309                             "Uncaught exception prevented finishing of commit."
310                                     + e.getMessage(), e);
311                     successful = false;
312                     // FIXME: Add RPC Error with exception.
313                 }
314                 if (successful == false)
315                     break;
316             }
317
318             return Rpcs.getRpcResult(successful, null, errors);
319         }
320
321         @Override
322         public RpcResult<Void> rollback() {
323             List<RpcError> errors = new ArrayList<RpcError>();
324             boolean successful = true;
325
326             for (CommitTransaction commitHandler : transactions) {
327                 try {
328                     RpcResult<Void> partialResult = commitHandler.rollback();
329                     successful = partialResult.isSuccessful() & successful;
330                     errors.addAll(partialResult.getErrors());
331                 } catch (Exception e) {
332                     log.error(
333                             "Uncaught exception prevented rollback of commit."
334                                     + e.getMessage(), e);
335                     successful = false;
336                     // FIXME: Add RPC Error with exception.
337                 }
338                 if (successful == false)
339                     break;
340             }
341
342             return Rpcs.getRpcResult(successful, null, errors);
343         }
344
345         @Override
346         public DataStoreIdentifier getDataStore() {
347             return this.store;
348         }
349
350         @Override
351         public DataCommitHandler getHandler() {
352             return coordinator;
353         }
354     }
355
356     private class ValidationCoordinator implements DataValidator {
357
358         private final DataStoreIdentifier store;
359
360         ValidationCoordinator(DataStoreIdentifier store) {
361             this.store = store;
362         }
363
364         @Override
365         public RpcResult<Void> validate(CompositeNode toValidate) {
366             List<RpcError> errors = new ArrayList<RpcError>();
367             boolean successful = true;
368
369             for (DataValidator validator : context(store).validators) {
370                 try {
371                     RpcResult<Void> partialResult = validator
372                             .validate(toValidate);
373                     successful = partialResult.isSuccessful() & successful;
374                     errors.addAll(partialResult.getErrors());
375                 } catch (Exception e) {
376                     log.error(
377                             "Uncaught exception prevented validation."
378                                     + e.getMessage(), e);
379                     successful = false;
380                     // FIXME: Add RPC Error with exception.
381                 }
382                 if (successful == false)
383                     break;
384             }
385
386             return Rpcs.getRpcResult(successful, null, errors);
387         }
388
389         @Override
390         public Set<DataStoreIdentifier> getSupportedDataStores() {
391             return Collections.emptySet();
392         }
393
394     }
395
396     private class DataRefreshCoordinator implements DataRefresher {
397
398         private final DataStoreIdentifier store;
399
400         DataRefreshCoordinator(DataStoreIdentifier store) {
401             this.store = store;
402         }
403
404         @Override
405         public void refreshData() {
406
407             for (DataRefresher refresher : context(store).refreshers) {
408                 try {
409                     refresher.refreshData();
410                 } catch (Exception e) {
411                     log.error(
412                             "Uncaught exception during refresh of data: "
413                                     + e.getMessage(), e);
414                 }
415
416             }
417         }
418     }
419 }