--- /dev/null
+/*\r
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+package org.opendaylight.controller.sal.core.impl;\r
+\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.HashSet;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Future;\r
+\r
+import org.opendaylight.controller.sal.common.DataStoreIdentifier;\r
+import org.opendaylight.controller.sal.common.util.Rpcs;\r
+import org.opendaylight.controller.sal.core.api.BrokerService;\r
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;\r
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
+import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;\r
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;\r
+import org.opendaylight.controller.sal.core.api.data.DataBrokerService;\r
+import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;\r
+import org.opendaylight.controller.sal.core.api.data.DataProviderService;\r
+import org.opendaylight.controller.sal.core.api.data.DataValidator;\r
+import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;\r
+import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;\r
+import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
+import org.opendaylight.controller.yang.common.RpcError;\r
+import org.opendaylight.controller.yang.common.RpcResult;\r
+import org.opendaylight.controller.yang.data.api.CompositeNode;\r
+import org.opendaylight.controller.yang.data.api.CompositeNodeModification;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.collect.ImmutableSet;\r
+\r
+public class DataBrokerModule implements BrokerModule {\r
+\r
+ private static final Logger log = LoggerFactory\r
+ .getLogger(DataBrokerModule.class);\r
+\r
+ private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet\r
+ .of((Class<? extends ProviderFunctionality>) DataValidator.class,\r
+ DataRefresher.class, DataCommitHandler.class);\r
+\r
+ private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet\r
+ .of((Class<? extends BrokerService>) DataBrokerService.class,\r
+ DataProviderService.class);\r
+\r
+ private Map<DataStoreIdentifier, StoreContext> storeContext;\r
+\r
+ private ExecutorService executor;\r
+ \r
+ private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();\r
+\r
+ @Override\r
+ public Set<Class<? extends BrokerService>> getProvidedServices() {\r
+ return PROVIDED_SESSION_SERVICES;\r
+ }\r
+\r
+ @Override\r
+ public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {\r
+ return SUPPORTED_PROVIDER_FUNCTIONALITY;\r
+ }\r
+\r
+ @Override\r
+ public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {\r
+ return Collections.emptySet();\r
+ }\r
+\r
+ @Override\r
+ public <T extends BrokerService> T getServiceForSession(Class<T> service,\r
+ ConsumerSession session) {\r
+ if (DataProviderService.class.equals(service)\r
+ && session instanceof ProviderSession) {\r
+ @SuppressWarnings("unchecked")\r
+ T ret = (T) newDataProviderService(session);\r
+ return ret;\r
+ } else if (DataBrokerService.class.equals(service)) {\r
+\r
+ @SuppressWarnings("unchecked")\r
+ T ret = (T) newDataConsumerService(session);\r
+ return ret;\r
+ }\r
+\r
+ throw new IllegalArgumentException(\r
+ "The requested session-specific service is not provided by this module.");\r
+ }\r
+\r
+ private DataProviderService newDataProviderService(ConsumerSession session) {\r
+ return new DataProviderSession();\r
+ }\r
+\r
+ private DataBrokerService newDataConsumerService(ConsumerSession session) {\r
+ return new DataConsumerSession();\r
+ }\r
+\r
+ private StoreContext context(DataStoreIdentifier store) {\r
+ return storeContext.get(store);\r
+ }\r
+\r
+ private static class StoreContext {\r
+ private Set<DataCommitHandler> commitHandlers = Collections\r
+ .synchronizedSet(new HashSet<DataCommitHandler>());\r
+ private Set<DataValidator> validators = Collections\r
+ .synchronizedSet(new HashSet<DataValidator>());\r
+ private Set<DataRefresher> refreshers = Collections\r
+ .synchronizedSet(new HashSet<DataRefresher>());\r
+ }\r
+\r
+ private class DataConsumerSession implements DataBrokerService {\r
+\r
+ @Override\r
+ public CompositeNode getData(DataStoreIdentifier store) {\r
+ // TODO Implement this method\r
+ throw new UnsupportedOperationException("Not implemented");\r
+ }\r
+\r
+ @Override\r
+ public CompositeNode getData(DataStoreIdentifier store,\r
+ CompositeNode filter) {\r
+ // TODO Implement this method\r
+ throw new UnsupportedOperationException("Not implemented");\r
+ }\r
+\r
+ @Override\r
+ public CompositeNode getCandidateData(DataStoreIdentifier store) {\r
+ // TODO Implement this method\r
+ throw new UnsupportedOperationException("Not implemented");\r
+ }\r
+\r
+ @Override\r
+ public CompositeNode getCandidateData(DataStoreIdentifier store,\r
+ CompositeNode filter) {\r
+ // TODO Implement this method\r
+ throw new UnsupportedOperationException("Not implemented");\r
+ }\r
+\r
+ @Override\r
+ public RpcResult<CompositeNode> editCandidateData(\r
+ DataStoreIdentifier store, CompositeNodeModification changeSet) {\r
+ // TODO Implement this method\r
+ throw new UnsupportedOperationException("Not implemented");\r
+ }\r
+\r
+ @Override\r
+ public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {\r
+ // TODO Implement this method\r
+ throw new UnsupportedOperationException("Not implemented");\r
+ }\r
+\r
+ @Override\r
+ public void closeSession() {\r
+ // TODO Implement this method\r
+ throw new UnsupportedOperationException("Not implemented");\r
+ }\r
+\r
+ @Override\r
+ public Set<DataStoreIdentifier> getDataStores() {\r
+ // TODO Auto-generated method stub\r
+ return null;\r
+ }\r
+\r
+ }\r
+\r
+ private class DataProviderSession extends DataConsumerSession implements\r
+ DataProviderService {\r
+\r
+ private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();\r
+ private Set<DataValidator> providerValidators = new HashSet<DataValidator>();\r
+ private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();\r
+\r
+ @Override\r
+ public void addValidator(DataStoreIdentifier store,\r
+ DataValidator validator) {\r
+ if (validator == null)\r
+ throw new IllegalArgumentException(\r
+ "Validator should not be null");\r
+\r
+ providerValidators.add(validator);\r
+ context(store).validators.add(validator);\r
+ }\r
+\r
+ @Override\r
+ public void removeValidator(DataStoreIdentifier store,\r
+ DataValidator validator) {\r
+ if (validator == null)\r
+ throw new IllegalArgumentException(\r
+ "Validator should not be null");\r
+\r
+ providerValidators.remove(validator);\r
+ context(store).validators.remove(validator);\r
+ }\r
+\r
+ @Override\r
+ public void addCommitHandler(DataStoreIdentifier store,\r
+ DataCommitHandler provider) {\r
+ if (provider == null)\r
+ throw new IllegalArgumentException(\r
+ "CommitHandler should not be null");\r
+\r
+ providerCommitHandlers.add(provider);\r
+ context(store).commitHandlers.add(provider);\r
+ }\r
+\r
+ @Override\r
+ public void removeCommitHandler(DataStoreIdentifier store,\r
+ DataCommitHandler provider) {\r
+ if (provider == null)\r
+ throw new IllegalArgumentException(\r
+ "CommitHandler should not be null");\r
+\r
+ providerCommitHandlers.remove(provider);\r
+ context(store).commitHandlers.remove(provider);\r
+ }\r
+\r
+ @Override\r
+ public void addRefresher(DataStoreIdentifier store,\r
+ DataRefresher provider) {\r
+ if (provider == null)\r
+ throw new IllegalArgumentException(\r
+ "Refresher should not be null");\r
+\r
+ providerRefreshers.add(provider);\r
+ context(store).refreshers.add(provider);\r
+ }\r
+\r
+ @Override\r
+ public void removeRefresher(DataStoreIdentifier store,\r
+ DataRefresher provider) {\r
+ if (provider == null)\r
+ throw new IllegalArgumentException(\r
+ "Refresher should not be null");\r
+\r
+ providerRefreshers.remove(provider);\r
+ context(store).refreshers.remove(provider);\r
+ }\r
+\r
+ }\r
+\r
+ private class SequentialCommitHandlerCoordinator implements\r
+ DataCommitHandler {\r
+\r
+ @Override\r
+ public RpcResult<CommitTransaction> requestCommit(\r
+ DataStoreIdentifier store) {\r
+ List<RpcError> errors = new ArrayList<RpcError>();\r
+ Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();\r
+ boolean successful = true;\r
+\r
+ for (DataCommitHandler commitHandler : context(store).commitHandlers) {\r
+ try {\r
+ RpcResult<CommitTransaction> partialResult = commitHandler\r
+ .requestCommit(store);\r
+ successful = partialResult.isSuccessful() & successful;\r
+ if (partialResult.isSuccessful()) {\r
+ transactions.add(partialResult.getResult());\r
+ }\r
+\r
+ errors.addAll(partialResult.getErrors());\r
+ } catch (Exception e) {\r
+ log.error("Uncaught exception prevented commit request."\r
+ + e.getMessage(), e);\r
+ successful = false;\r
+ // FIXME: Add RPC Error with exception.\r
+ }\r
+ if (successful == false)\r
+ break;\r
+ }\r
+ CommitTransaction transaction = new SequentialCommitTransaction(\r
+ store, transactions);\r
+ return Rpcs.getRpcResult(successful, transaction, errors);\r
+ }\r
+\r
+ @Override\r
+ public Set<DataStoreIdentifier> getSupportedDataStores() {\r
+ return Collections.emptySet();\r
+ }\r
+ }\r
+\r
+ private class SequentialCommitTransaction implements CommitTransaction {\r
+\r
+ final Set<CommitTransaction> transactions;\r
+ final DataStoreIdentifier store;\r
+\r
+ public SequentialCommitTransaction(DataStoreIdentifier s,\r
+ Set<CommitTransaction> t) {\r
+ transactions = t;\r
+ store = s;\r
+ }\r
+\r
+ @Override\r
+ public RpcResult<Void> finish() {\r
+ List<RpcError> errors = new ArrayList<RpcError>();\r
+ boolean successful = true;\r
+\r
+ for (CommitTransaction commitHandler : transactions) {\r
+ try {\r
+ RpcResult<Void> partialResult = commitHandler.finish();\r
+ successful = partialResult.isSuccessful() & successful;\r
+ errors.addAll(partialResult.getErrors());\r
+ } catch (Exception e) {\r
+ log.error(\r
+ "Uncaught exception prevented finishing of commit."\r
+ + e.getMessage(), e);\r
+ successful = false;\r
+ // FIXME: Add RPC Error with exception.\r
+ }\r
+ if (successful == false)\r
+ break;\r
+ }\r
+\r
+ return Rpcs.getRpcResult(successful, null, errors);\r
+ }\r
+\r
+ @Override\r
+ public RpcResult<Void> rollback() {\r
+ List<RpcError> errors = new ArrayList<RpcError>();\r
+ boolean successful = true;\r
+\r
+ for (CommitTransaction commitHandler : transactions) {\r
+ try {\r
+ RpcResult<Void> partialResult = commitHandler.rollback();\r
+ successful = partialResult.isSuccessful() & successful;\r
+ errors.addAll(partialResult.getErrors());\r
+ } catch (Exception e) {\r
+ log.error(\r
+ "Uncaught exception prevented rollback of commit."\r
+ + e.getMessage(), e);\r
+ successful = false;\r
+ // FIXME: Add RPC Error with exception.\r
+ }\r
+ if (successful == false)\r
+ break;\r
+ }\r
+\r
+ return Rpcs.getRpcResult(successful, null, errors);\r
+ }\r
+\r
+ @Override\r
+ public DataStoreIdentifier getDataStore() {\r
+ return this.store;\r
+ }\r
+\r
+ @Override\r
+ public DataCommitHandler getHandler() {\r
+ return coordinator;\r
+ }\r
+ }\r
+\r
+ private class ValidationCoordinator implements DataValidator {\r
+\r
+ private final DataStoreIdentifier store;\r
+\r
+ ValidationCoordinator(DataStoreIdentifier store) {\r
+ this.store = store;\r
+ }\r
+\r
+ @Override\r
+ public RpcResult<Void> validate(CompositeNode toValidate) {\r
+ List<RpcError> errors = new ArrayList<RpcError>();\r
+ boolean successful = true;\r
+\r
+ for (DataValidator validator : context(store).validators) {\r
+ try {\r
+ RpcResult<Void> partialResult = validator\r
+ .validate(toValidate);\r
+ successful = partialResult.isSuccessful() & successful;\r
+ errors.addAll(partialResult.getErrors());\r
+ } catch (Exception e) {\r
+ log.error(\r
+ "Uncaught exception prevented validation."\r
+ + e.getMessage(), e);\r
+ successful = false;\r
+ // FIXME: Add RPC Error with exception.\r
+ }\r
+ if (successful == false)\r
+ break;\r
+ }\r
+\r
+ return Rpcs.getRpcResult(successful, null, errors);\r
+ }\r
+\r
+ @Override\r
+ public Set<DataStoreIdentifier> getSupportedDataStores() {\r
+ return Collections.emptySet();\r
+ }\r
+\r
+ }\r
+\r
+ private class DataRefreshCoordinator implements DataRefresher {\r
+\r
+ private final DataStoreIdentifier store;\r
+\r
+ DataRefreshCoordinator(DataStoreIdentifier store) {\r
+ this.store = store;\r
+ }\r
+\r
+ @Override\r
+ public void refreshData() {\r
+\r
+ for (DataRefresher refresher : context(store).refreshers) {\r
+ try {\r
+ refresher.refreshData();\r
+ } catch (Exception e) {\r
+ log.error(\r
+ "Uncaught exception during refresh of data: "\r
+ + e.getMessage(), e);\r
+ }\r
+\r
+ }\r
+ }\r
+ }\r
+}\r