2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
\r
4 * This program and the accompanying materials are made available under the
\r
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
\r
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
\r
8 package org.opendaylight.controller.sal.core.impl;
\r
10 import java.util.ArrayList;
\r
11 import java.util.Collections;
\r
12 import java.util.HashSet;
\r
13 import java.util.List;
\r
14 import java.util.Map;
\r
15 import java.util.Set;
\r
16 import java.util.concurrent.ExecutorService;
\r
17 import java.util.concurrent.Future;
\r
19 import org.opendaylight.controller.sal.common.DataStoreIdentifier;
\r
20 import org.opendaylight.controller.sal.common.util.Rpcs;
\r
21 import org.opendaylight.controller.sal.core.api.BrokerService;
\r
22 import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
\r
23 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
\r
24 import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
\r
25 import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
\r
26 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
\r
27 import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;
\r
28 import org.opendaylight.controller.sal.core.api.data.DataProviderService;
\r
29 import org.opendaylight.controller.sal.core.api.data.DataValidator;
\r
30 import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;
\r
31 import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;
\r
32 import org.opendaylight.controller.sal.core.spi.BrokerModule;
\r
33 import org.opendaylight.controller.yang.common.RpcError;
\r
34 import org.opendaylight.controller.yang.common.RpcResult;
\r
35 import org.opendaylight.controller.yang.data.api.CompositeNode;
\r
36 import org.opendaylight.controller.yang.data.api.CompositeNodeModification;
\r
37 import org.slf4j.Logger;
\r
38 import org.slf4j.LoggerFactory;
\r
40 import com.google.common.collect.ImmutableSet;
\r
42 public class DataBrokerModule implements BrokerModule {
\r
44 private static final Logger log = LoggerFactory
\r
45 .getLogger(DataBrokerModule.class);
\r
47 private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet
\r
48 .of((Class<? extends ProviderFunctionality>) DataValidator.class,
\r
49 DataRefresher.class, DataCommitHandler.class);
\r
51 private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet
\r
52 .of((Class<? extends BrokerService>) DataBrokerService.class,
\r
53 DataProviderService.class);
\r
55 private Map<DataStoreIdentifier, StoreContext> storeContext;
\r
57 private ExecutorService executor;
\r
59 private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();
\r
62 public Set<Class<? extends BrokerService>> getProvidedServices() {
\r
63 return PROVIDED_SESSION_SERVICES;
\r
67 public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {
\r
68 return SUPPORTED_PROVIDER_FUNCTIONALITY;
\r
72 public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {
\r
73 return Collections.emptySet();
\r
77 public <T extends BrokerService> T getServiceForSession(Class<T> service,
\r
78 ConsumerSession session) {
\r
79 if (DataProviderService.class.equals(service)
\r
80 && session instanceof ProviderSession) {
\r
81 @SuppressWarnings("unchecked")
\r
82 T ret = (T) newDataProviderService(session);
\r
84 } else if (DataBrokerService.class.equals(service)) {
\r
86 @SuppressWarnings("unchecked")
\r
87 T ret = (T) newDataConsumerService(session);
\r
91 throw new IllegalArgumentException(
\r
92 "The requested session-specific service is not provided by this module.");
\r
95 private DataProviderService newDataProviderService(ConsumerSession session) {
\r
96 return new DataProviderSession();
\r
99 private DataBrokerService newDataConsumerService(ConsumerSession session) {
\r
100 return new DataConsumerSession();
\r
103 private StoreContext context(DataStoreIdentifier store) {
\r
104 return storeContext.get(store);
\r
107 private static class StoreContext {
\r
108 private Set<DataCommitHandler> commitHandlers = Collections
\r
109 .synchronizedSet(new HashSet<DataCommitHandler>());
\r
110 private Set<DataValidator> validators = Collections
\r
111 .synchronizedSet(new HashSet<DataValidator>());
\r
112 private Set<DataRefresher> refreshers = Collections
\r
113 .synchronizedSet(new HashSet<DataRefresher>());
\r
116 private class DataConsumerSession implements DataBrokerService {
\r
119 public CompositeNode getData(DataStoreIdentifier store) {
\r
120 // TODO Implement this method
\r
121 throw new UnsupportedOperationException("Not implemented");
\r
125 public CompositeNode getData(DataStoreIdentifier store,
\r
126 CompositeNode filter) {
\r
127 // TODO Implement this method
\r
128 throw new UnsupportedOperationException("Not implemented");
\r
132 public CompositeNode getCandidateData(DataStoreIdentifier store) {
\r
133 // TODO Implement this method
\r
134 throw new UnsupportedOperationException("Not implemented");
\r
138 public CompositeNode getCandidateData(DataStoreIdentifier store,
\r
139 CompositeNode filter) {
\r
140 // TODO Implement this method
\r
141 throw new UnsupportedOperationException("Not implemented");
\r
145 public RpcResult<CompositeNode> editCandidateData(
\r
146 DataStoreIdentifier store, CompositeNodeModification changeSet) {
\r
147 // TODO Implement this method
\r
148 throw new UnsupportedOperationException("Not implemented");
\r
152 public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {
\r
153 // TODO Implement this method
\r
154 throw new UnsupportedOperationException("Not implemented");
\r
158 public void closeSession() {
\r
159 // TODO Implement this method
\r
160 throw new UnsupportedOperationException("Not implemented");
\r
164 public Set<DataStoreIdentifier> getDataStores() {
\r
165 // TODO Auto-generated method stub
\r
171 private class DataProviderSession extends DataConsumerSession implements
\r
172 DataProviderService {
\r
174 private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();
\r
175 private Set<DataValidator> providerValidators = new HashSet<DataValidator>();
\r
176 private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();
\r
179 public void addValidator(DataStoreIdentifier store,
\r
180 DataValidator validator) {
\r
181 if (validator == null)
\r
182 throw new IllegalArgumentException(
\r
183 "Validator should not be null");
\r
185 providerValidators.add(validator);
\r
186 context(store).validators.add(validator);
\r
190 public void removeValidator(DataStoreIdentifier store,
\r
191 DataValidator validator) {
\r
192 if (validator == null)
\r
193 throw new IllegalArgumentException(
\r
194 "Validator should not be null");
\r
196 providerValidators.remove(validator);
\r
197 context(store).validators.remove(validator);
\r
201 public void addCommitHandler(DataStoreIdentifier store,
\r
202 DataCommitHandler provider) {
\r
203 if (provider == null)
\r
204 throw new IllegalArgumentException(
\r
205 "CommitHandler should not be null");
\r
207 providerCommitHandlers.add(provider);
\r
208 context(store).commitHandlers.add(provider);
\r
212 public void removeCommitHandler(DataStoreIdentifier store,
\r
213 DataCommitHandler provider) {
\r
214 if (provider == null)
\r
215 throw new IllegalArgumentException(
\r
216 "CommitHandler should not be null");
\r
218 providerCommitHandlers.remove(provider);
\r
219 context(store).commitHandlers.remove(provider);
\r
223 public void addRefresher(DataStoreIdentifier store,
\r
224 DataRefresher provider) {
\r
225 if (provider == null)
\r
226 throw new IllegalArgumentException(
\r
227 "Refresher should not be null");
\r
229 providerRefreshers.add(provider);
\r
230 context(store).refreshers.add(provider);
\r
234 public void removeRefresher(DataStoreIdentifier store,
\r
235 DataRefresher provider) {
\r
236 if (provider == null)
\r
237 throw new IllegalArgumentException(
\r
238 "Refresher should not be null");
\r
240 providerRefreshers.remove(provider);
\r
241 context(store).refreshers.remove(provider);
\r
246 private class SequentialCommitHandlerCoordinator implements
\r
247 DataCommitHandler {
\r
250 public RpcResult<CommitTransaction> requestCommit(
\r
251 DataStoreIdentifier store) {
\r
252 List<RpcError> errors = new ArrayList<RpcError>();
\r
253 Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();
\r
254 boolean successful = true;
\r
256 for (DataCommitHandler commitHandler : context(store).commitHandlers) {
\r
258 RpcResult<CommitTransaction> partialResult = commitHandler
\r
259 .requestCommit(store);
\r
260 successful = partialResult.isSuccessful() & successful;
\r
261 if (partialResult.isSuccessful()) {
\r
262 transactions.add(partialResult.getResult());
\r
265 errors.addAll(partialResult.getErrors());
\r
266 } catch (Exception e) {
\r
267 log.error("Uncaught exception prevented commit request."
\r
268 + e.getMessage(), e);
\r
269 successful = false;
\r
270 // FIXME: Add RPC Error with exception.
\r
272 if (successful == false)
\r
275 CommitTransaction transaction = new SequentialCommitTransaction(
\r
276 store, transactions);
\r
277 return Rpcs.getRpcResult(successful, transaction, errors);
\r
281 public Set<DataStoreIdentifier> getSupportedDataStores() {
\r
282 return Collections.emptySet();
\r
286 private class SequentialCommitTransaction implements CommitTransaction {
\r
288 final Set<CommitTransaction> transactions;
\r
289 final DataStoreIdentifier store;
\r
291 public SequentialCommitTransaction(DataStoreIdentifier s,
\r
292 Set<CommitTransaction> t) {
\r
298 public RpcResult<Void> finish() {
\r
299 List<RpcError> errors = new ArrayList<RpcError>();
\r
300 boolean successful = true;
\r
302 for (CommitTransaction commitHandler : transactions) {
\r
304 RpcResult<Void> partialResult = commitHandler.finish();
\r
305 successful = partialResult.isSuccessful() & successful;
\r
306 errors.addAll(partialResult.getErrors());
\r
307 } catch (Exception e) {
\r
309 "Uncaught exception prevented finishing of commit."
\r
310 + e.getMessage(), e);
\r
311 successful = false;
\r
312 // FIXME: Add RPC Error with exception.
\r
314 if (successful == false)
\r
318 return Rpcs.getRpcResult(successful, null, errors);
\r
322 public RpcResult<Void> rollback() {
\r
323 List<RpcError> errors = new ArrayList<RpcError>();
\r
324 boolean successful = true;
\r
326 for (CommitTransaction commitHandler : transactions) {
\r
328 RpcResult<Void> partialResult = commitHandler.rollback();
\r
329 successful = partialResult.isSuccessful() & successful;
\r
330 errors.addAll(partialResult.getErrors());
\r
331 } catch (Exception e) {
\r
333 "Uncaught exception prevented rollback of commit."
\r
334 + e.getMessage(), e);
\r
335 successful = false;
\r
336 // FIXME: Add RPC Error with exception.
\r
338 if (successful == false)
\r
342 return Rpcs.getRpcResult(successful, null, errors);
\r
346 public DataStoreIdentifier getDataStore() {
\r
351 public DataCommitHandler getHandler() {
\r
352 return coordinator;
\r
356 private class ValidationCoordinator implements DataValidator {
\r
358 private final DataStoreIdentifier store;
\r
360 ValidationCoordinator(DataStoreIdentifier store) {
\r
361 this.store = store;
\r
365 public RpcResult<Void> validate(CompositeNode toValidate) {
\r
366 List<RpcError> errors = new ArrayList<RpcError>();
\r
367 boolean successful = true;
\r
369 for (DataValidator validator : context(store).validators) {
\r
371 RpcResult<Void> partialResult = validator
\r
372 .validate(toValidate);
\r
373 successful = partialResult.isSuccessful() & successful;
\r
374 errors.addAll(partialResult.getErrors());
\r
375 } catch (Exception e) {
\r
377 "Uncaught exception prevented validation."
\r
378 + e.getMessage(), e);
\r
379 successful = false;
\r
380 // FIXME: Add RPC Error with exception.
\r
382 if (successful == false)
\r
386 return Rpcs.getRpcResult(successful, null, errors);
\r
390 public Set<DataStoreIdentifier> getSupportedDataStores() {
\r
391 return Collections.emptySet();
\r
396 private class DataRefreshCoordinator implements DataRefresher {
\r
398 private final DataStoreIdentifier store;
\r
400 DataRefreshCoordinator(DataStoreIdentifier store) {
\r
401 this.store = store;
\r
405 public void refreshData() {
\r
407 for (DataRefresher refresher : context(store).refreshers) {
\r
409 refresher.refreshData();
\r
410 } catch (Exception e) {
\r
412 "Uncaught exception during refresh of data: "
\r
413 + e.getMessage(), e);
\r