package org.opendaylight.controller.md.sal.common.impl.service import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler import org.opendaylight.controller.md.sal.common.api.TransactionStatus import org.opendaylight.controller.md.sal.common.api.data.DataReader import org.opendaylight.yangtools.concepts.AbstractObjectRegistration import org.opendaylight.yangtools.concepts.ListenerRegistration import com.google.common.collect.Multimap import static com.google.common.base.Preconditions.*; import java.util.List import com.google.common.collect.HashMultimap import java.util.concurrent.ExecutorService import java.util.concurrent.Callable import org.opendaylight.yangtools.yang.common.RpcResult import java.util.Collections import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction import java.util.ArrayList import org.opendaylight.yangtools.concepts.CompositeObjectRegistration import java.util.Arrays import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener import org.opendaylight.controller.sal.common.util.Rpcs import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification import java.util.concurrent.Future import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter import org.opendaylight.yangtools.concepts.Path import org.slf4j.LoggerFactory abstract class AbstractDataBroker

,D,DCL extends DataChangeListener> implements DataModificationTransactionFactory, // DataReader, // DataChangePublisher, // DataProvisionService { @Property var ExecutorService executor; @Property var AbstractDataReadRouter dataReadRouter; Multimap> listeners = HashMultimap.create(); Multimap> commitHandlers = HashMultimap.create(); public new() { } override final readConfigurationData(P path) { return dataReadRouter.readConfigurationData(path); } override final readOperationalData(P path) { return dataReadRouter.readOperationalData(path); } override final registerCommitHandler(P path, DataCommitHandler commitHandler) { val registration = new DataCommitHandlerRegistration(path,commitHandler,this); commitHandlers.put(path,registration) return registration; } override final def registerDataChangeListener(P path, DCL listener) { val reg = new DataChangeListenerRegistration(path, listener, this); listeners.put(path, reg); return reg; } final def registerDataReader(P path,DataReader reader) { val confReg = dataReadRouter.registerConfigurationReader(path,reader); val dataReg = dataReadRouter.registerOperationalReader(path,reader); return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg)); } protected final def removeListener(DataChangeListenerRegistration registration) { listeners.remove(registration.path, registration); } protected final def removeCommitHandler(DataCommitHandlerRegistration registration) { commitHandlers.remove(registration.path, registration); } protected final def getActiveCommitHandlers() { return commitHandlers.entries.map[ value.instance].toSet } package final def Future> commit(AbstractDataTransaction transaction) { checkNotNull(transaction); transaction.changeStatus(TransactionStatus.SUBMITED); val task = new TwoPhaseCommit(transaction, this); return executor.submit(task); } } package class DataChangeListenerRegistration

,D,DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { AbstractDataBroker dataBroker; @Property val P path; new(P path, DCL instance, AbstractDataBroker broker) { super(instance) dataBroker = broker; _path = path; } override protected removeRegistration() { dataBroker.removeListener(this); dataBroker = null; } } package class DataCommitHandlerRegistration

,D> extends AbstractObjectRegistration> { AbstractDataBroker dataBroker; @Property val P path; new(P path, DataCommitHandler instance, AbstractDataBroker broker) { super(instance) dataBroker = broker; _path = path; } override protected removeRegistration() { dataBroker.removeCommitHandler(this); dataBroker = null; } } package class TwoPhaseCommit

,D> implements Callable> { private static val log = LoggerFactory.getLogger(TwoPhaseCommit); val AbstractDataTransaction transaction; val AbstractDataBroker dataBroker; new(AbstractDataTransaction transaction, AbstractDataBroker broker) { this.transaction = transaction; this.dataBroker = broker; } override call() throws Exception { val Iterable> commitHandlers = dataBroker.activeCommitHandlers; // requesting commits val List> handlerTransactions = new ArrayList(); try { for (handler : commitHandlers) { handlerTransactions.add(handler.requestCommit(transaction)); } } catch (Exception e) { log.error("Request Commit failded",e); return rollback(handlerTransactions,e); } val List> results = new ArrayList(); try { for (subtransaction : handlerTransactions) { results.add(subtransaction.finish()); } } catch (Exception e) { log.error("Finish Commit failed",e); return rollback(handlerTransactions,e); } return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); } def rollback(List> transactions,Exception e) { for (transaction : transactions) { transaction.rollback() } // FIXME return encountered error. return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); } } public abstract class AbstractDataTransaction

, D> extends AbstractDataModification { @Property private val Object identifier; var TransactionStatus status; var AbstractDataBroker broker; protected new (AbstractDataBroker dataBroker) { super(dataBroker); _identifier = new Object(); broker = dataBroker; status = TransactionStatus.NEW; //listeners = new ListenerRegistry<>(); } override commit() { return broker.commit(this); } override readConfigurationData(P path) { return broker.readConfigurationData(path); } override readOperationalData(P path) { return broker.readOperationalData(path); } override hashCode() { return identifier.hashCode; } override equals(Object obj) { if (this === obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; val other = (obj as AbstractDataTransaction) ; if (broker == null) { if (other.broker != null) return false; } else if (!broker.equals(other.broker)) return false; if (identifier == null) { if (other.identifier != null) return false; } else if (!identifier.equals(other.identifier)) return false; return true; } override TransactionStatus getStatus() { return status; } protected abstract def void onStatusChange(TransactionStatus status); public def changeStatus(TransactionStatus status) { this.status = status; onStatusChange(status); } }