/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.core.impl; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.opendaylight.controller.sal.common.DataStoreIdentifier; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.core.api.BrokerService; import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality; import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality; import org.opendaylight.controller.sal.core.api.data.DataBrokerService; import org.opendaylight.controller.sal.core.api.data.DataCommitHandler; import org.opendaylight.controller.sal.core.api.data.DataProviderService; import org.opendaylight.controller.sal.core.api.data.DataValidator; import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction; import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher; import org.opendaylight.controller.sal.core.spi.BrokerModule; import org.opendaylight.controller.yang.common.RpcError; import org.opendaylight.controller.yang.common.RpcResult; import org.opendaylight.controller.yang.data.api.CompositeNode; import org.opendaylight.controller.yang.data.api.CompositeNodeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableSet; public class DataBrokerModule implements BrokerModule { private static final Logger log = LoggerFactory .getLogger(DataBrokerModule.class); private static final Set> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet .of((Class) DataValidator.class, DataRefresher.class, DataCommitHandler.class); private static final Set> PROVIDED_SESSION_SERVICES = ImmutableSet .of((Class) DataBrokerService.class, DataProviderService.class); private Map storeContext; private ExecutorService executor; private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator(); @Override public Set> getProvidedServices() { return PROVIDED_SESSION_SERVICES; } @Override public Set> getSupportedProviderFunctionality() { return SUPPORTED_PROVIDER_FUNCTIONALITY; } @Override public Set> getSupportedConsumerFunctionality() { return Collections.emptySet(); } @Override public T getServiceForSession(Class service, ConsumerSession session) { if (DataProviderService.class.equals(service) && session instanceof ProviderSession) { @SuppressWarnings("unchecked") T ret = (T) newDataProviderService(session); return ret; } else if (DataBrokerService.class.equals(service)) { @SuppressWarnings("unchecked") T ret = (T) newDataConsumerService(session); return ret; } throw new IllegalArgumentException( "The requested session-specific service is not provided by this module."); } private DataProviderService newDataProviderService(ConsumerSession session) { return new DataProviderSession(); } private DataBrokerService newDataConsumerService(ConsumerSession session) { return new DataConsumerSession(); } private StoreContext context(DataStoreIdentifier store) { return storeContext.get(store); } private static class StoreContext { private Set commitHandlers = Collections .synchronizedSet(new HashSet()); private Set validators = Collections .synchronizedSet(new HashSet()); private Set refreshers = Collections .synchronizedSet(new HashSet()); } private class DataConsumerSession implements DataBrokerService { @Override public CompositeNode getData(DataStoreIdentifier store) { // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public CompositeNode getData(DataStoreIdentifier store, CompositeNode filter) { // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public CompositeNode getCandidateData(DataStoreIdentifier store) { // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public CompositeNode getCandidateData(DataStoreIdentifier store, CompositeNode filter) { // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public RpcResult editCandidateData( DataStoreIdentifier store, CompositeNodeModification changeSet) { // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public Future> commit(DataStoreIdentifier store) { // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public void closeSession() { // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public Set getDataStores() { // TODO Auto-generated method stub return null; } } private class DataProviderSession extends DataConsumerSession implements DataProviderService { private Set providerCommitHandlers = new HashSet(); private Set providerValidators = new HashSet(); private Set providerRefreshers = new HashSet(); @Override public void addValidator(DataStoreIdentifier store, DataValidator validator) { if (validator == null) throw new IllegalArgumentException( "Validator should not be null"); providerValidators.add(validator); context(store).validators.add(validator); } @Override public void removeValidator(DataStoreIdentifier store, DataValidator validator) { if (validator == null) throw new IllegalArgumentException( "Validator should not be null"); providerValidators.remove(validator); context(store).validators.remove(validator); } @Override public void addCommitHandler(DataStoreIdentifier store, DataCommitHandler provider) { if (provider == null) throw new IllegalArgumentException( "CommitHandler should not be null"); providerCommitHandlers.add(provider); context(store).commitHandlers.add(provider); } @Override public void removeCommitHandler(DataStoreIdentifier store, DataCommitHandler provider) { if (provider == null) throw new IllegalArgumentException( "CommitHandler should not be null"); providerCommitHandlers.remove(provider); context(store).commitHandlers.remove(provider); } @Override public void addRefresher(DataStoreIdentifier store, DataRefresher provider) { if (provider == null) throw new IllegalArgumentException( "Refresher should not be null"); providerRefreshers.add(provider); context(store).refreshers.add(provider); } @Override public void removeRefresher(DataStoreIdentifier store, DataRefresher provider) { if (provider == null) throw new IllegalArgumentException( "Refresher should not be null"); providerRefreshers.remove(provider); context(store).refreshers.remove(provider); } } private class SequentialCommitHandlerCoordinator implements DataCommitHandler { @Override public RpcResult requestCommit( DataStoreIdentifier store) { List errors = new ArrayList(); Set transactions = new HashSet(); boolean successful = true; for (DataCommitHandler commitHandler : context(store).commitHandlers) { try { RpcResult partialResult = commitHandler .requestCommit(store); successful = partialResult.isSuccessful() & successful; if (partialResult.isSuccessful()) { transactions.add(partialResult.getResult()); } errors.addAll(partialResult.getErrors()); } catch (Exception e) { log.error("Uncaught exception prevented commit request." + e.getMessage(), e); successful = false; // FIXME: Add RPC Error with exception. } if (successful == false) break; } CommitTransaction transaction = new SequentialCommitTransaction( store, transactions); return Rpcs.getRpcResult(successful, transaction, errors); } @Override public Set getSupportedDataStores() { return Collections.emptySet(); } } private class SequentialCommitTransaction implements CommitTransaction { final Set transactions; final DataStoreIdentifier store; public SequentialCommitTransaction(DataStoreIdentifier s, Set t) { transactions = t; store = s; } @Override public RpcResult finish() { List errors = new ArrayList(); boolean successful = true; for (CommitTransaction commitHandler : transactions) { try { RpcResult partialResult = commitHandler.finish(); successful = partialResult.isSuccessful() & successful; errors.addAll(partialResult.getErrors()); } catch (Exception e) { log.error( "Uncaught exception prevented finishing of commit." + e.getMessage(), e); successful = false; // FIXME: Add RPC Error with exception. } if (successful == false) break; } return Rpcs.getRpcResult(successful, null, errors); } @Override public RpcResult rollback() { List errors = new ArrayList(); boolean successful = true; for (CommitTransaction commitHandler : transactions) { try { RpcResult partialResult = commitHandler.rollback(); successful = partialResult.isSuccessful() & successful; errors.addAll(partialResult.getErrors()); } catch (Exception e) { log.error( "Uncaught exception prevented rollback of commit." + e.getMessage(), e); successful = false; // FIXME: Add RPC Error with exception. } if (successful == false) break; } return Rpcs.getRpcResult(successful, null, errors); } @Override public DataStoreIdentifier getDataStore() { return this.store; } @Override public DataCommitHandler getHandler() { return coordinator; } } private class ValidationCoordinator implements DataValidator { private final DataStoreIdentifier store; ValidationCoordinator(DataStoreIdentifier store) { this.store = store; } @Override public RpcResult validate(CompositeNode toValidate) { List errors = new ArrayList(); boolean successful = true; for (DataValidator validator : context(store).validators) { try { RpcResult partialResult = validator .validate(toValidate); successful = partialResult.isSuccessful() & successful; errors.addAll(partialResult.getErrors()); } catch (Exception e) { log.error( "Uncaught exception prevented validation." + e.getMessage(), e); successful = false; // FIXME: Add RPC Error with exception. } if (successful == false) break; } return Rpcs.getRpcResult(successful, null, errors); } @Override public Set getSupportedDataStores() { return Collections.emptySet(); } } private class DataRefreshCoordinator implements DataRefresher { private final DataStoreIdentifier store; DataRefreshCoordinator(DataStoreIdentifier store) { this.store = store; } @Override public void refreshData() { for (DataRefresher refresher : context(store).refreshers) { try { refresher.refreshData(); } catch (Exception e) { log.error( "Uncaught exception during refresh of data: " + e.getMessage(), e); } } } } }