/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.sal.dom.broker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.opendaylight.controller.sal.common.DataStoreIdentifier;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.BrokerService;
import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;
import org.opendaylight.controller.sal.core.api.data.DataProviderService;
import org.opendaylight.controller.sal.core.api.data.DataValidator;
import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;
import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;
import org.opendaylight.controller.sal.core.spi.BrokerModule;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.MutableCompositeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableSet;
42 public class DataBrokerModule implements BrokerModule {
44 private static final Logger log = LoggerFactory
45 .getLogger(DataBrokerModule.class);
47 private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet
48 .of((Class<? extends ProviderFunctionality>) DataValidator.class,
49 DataRefresher.class, DataCommitHandler.class);
51 private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet
52 .of((Class<? extends BrokerService>) DataBrokerService.class,
53 DataProviderService.class);
55 private Map<DataStoreIdentifier, StoreContext> storeContext;
57 private ExecutorService executor;
59 private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();
62 public Set<Class<? extends BrokerService>> getProvidedServices() {
63 return PROVIDED_SESSION_SERVICES;
67 public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {
68 return SUPPORTED_PROVIDER_FUNCTIONALITY;
72 public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {
73 return Collections.emptySet();
77 public <T extends BrokerService> T getServiceForSession(Class<T> service,
78 ConsumerSession session) {
79 if (DataProviderService.class.equals(service)
80 && session instanceof ProviderSession) {
81 @SuppressWarnings("unchecked")
82 T ret = (T) newDataProviderService(session);
84 } else if (DataBrokerService.class.equals(service)) {
86 @SuppressWarnings("unchecked")
87 T ret = (T) newDataConsumerService(session);
91 throw new IllegalArgumentException(
92 "The requested session-specific service is not provided by this module.");
95 private DataProviderService newDataProviderService(ConsumerSession session) {
96 return new DataProviderSession();
99 private DataBrokerService newDataConsumerService(ConsumerSession session) {
100 return new DataConsumerSession();
103 private StoreContext context(DataStoreIdentifier store) {
104 return storeContext.get(store);
107 private static class StoreContext {
108 private Set<DataCommitHandler> commitHandlers = Collections
109 .synchronizedSet(new HashSet<DataCommitHandler>());
110 private Set<DataValidator> validators = Collections
111 .synchronizedSet(new HashSet<DataValidator>());
112 private Set<DataRefresher> refreshers = Collections
113 .synchronizedSet(new HashSet<DataRefresher>());
116 private class DataConsumerSession implements DataBrokerService {
119 public CompositeNode getData(DataStoreIdentifier store) {
120 // TODO Implement this method
121 throw new UnsupportedOperationException("Not implemented");
125 public CompositeNode getData(DataStoreIdentifier store,
126 CompositeNode filter) {
127 // TODO Implement this method
128 throw new UnsupportedOperationException("Not implemented");
132 public CompositeNode getCandidateData(DataStoreIdentifier store) {
133 // TODO Implement this method
134 throw new UnsupportedOperationException("Not implemented");
138 public CompositeNode getCandidateData(DataStoreIdentifier store,
139 CompositeNode filter) {
140 // TODO Implement this method
141 throw new UnsupportedOperationException("Not implemented");
145 public RpcResult<CompositeNode> editCandidateData(
146 DataStoreIdentifier store, MutableCompositeNode changeSet) {
147 // TODO Implement this method
148 throw new UnsupportedOperationException("Not implemented");
152 public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {
153 // TODO Implement this method
154 throw new UnsupportedOperationException("Not implemented");
158 public void closeSession() {
159 // TODO Implement this method
160 throw new UnsupportedOperationException("Not implemented");
164 public Set<DataStoreIdentifier> getDataStores() {
165 // TODO Auto-generated method stub
171 private class DataProviderSession extends DataConsumerSession implements
172 DataProviderService {
174 private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();
175 private Set<DataValidator> providerValidators = new HashSet<DataValidator>();
176 private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();
179 public void addValidator(DataStoreIdentifier store,
180 DataValidator validator) {
181 if (validator == null)
182 throw new IllegalArgumentException(
183 "Validator should not be null");
185 providerValidators.add(validator);
186 context(store).validators.add(validator);
190 public void removeValidator(DataStoreIdentifier store,
191 DataValidator validator) {
192 if (validator == null)
193 throw new IllegalArgumentException(
194 "Validator should not be null");
196 providerValidators.remove(validator);
197 context(store).validators.remove(validator);
201 public void addCommitHandler(DataStoreIdentifier store,
202 DataCommitHandler provider) {
203 if (provider == null)
204 throw new IllegalArgumentException(
205 "CommitHandler should not be null");
207 providerCommitHandlers.add(provider);
208 context(store).commitHandlers.add(provider);
212 public void removeCommitHandler(DataStoreIdentifier store,
213 DataCommitHandler provider) {
214 if (provider == null)
215 throw new IllegalArgumentException(
216 "CommitHandler should not be null");
218 providerCommitHandlers.remove(provider);
219 context(store).commitHandlers.remove(provider);
223 public void addRefresher(DataStoreIdentifier store,
224 DataRefresher provider) {
225 if (provider == null)
226 throw new IllegalArgumentException(
227 "Refresher should not be null");
229 providerRefreshers.add(provider);
230 context(store).refreshers.add(provider);
234 public void removeRefresher(DataStoreIdentifier store,
235 DataRefresher provider) {
236 if (provider == null)
237 throw new IllegalArgumentException(
238 "Refresher should not be null");
240 providerRefreshers.remove(provider);
241 context(store).refreshers.remove(provider);
246 private class SequentialCommitHandlerCoordinator implements
250 public RpcResult<CommitTransaction> requestCommit(
251 DataStoreIdentifier store) {
252 List<RpcError> errors = new ArrayList<RpcError>();
253 Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();
254 boolean successful = true;
256 for (DataCommitHandler commitHandler : context(store).commitHandlers) {
258 RpcResult<CommitTransaction> partialResult = commitHandler
259 .requestCommit(store);
260 successful = partialResult.isSuccessful() & successful;
261 if (partialResult.isSuccessful()) {
262 transactions.add(partialResult.getResult());
265 errors.addAll(partialResult.getErrors());
266 } catch (Exception e) {
267 log.error("Uncaught exception prevented commit request."
268 + e.getMessage(), e);
270 // FIXME: Add RPC Error with exception.
272 if (successful == false)
275 CommitTransaction transaction = new SequentialCommitTransaction(
276 store, transactions);
277 return Rpcs.getRpcResult(successful, transaction, errors);
281 public Set<DataStoreIdentifier> getSupportedDataStores() {
282 return Collections.emptySet();
286 private class SequentialCommitTransaction implements CommitTransaction {
288 final Set<CommitTransaction> transactions;
289 final DataStoreIdentifier store;
291 public SequentialCommitTransaction(DataStoreIdentifier s,
292 Set<CommitTransaction> t) {
298 public RpcResult<Void> finish() {
299 List<RpcError> errors = new ArrayList<RpcError>();
300 boolean successful = true;
302 for (CommitTransaction commitHandler : transactions) {
304 RpcResult<Void> partialResult = commitHandler.finish();
305 successful = partialResult.isSuccessful() & successful;
306 errors.addAll(partialResult.getErrors());
307 } catch (Exception e) {
309 "Uncaught exception prevented finishing of commit."
310 + e.getMessage(), e);
312 // FIXME: Add RPC Error with exception.
314 if (successful == false)
318 return Rpcs.getRpcResult(successful, null, errors);
322 public RpcResult<Void> rollback() {
323 List<RpcError> errors = new ArrayList<RpcError>();
324 boolean successful = true;
326 for (CommitTransaction commitHandler : transactions) {
328 RpcResult<Void> partialResult = commitHandler.rollback();
329 successful = partialResult.isSuccessful() & successful;
330 errors.addAll(partialResult.getErrors());
331 } catch (Exception e) {
333 "Uncaught exception prevented rollback of commit."
334 + e.getMessage(), e);
336 // FIXME: Add RPC Error with exception.
338 if (successful == false)
342 return Rpcs.getRpcResult(successful, null, errors);
346 public DataStoreIdentifier getDataStore() {
351 public DataCommitHandler getHandler() {
356 private class ValidationCoordinator implements DataValidator {
358 private final DataStoreIdentifier store;
360 ValidationCoordinator(DataStoreIdentifier store) {
365 public RpcResult<Void> validate(CompositeNode toValidate) {
366 List<RpcError> errors = new ArrayList<RpcError>();
367 boolean successful = true;
369 for (DataValidator validator : context(store).validators) {
371 RpcResult<Void> partialResult = validator
372 .validate(toValidate);
373 successful = partialResult.isSuccessful() & successful;
374 errors.addAll(partialResult.getErrors());
375 } catch (Exception e) {
377 "Uncaught exception prevented validation."
378 + e.getMessage(), e);
380 // FIXME: Add RPC Error with exception.
382 if (successful == false)
386 return Rpcs.getRpcResult(successful, null, errors);
390 public Set<DataStoreIdentifier> getSupportedDataStores() {
391 return Collections.emptySet();
396 private class DataRefreshCoordinator implements DataRefresher {
398 private final DataStoreIdentifier store;
400 DataRefreshCoordinator(DataStoreIdentifier store) {
405 public void refreshData() {
407 for (DataRefresher refresher : context(store).refreshers) {
409 refresher.refreshData();
410 } catch (Exception e) {
412 "Uncaught exception during refresh of data: "
413 + e.getMessage(), e);