-/*\r
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-package org.opendaylight.controller.sal.core.impl;\r
-\r
-import java.util.ArrayList;\r
-import java.util.Collections;\r
-import java.util.HashSet;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.Future;\r
-\r
-import org.opendaylight.controller.sal.common.DataStoreIdentifier;\r
-import org.opendaylight.controller.sal.common.util.Rpcs;\r
-import org.opendaylight.controller.sal.core.api.BrokerService;\r
-import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;\r
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
-import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;\r
-import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;\r
-import org.opendaylight.controller.sal.core.api.data.DataBrokerService;\r
-import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;\r
-import org.opendaylight.controller.sal.core.api.data.DataProviderService;\r
-import org.opendaylight.controller.sal.core.api.data.DataValidator;\r
-import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;\r
-import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;\r
-import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
-import org.opendaylight.controller.yang.common.RpcError;\r
-import org.opendaylight.controller.yang.common.RpcResult;\r
-import org.opendaylight.controller.yang.data.api.CompositeNode;\r
-import org.opendaylight.controller.yang.data.api.CompositeNodeModification;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import com.google.common.collect.ImmutableSet;\r
-\r
-public class DataBrokerModule implements BrokerModule {\r
-\r
- private static final Logger log = LoggerFactory\r
- .getLogger(DataBrokerModule.class);\r
-\r
- private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet\r
- .of((Class<? extends ProviderFunctionality>) DataValidator.class,\r
- DataRefresher.class, DataCommitHandler.class);\r
-\r
- private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet\r
- .of((Class<? extends BrokerService>) DataBrokerService.class,\r
- DataProviderService.class);\r
-\r
- private Map<DataStoreIdentifier, StoreContext> storeContext;\r
-\r
- private ExecutorService executor;\r
- \r
- private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();\r
-\r
- @Override\r
- public Set<Class<? extends BrokerService>> getProvidedServices() {\r
- return PROVIDED_SESSION_SERVICES;\r
- }\r
-\r
- @Override\r
- public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {\r
- return SUPPORTED_PROVIDER_FUNCTIONALITY;\r
- }\r
-\r
- @Override\r
- public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {\r
- return Collections.emptySet();\r
- }\r
-\r
- @Override\r
- public <T extends BrokerService> T getServiceForSession(Class<T> service,\r
- ConsumerSession session) {\r
- if (DataProviderService.class.equals(service)\r
- && session instanceof ProviderSession) {\r
- @SuppressWarnings("unchecked")\r
- T ret = (T) newDataProviderService(session);\r
- return ret;\r
- } else if (DataBrokerService.class.equals(service)) {\r
-\r
- @SuppressWarnings("unchecked")\r
- T ret = (T) newDataConsumerService(session);\r
- return ret;\r
- }\r
-\r
- throw new IllegalArgumentException(\r
- "The requested session-specific service is not provided by this module.");\r
- }\r
-\r
- private DataProviderService newDataProviderService(ConsumerSession session) {\r
- return new DataProviderSession();\r
- }\r
-\r
- private DataBrokerService newDataConsumerService(ConsumerSession session) {\r
- return new DataConsumerSession();\r
- }\r
-\r
- private StoreContext context(DataStoreIdentifier store) {\r
- return storeContext.get(store);\r
- }\r
-\r
- private static class StoreContext {\r
- private Set<DataCommitHandler> commitHandlers = Collections\r
- .synchronizedSet(new HashSet<DataCommitHandler>());\r
- private Set<DataValidator> validators = Collections\r
- .synchronizedSet(new HashSet<DataValidator>());\r
- private Set<DataRefresher> refreshers = Collections\r
- .synchronizedSet(new HashSet<DataRefresher>());\r
- }\r
-\r
- private class DataConsumerSession implements DataBrokerService {\r
-\r
- @Override\r
- public CompositeNode getData(DataStoreIdentifier store) {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public CompositeNode getData(DataStoreIdentifier store,\r
- CompositeNode filter) {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public CompositeNode getCandidateData(DataStoreIdentifier store) {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public CompositeNode getCandidateData(DataStoreIdentifier store,\r
- CompositeNode filter) {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public RpcResult<CompositeNode> editCandidateData(\r
- DataStoreIdentifier store, CompositeNodeModification changeSet) {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public void closeSession() {\r
- // TODO Implement this method\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public Set<DataStoreIdentifier> getDataStores() {\r
- // TODO Auto-generated method stub\r
- return null;\r
- }\r
-\r
- }\r
-\r
- private class DataProviderSession extends DataConsumerSession implements\r
- DataProviderService {\r
-\r
- private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();\r
- private Set<DataValidator> providerValidators = new HashSet<DataValidator>();\r
- private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();\r
-\r
- @Override\r
- public void addValidator(DataStoreIdentifier store,\r
- DataValidator validator) {\r
- if (validator == null)\r
- throw new IllegalArgumentException(\r
- "Validator should not be null");\r
-\r
- providerValidators.add(validator);\r
- context(store).validators.add(validator);\r
- }\r
-\r
- @Override\r
- public void removeValidator(DataStoreIdentifier store,\r
- DataValidator validator) {\r
- if (validator == null)\r
- throw new IllegalArgumentException(\r
- "Validator should not be null");\r
-\r
- providerValidators.remove(validator);\r
- context(store).validators.remove(validator);\r
- }\r
-\r
- @Override\r
- public void addCommitHandler(DataStoreIdentifier store,\r
- DataCommitHandler provider) {\r
- if (provider == null)\r
- throw new IllegalArgumentException(\r
- "CommitHandler should not be null");\r
-\r
- providerCommitHandlers.add(provider);\r
- context(store).commitHandlers.add(provider);\r
- }\r
-\r
- @Override\r
- public void removeCommitHandler(DataStoreIdentifier store,\r
- DataCommitHandler provider) {\r
- if (provider == null)\r
- throw new IllegalArgumentException(\r
- "CommitHandler should not be null");\r
-\r
- providerCommitHandlers.remove(provider);\r
- context(store).commitHandlers.remove(provider);\r
- }\r
-\r
- @Override\r
- public void addRefresher(DataStoreIdentifier store,\r
- DataRefresher provider) {\r
- if (provider == null)\r
- throw new IllegalArgumentException(\r
- "Refresher should not be null");\r
-\r
- providerRefreshers.add(provider);\r
- context(store).refreshers.add(provider);\r
- }\r
-\r
- @Override\r
- public void removeRefresher(DataStoreIdentifier store,\r
- DataRefresher provider) {\r
- if (provider == null)\r
- throw new IllegalArgumentException(\r
- "Refresher should not be null");\r
-\r
- providerRefreshers.remove(provider);\r
- context(store).refreshers.remove(provider);\r
- }\r
-\r
- }\r
-\r
- private class SequentialCommitHandlerCoordinator implements\r
- DataCommitHandler {\r
-\r
- @Override\r
- public RpcResult<CommitTransaction> requestCommit(\r
- DataStoreIdentifier store) {\r
- List<RpcError> errors = new ArrayList<RpcError>();\r
- Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();\r
- boolean successful = true;\r
-\r
- for (DataCommitHandler commitHandler : context(store).commitHandlers) {\r
- try {\r
- RpcResult<CommitTransaction> partialResult = commitHandler\r
- .requestCommit(store);\r
- successful = partialResult.isSuccessful() & successful;\r
- if (partialResult.isSuccessful()) {\r
- transactions.add(partialResult.getResult());\r
- }\r
-\r
- errors.addAll(partialResult.getErrors());\r
- } catch (Exception e) {\r
- log.error("Uncaught exception prevented commit request."\r
- + e.getMessage(), e);\r
- successful = false;\r
- // FIXME: Add RPC Error with exception.\r
- }\r
- if (successful == false)\r
- break;\r
- }\r
- CommitTransaction transaction = new SequentialCommitTransaction(\r
- store, transactions);\r
- return Rpcs.getRpcResult(successful, transaction, errors);\r
- }\r
-\r
- @Override\r
- public Set<DataStoreIdentifier> getSupportedDataStores() {\r
- return Collections.emptySet();\r
- }\r
- }\r
-\r
- private class SequentialCommitTransaction implements CommitTransaction {\r
-\r
- final Set<CommitTransaction> transactions;\r
- final DataStoreIdentifier store;\r
-\r
- public SequentialCommitTransaction(DataStoreIdentifier s,\r
- Set<CommitTransaction> t) {\r
- transactions = t;\r
- store = s;\r
- }\r
-\r
- @Override\r
- public RpcResult<Void> finish() {\r
- List<RpcError> errors = new ArrayList<RpcError>();\r
- boolean successful = true;\r
-\r
- for (CommitTransaction commitHandler : transactions) {\r
- try {\r
- RpcResult<Void> partialResult = commitHandler.finish();\r
- successful = partialResult.isSuccessful() & successful;\r
- errors.addAll(partialResult.getErrors());\r
- } catch (Exception e) {\r
- log.error(\r
- "Uncaught exception prevented finishing of commit."\r
- + e.getMessage(), e);\r
- successful = false;\r
- // FIXME: Add RPC Error with exception.\r
- }\r
- if (successful == false)\r
- break;\r
- }\r
-\r
- return Rpcs.getRpcResult(successful, null, errors);\r
- }\r
-\r
- @Override\r
- public RpcResult<Void> rollback() {\r
- List<RpcError> errors = new ArrayList<RpcError>();\r
- boolean successful = true;\r
-\r
- for (CommitTransaction commitHandler : transactions) {\r
- try {\r
- RpcResult<Void> partialResult = commitHandler.rollback();\r
- successful = partialResult.isSuccessful() & successful;\r
- errors.addAll(partialResult.getErrors());\r
- } catch (Exception e) {\r
- log.error(\r
- "Uncaught exception prevented rollback of commit."\r
- + e.getMessage(), e);\r
- successful = false;\r
- // FIXME: Add RPC Error with exception.\r
- }\r
- if (successful == false)\r
- break;\r
- }\r
-\r
- return Rpcs.getRpcResult(successful, null, errors);\r
- }\r
-\r
- @Override\r
- public DataStoreIdentifier getDataStore() {\r
- return this.store;\r
- }\r
-\r
- @Override\r
- public DataCommitHandler getHandler() {\r
- return coordinator;\r
- }\r
- }\r
-\r
- private class ValidationCoordinator implements DataValidator {\r
-\r
- private final DataStoreIdentifier store;\r
-\r
- ValidationCoordinator(DataStoreIdentifier store) {\r
- this.store = store;\r
- }\r
-\r
- @Override\r
- public RpcResult<Void> validate(CompositeNode toValidate) {\r
- List<RpcError> errors = new ArrayList<RpcError>();\r
- boolean successful = true;\r
-\r
- for (DataValidator validator : context(store).validators) {\r
- try {\r
- RpcResult<Void> partialResult = validator\r
- .validate(toValidate);\r
- successful = partialResult.isSuccessful() & successful;\r
- errors.addAll(partialResult.getErrors());\r
- } catch (Exception e) {\r
- log.error(\r
- "Uncaught exception prevented validation."\r
- + e.getMessage(), e);\r
- successful = false;\r
- // FIXME: Add RPC Error with exception.\r
- }\r
- if (successful == false)\r
- break;\r
- }\r
-\r
- return Rpcs.getRpcResult(successful, null, errors);\r
- }\r
-\r
- @Override\r
- public Set<DataStoreIdentifier> getSupportedDataStores() {\r
- return Collections.emptySet();\r
- }\r
-\r
- }\r
-\r
- private class DataRefreshCoordinator implements DataRefresher {\r
-\r
- private final DataStoreIdentifier store;\r
-\r
- DataRefreshCoordinator(DataStoreIdentifier store) {\r
- this.store = store;\r
- }\r
-\r
- @Override\r
- public void refreshData() {\r
-\r
- for (DataRefresher refresher : context(store).refreshers) {\r
- try {\r
- refresher.refreshData();\r
- } catch (Exception e) {\r
- log.error(\r
- "Uncaught exception during refresh of data: "\r
- + e.getMessage(), e);\r
- }\r
-\r
- }\r
- }\r
- }\r
-}\r
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.sal.common.DataStoreIdentifier;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.api.BrokerService;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
+import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;
+import org.opendaylight.controller.sal.core.api.data.DataProviderService;
+import org.opendaylight.controller.sal.core.api.data.DataValidator;
+import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;
+import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;
+import org.opendaylight.controller.sal.core.spi.BrokerModule;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.MutableCompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableSet;
+
+public class DataBrokerModule implements BrokerModule {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(DataBrokerModule.class);
+
+ private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet
+ .of((Class<? extends ProviderFunctionality>) DataValidator.class,
+ DataRefresher.class, DataCommitHandler.class);
+
+ private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet
+ .of((Class<? extends BrokerService>) DataBrokerService.class,
+ DataProviderService.class);
+
+ private Map<DataStoreIdentifier, StoreContext> storeContext;
+
+ private ExecutorService executor;
+
+ private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();
+
+ @Override
+ public Set<Class<? extends BrokerService>> getProvidedServices() {
+ return PROVIDED_SESSION_SERVICES;
+ }
+
+ @Override
+ public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {
+ return SUPPORTED_PROVIDER_FUNCTIONALITY;
+ }
+
+ @Override
+ public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public <T extends BrokerService> T getServiceForSession(Class<T> service,
+ ConsumerSession session) {
+ if (DataProviderService.class.equals(service)
+ && session instanceof ProviderSession) {
+ @SuppressWarnings("unchecked")
+ T ret = (T) newDataProviderService(session);
+ return ret;
+ } else if (DataBrokerService.class.equals(service)) {
+
+ @SuppressWarnings("unchecked")
+ T ret = (T) newDataConsumerService(session);
+ return ret;
+ }
+
+ throw new IllegalArgumentException(
+ "The requested session-specific service is not provided by this module.");
+ }
+
+ private DataProviderService newDataProviderService(ConsumerSession session) {
+ return new DataProviderSession();
+ }
+
+ private DataBrokerService newDataConsumerService(ConsumerSession session) {
+ return new DataConsumerSession();
+ }
+
+ private StoreContext context(DataStoreIdentifier store) {
+ return storeContext.get(store);
+ }
+
+ private static class StoreContext {
+ private Set<DataCommitHandler> commitHandlers = Collections
+ .synchronizedSet(new HashSet<DataCommitHandler>());
+ private Set<DataValidator> validators = Collections
+ .synchronizedSet(new HashSet<DataValidator>());
+ private Set<DataRefresher> refreshers = Collections
+ .synchronizedSet(new HashSet<DataRefresher>());
+ }
+
+ private class DataConsumerSession implements DataBrokerService {
+
+ @Override
+ public CompositeNode getData(DataStoreIdentifier store) {
+ // TODO Implement this method
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public CompositeNode getData(DataStoreIdentifier store,
+ CompositeNode filter) {
+ // TODO Implement this method
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public CompositeNode getCandidateData(DataStoreIdentifier store) {
+ // TODO Implement this method
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public CompositeNode getCandidateData(DataStoreIdentifier store,
+ CompositeNode filter) {
+ // TODO Implement this method
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public RpcResult<CompositeNode> editCandidateData(
+ DataStoreIdentifier store, MutableCompositeNode changeSet) {
+ // TODO Implement this method
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {
+ // TODO Implement this method
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void closeSession() {
+ // TODO Implement this method
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public Set<DataStoreIdentifier> getDataStores() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ }
+
+ private class DataProviderSession extends DataConsumerSession implements
+ DataProviderService {
+
+ private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();
+ private Set<DataValidator> providerValidators = new HashSet<DataValidator>();
+ private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();
+
+ @Override
+ public void addValidator(DataStoreIdentifier store,
+ DataValidator validator) {
+ if (validator == null)
+ throw new IllegalArgumentException(
+ "Validator should not be null");
+
+ providerValidators.add(validator);
+ context(store).validators.add(validator);
+ }
+
+ @Override
+ public void removeValidator(DataStoreIdentifier store,
+ DataValidator validator) {
+ if (validator == null)
+ throw new IllegalArgumentException(
+ "Validator should not be null");
+
+ providerValidators.remove(validator);
+ context(store).validators.remove(validator);
+ }
+
+ @Override
+ public void addCommitHandler(DataStoreIdentifier store,
+ DataCommitHandler provider) {
+ if (provider == null)
+ throw new IllegalArgumentException(
+ "CommitHandler should not be null");
+
+ providerCommitHandlers.add(provider);
+ context(store).commitHandlers.add(provider);
+ }
+
+ @Override
+ public void removeCommitHandler(DataStoreIdentifier store,
+ DataCommitHandler provider) {
+ if (provider == null)
+ throw new IllegalArgumentException(
+ "CommitHandler should not be null");
+
+ providerCommitHandlers.remove(provider);
+ context(store).commitHandlers.remove(provider);
+ }
+
+ @Override
+ public void addRefresher(DataStoreIdentifier store,
+ DataRefresher provider) {
+ if (provider == null)
+ throw new IllegalArgumentException(
+ "Refresher should not be null");
+
+ providerRefreshers.add(provider);
+ context(store).refreshers.add(provider);
+ }
+
+ @Override
+ public void removeRefresher(DataStoreIdentifier store,
+ DataRefresher provider) {
+ if (provider == null)
+ throw new IllegalArgumentException(
+ "Refresher should not be null");
+
+ providerRefreshers.remove(provider);
+ context(store).refreshers.remove(provider);
+ }
+
+ }
+
+ private class SequentialCommitHandlerCoordinator implements
+ DataCommitHandler {
+
+ @Override
+ public RpcResult<CommitTransaction> requestCommit(
+ DataStoreIdentifier store) {
+ List<RpcError> errors = new ArrayList<RpcError>();
+ Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();
+ boolean successful = true;
+
+ for (DataCommitHandler commitHandler : context(store).commitHandlers) {
+ try {
+ RpcResult<CommitTransaction> partialResult = commitHandler
+ .requestCommit(store);
+ successful = partialResult.isSuccessful() & successful;
+ if (partialResult.isSuccessful()) {
+ transactions.add(partialResult.getResult());
+ }
+
+ errors.addAll(partialResult.getErrors());
+ } catch (Exception e) {
+ log.error("Uncaught exception prevented commit request."
+ + e.getMessage(), e);
+ successful = false;
+ // FIXME: Add RPC Error with exception.
+ }
+ if (successful == false)
+ break;
+ }
+ CommitTransaction transaction = new SequentialCommitTransaction(
+ store, transactions);
+ return Rpcs.getRpcResult(successful, transaction, errors);
+ }
+
+ @Override
+ public Set<DataStoreIdentifier> getSupportedDataStores() {
+ return Collections.emptySet();
+ }
+ }
+
+ private class SequentialCommitTransaction implements CommitTransaction {
+
+ final Set<CommitTransaction> transactions;
+ final DataStoreIdentifier store;
+
+ public SequentialCommitTransaction(DataStoreIdentifier s,
+ Set<CommitTransaction> t) {
+ transactions = t;
+ store = s;
+ }
+
+ @Override
+ public RpcResult<Void> finish() {
+ List<RpcError> errors = new ArrayList<RpcError>();
+ boolean successful = true;
+
+ for (CommitTransaction commitHandler : transactions) {
+ try {
+ RpcResult<Void> partialResult = commitHandler.finish();
+ successful = partialResult.isSuccessful() & successful;
+ errors.addAll(partialResult.getErrors());
+ } catch (Exception e) {
+ log.error(
+ "Uncaught exception prevented finishing of commit."
+ + e.getMessage(), e);
+ successful = false;
+ // FIXME: Add RPC Error with exception.
+ }
+ if (successful == false)
+ break;
+ }
+
+ return Rpcs.getRpcResult(successful, null, errors);
+ }
+
+ @Override
+ public RpcResult<Void> rollback() {
+ List<RpcError> errors = new ArrayList<RpcError>();
+ boolean successful = true;
+
+ for (CommitTransaction commitHandler : transactions) {
+ try {
+ RpcResult<Void> partialResult = commitHandler.rollback();
+ successful = partialResult.isSuccessful() & successful;
+ errors.addAll(partialResult.getErrors());
+ } catch (Exception e) {
+ log.error(
+ "Uncaught exception prevented rollback of commit."
+ + e.getMessage(), e);
+ successful = false;
+ // FIXME: Add RPC Error with exception.
+ }
+ if (successful == false)
+ break;
+ }
+
+ return Rpcs.getRpcResult(successful, null, errors);
+ }
+
+ @Override
+ public DataStoreIdentifier getDataStore() {
+ return this.store;
+ }
+
+ @Override
+ public DataCommitHandler getHandler() {
+ return coordinator;
+ }
+ }
+
+ private class ValidationCoordinator implements DataValidator {
+
+ private final DataStoreIdentifier store;
+
+ ValidationCoordinator(DataStoreIdentifier store) {
+ this.store = store;
+ }
+
+ @Override
+ public RpcResult<Void> validate(CompositeNode toValidate) {
+ List<RpcError> errors = new ArrayList<RpcError>();
+ boolean successful = true;
+
+ for (DataValidator validator : context(store).validators) {
+ try {
+ RpcResult<Void> partialResult = validator
+ .validate(toValidate);
+ successful = partialResult.isSuccessful() & successful;
+ errors.addAll(partialResult.getErrors());
+ } catch (Exception e) {
+ log.error(
+ "Uncaught exception prevented validation."
+ + e.getMessage(), e);
+ successful = false;
+ // FIXME: Add RPC Error with exception.
+ }
+ if (successful == false)
+ break;
+ }
+
+ return Rpcs.getRpcResult(successful, null, errors);
+ }
+
+ @Override
+ public Set<DataStoreIdentifier> getSupportedDataStores() {
+ return Collections.emptySet();
+ }
+
+ }
+
+ private class DataRefreshCoordinator implements DataRefresher {
+
+ private final DataStoreIdentifier store;
+
+ DataRefreshCoordinator(DataStoreIdentifier store) {
+ this.store = store;
+ }
+
+ @Override
+ public void refreshData() {
+
+ for (DataRefresher refresher : context(store).refreshers) {
+ try {
+ refresher.refreshData();
+ } catch (Exception e) {
+ log.error(
+ "Uncaught exception during refresh of data: "
+ + e.getMessage(), e);
+ }
+
+ }
+ }
+ }
+}