package org.opendaylight.controller.sal.binding.impl import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler import org.opendaylight.controller.sal.binding.api.data.DataChangeListener import org.opendaylight.controller.sal.binding.api.data.DataProviderService import org.opendaylight.yangtools.yang.binding.DataObject import org.opendaylight.yangtools.yang.binding.InstanceIdentifier import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener import org.opendaylight.controller.md.sal.common.api.TransactionStatus import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification import org.opendaylight.controller.md.sal.common.api.data.DataReader import org.opendaylight.yangtools.concepts.AbstractObjectRegistration import org.opendaylight.yangtools.concepts.ListenerRegistration import static extension org.opendaylight.controller.sal.binding.impl.util.MapUtils.*; import java.util.Collection import java.util.Map.Entry import java.util.HashSet import java.util.Set import com.google.common.collect.Multimap import static com.google.common.base.Preconditions.*; import java.util.List import java.util.LinkedList import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider import com.google.common.collect.HashMultimap import java.util.concurrent.ExecutorService import java.util.concurrent.Callable import org.opendaylight.yangtools.yang.common.RpcResult import org.opendaylight.controller.sal.common.util.Rpcs import java.util.Collections import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction import java.util.ArrayList import org.opendaylight.controller.sal.common.util.RpcErrors class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderService { @Property var ExecutorService executor; Multimap configReaders = HashMultimap.create(); Multimap operationalReaders = HashMultimap.create(); Multimap listeners = HashMultimap.create(); Multimap commitHandlers = HashMultimap.create(); override beginTransaction() { return new DataTransactionImpl(this); } override readConfigurationData(InstanceIdentifier path) { val readers = configReaders.getAllChildren(path); return readers.readConfiguration(path); } override readOperationalData(InstanceIdentifier path) { val readers = operationalReaders.getAllChildren(path); return readers.readOperational(path); } override registerCommitHandler(InstanceIdentifier path, DataCommitHandler, DataObject> commitHandler) { val registration = new DataCommitHandlerRegistration(path,commitHandler,this); commitHandlers.put(path,registration) return registration; } override registerDataChangeListener(InstanceIdentifier path, DataChangeListener listener) { val reg = new DataChangeListenerRegistration(path, listener, this); listeners.put(path, reg); return reg; } override registerDataReader(InstanceIdentifier path, DataReader, DataObject> provider) { val ret = new DataReaderRegistration(provider, this); ret.paths.add(path); configReaders.put(path, ret); operationalReaders.put(path, ret); return ret; } protected def removeReader(DataReaderRegistration reader) { for (path : reader.paths) { operationalReaders.remove(path, reader); configReaders.remove(path, reader); } } protected def removeListener(DataChangeListenerRegistration registration) { listeners.remove(registration.path, registration); } protected def removeCommitHandler(DataCommitHandlerRegistration registration) { commitHandlers.remove(registration.path, registration); } protected def DataObject readConfiguration( Collection> entries, InstanceIdentifier path) { val List partialResults = new LinkedList(); for (entry : entries) { partialResults.add(entry.value.instance.readConfigurationData(path)) } return merge(path, partialResults); } protected def DataObject readOperational( Collection> entries, InstanceIdentifier path) { val List partialResults = new LinkedList(); for (entry : entries) { partialResults.add(entry.value.instance.readOperationalData(path)) } return merge(path, partialResults); } protected def DataObject merge(InstanceIdentifier identifier, List objects) { // FIXME: implement real merge if (objects.size > 0) { return objects.get(0); } } protected def getActiveCommitHandlers() { return commitHandlers.entries.map[ value.instance].toSet } protected def commit(DataTransactionImpl transaction) { checkNotNull(transaction); transaction.changeStatus(TransactionStatus.SUBMITED); val task = new TwoPhaseCommit(transaction, this); return executor.submit(task); } } package class DataReaderRegistration extends // AbstractObjectRegistration, DataObject>> { DataBrokerImpl dataBroker; @Property val Set> paths; new(DataReader, DataObject> instance, DataBrokerImpl broker) { super(instance) dataBroker = broker; _paths = new HashSet(); } override protected removeRegistration() { dataBroker.removeReader(this); } } package class DataChangeListenerRegistration extends AbstractObjectRegistration implements ListenerRegistration { DataBrokerImpl dataBroker; @Property val InstanceIdentifier path; new(InstanceIdentifier path, DataChangeListener instance, DataBrokerImpl broker) { super(instance) dataBroker = broker; _path = path; } override protected removeRegistration() { dataBroker.removeListener(this); dataBroker = null; } } package class DataCommitHandlerRegistration // extends AbstractObjectRegistration, DataObject>> { DataBrokerImpl dataBroker; @Property val InstanceIdentifier path; new(InstanceIdentifier path, DataCommitHandler, DataObject> instance, DataBrokerImpl broker) { super(instance) dataBroker = broker; _path = path; } override protected removeRegistration() { dataBroker.removeCommitHandler(this); dataBroker = null; } } package class TwoPhaseCommit implements Callable> { val DataTransactionImpl transaction; val DataBrokerImpl dataBroker; new(DataTransactionImpl transaction, DataBrokerImpl broker) { this.transaction = transaction; this.dataBroker = broker; } override call() throws Exception { val Iterable, DataObject>> commitHandlers = dataBroker.activeCommitHandlers; // requesting commits val List, DataObject>> handlerTransactions = new ArrayList(); try { for (handler : commitHandlers) { handlerTransactions.add(handler.requestCommit(transaction)); } } catch (Exception e) { return rollback(handlerTransactions,e); } val List> results = new ArrayList(); try { for (subtransaction : handlerTransactions) { results.add(subtransaction.finish()); } } catch (Exception e) { return rollback(handlerTransactions,e); } return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); } def rollback(List, DataObject>> transactions,Exception e) { for (transaction : transactions) { transaction.rollback() } // FIXME return encoutered error. return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); } }