X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=f90465f925a28bdeacf153fa9aeb0940ded58411;hp=5f4f8159617e695e930b5c142708834166fbb985;hb=644a3b79d10db706024055a20977b31bc88a6251;hpb=a35fe9ae149bd4c7089d48665ccc8bff580bce53 diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index 5f4f815961..f90465f925 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -1,377 +1,404 @@ -package org.opendaylight.controller.md.sal.common.impl.service - -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler -import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration -import org.opendaylight.yangtools.concepts.ListenerRegistration -import com.google.common.collect.Multimap -import static com.google.common.base.Preconditions.*; -import java.util.List -import com.google.common.collect.HashMultimap -import java.util.concurrent.ExecutorService -import java.util.concurrent.Callable -import org.opendaylight.yangtools.yang.common.RpcResult -import java.util.Collections -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction -import java.util.ArrayList -import org.opendaylight.yangtools.concepts.CompositeObjectRegistration -import java.util.Arrays -import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService -import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory -import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher -import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener -import org.opendaylight.controller.sal.common.util.Rpcs -import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification -import java.util.concurrent.Future -import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter -import org.opendaylight.yangtools.concepts.Path -import org.slf4j.LoggerFactory -import java.util.HashSet -import java.util.Collection -import com.google.common.collect.FluentIterable; -import java.util.Set -import com.google.common.collect.ImmutableList -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration -import org.opendaylight.controller.md.sal.common.api.RegistrationListener -import org.opendaylight.yangtools.concepts.util.ListenerRegistry -import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent - -abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // -DataReader, // -DataChangePublisher, // -DataProvisionService { - - private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); - - @Property - var ExecutorService executor; - - @Property - var AbstractDataReadRouter dataReadRouter; - - Multimap> listeners = HashMultimap.create(); - Multimap> commitHandlers = HashMultimap.create(); - - val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); - public new() { - } - - protected def /*Iterator>,D>>*/ affectedCommitHandlers( - HashSet

paths) { - return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // - .transformAndConcat[value] // - .transform[instance].toList() - } - - override final readConfigurationData(P path) { - return dataReadRouter.readConfigurationData(path); - } - - override final readOperationalData(P path) { - return dataReadRouter.readOperationalData(path); - } - - override final registerCommitHandler(P path, DataCommitHandler commitHandler) { - val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); - commitHandlers.put(path, registration) - LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onRegister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); - } - } - return registration; - } - - override final def registerDataChangeListener(P path, DCL listener) { - val reg = new DataChangeListenerRegistration(path, listener, this); - listeners.put(path, reg); - val initialConfig = dataReadRouter.readConfigurationData(path); - val initialOperational = dataReadRouter.readOperationalData(path); - val event = createInitialListenerEvent(path,initialConfig,initialOperational); - listener.onDataChanged(event); - return reg; - } - - final def registerDataReader(P path, DataReader reader) { - - val confReg = dataReadRouter.registerConfigurationReader(path, reader); - val dataReg = dataReadRouter.registerOperationalReader(path, reader); - - return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); - } - - override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { - val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); - - return ret; - } - - protected def DataChangeEvent createInitialListenerEvent(P path,D initialConfig,D initialOperational) { - return new InitialDataChangeEventImpl(initialConfig,initialOperational); - - } - - protected final def removeListener(DataChangeListenerRegistration registration) { - listeners.remove(registration.path, registration); - } - - protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { - commitHandlers.remove(registration.path, registration); - - LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onUnregister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); - } - } - } - - protected final def getActiveCommitHandlers() { - return commitHandlers.entries; - } - - protected def /*Iterator>,D>>*/ affectedListenersWithInitialState( - HashSet

paths) { - return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ - val operationalState = readOperationalData(key) - val configurationState = readConfigurationData(key) - return new ListenerStateCapture(key, value, operationalState, configurationState) - ].toList() - } - - protected def boolean isAffectedBy(P key, Set

paths) { - if (paths.contains(key)) { - return true; - } - for (path : paths) { - if (key.contains(path)) { - return true; - } - } - - return false; - } - - package final def Future> commit(AbstractDataTransaction transaction) { - checkNotNull(transaction); - transaction.changeStatus(TransactionStatus.SUBMITED); - val task = new TwoPhaseCommit(transaction, this); - return executor.submit(task); - } - -} - -@Data -package class ListenerStateCapture

, D, DCL extends DataChangeListener> { - - @Property - P path; - - @Property - Collection> listeners; - - @Property - D initialOperationalState; - - @Property - D initialConfigurationState; -} - -package class DataChangeListenerRegistration

, D, DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { - - AbstractDataBroker dataBroker; - - @Property - val P path; - - new(P path, DCL instance, AbstractDataBroker broker) { - super(instance) - dataBroker = broker; - _path = path; - } - - override protected removeRegistration() { - dataBroker.removeListener(this); - dataBroker = null; - } - -} - -package class DataCommitHandlerRegistrationImpl

, D> // -extends AbstractObjectRegistration> // -implements DataCommitHandlerRegistration { - - AbstractDataBroker dataBroker; - - @Property - val P path; - - new(P path, DataCommitHandler instance, AbstractDataBroker broker) { - super(instance) - dataBroker = broker; - _path = path; - } - - override protected removeRegistration() { - dataBroker.removeCommitHandler(this); - dataBroker = null; - } -} - -package class TwoPhaseCommit

, D, DCL extends DataChangeListener> implements Callable> { - - private static val log = LoggerFactory.getLogger(TwoPhaseCommit); - - val AbstractDataTransaction transaction; - val AbstractDataBroker dataBroker; - - new(AbstractDataTransaction transaction, AbstractDataBroker broker) { - this.transaction = transaction; - this.dataBroker = broker; - } - - override call() throws Exception { - - // get affected paths - val affectedPaths = new HashSet

(); - - affectedPaths.addAll(transaction.createdConfigurationData.keySet); - affectedPaths.addAll(transaction.updatedConfigurationData.keySet); - affectedPaths.addAll(transaction.removedConfigurationData); - - affectedPaths.addAll(transaction.createdOperationalData.keySet); - affectedPaths.addAll(transaction.updatedOperationalData.keySet); - affectedPaths.addAll(transaction.removedOperationalData); - - val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); - - val transactionId = transaction.identifier; - - log.info("Transaction: {} Started.",transactionId); - // requesting commits - val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); - val List> handlerTransactions = new ArrayList(); - try { - for (handler : commitHandlers) { - handlerTransactions.add(handler.requestCommit(transaction)); - } - } catch (Exception e) { - log.error("Transaction: {} Request Commit failed", transactionId,e); - return rollback(handlerTransactions, e); - } - val List> results = new ArrayList(); - try { - for (subtransaction : handlerTransactions) { - results.add(subtransaction.finish()); - } - listeners.publishDataChangeEvent(); - } catch (Exception e) { - log.error("Transaction: {} Finish Commit failed",transactionId, e); - return rollback(handlerTransactions, e); - } - log.info("Transaction: {} Finished succesfully.",transactionId); - return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); - - } - - def void publishDataChangeEvent(ImmutableList> listeners) { - for (listenerSet : listeners) { - val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path); - val updatedOperational = dataBroker.readOperationalData(listenerSet.path); - - val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState, - listenerSet.initialOperationalState, updatedOperational, updatedConfiguration); - for (listener : listenerSet.listeners) { - try { - listener.instance.onDataChanged(changeEvent); - - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } - - def rollback(List> transactions, Exception e) { - for (transaction : transactions) { - transaction.rollback() - } - - // FIXME return encountered error. - return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); - } -} - -public abstract class AbstractDataTransaction

, D> extends AbstractDataModification { - - @Property - private val Object identifier; - - var TransactionStatus status; - - var AbstractDataBroker broker; - - protected new(Object identifier,AbstractDataBroker dataBroker) { - super(dataBroker); - _identifier = identifier; - broker = dataBroker; - status = TransactionStatus.NEW; - - //listeners = new ListenerRegistry<>(); - } - - override commit() { - return broker.commit(this); - } - - override readConfigurationData(P path) { - return broker.readConfigurationData(path); - } - - override readOperationalData(P path) { - return broker.readOperationalData(path); - } - - override hashCode() { - return identifier.hashCode; - } - - override equals(Object obj) { - if (this === obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - val other = (obj as AbstractDataTransaction); - if (broker == null) { - if (other.broker != null) - return false; - } else if (!broker.equals(other.broker)) - return false; - if (identifier == null) { - if (other.identifier != null) - return false; - } else if (!identifier.equals(other.identifier)) - return false; - return true; - } - - override TransactionStatus getStatus() { - return status; - } - - protected abstract def void onStatusChange(TransactionStatus status); - - public def changeStatus(TransactionStatus status) { - this.status = status; - onStatusChange(status); - } - -} +package org.opendaylight.controller.md.sal.common.impl.service + +import com.google.common.collect.FluentIterable +import com.google.common.collect.HashMultimap +import com.google.common.collect.ImmutableList +import com.google.common.collect.Multimap +import java.util.ArrayList +import java.util.Arrays +import java.util.Collection +import java.util.Collections +import java.util.HashSet +import java.util.List +import java.util.Set +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicLong +import org.opendaylight.controller.md.sal.common.api.RegistrationListener +import org.opendaylight.controller.md.sal.common.api.TransactionStatus +import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener +import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration +import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory +import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService +import org.opendaylight.controller.md.sal.common.api.data.DataReader +import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification +import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter +import org.opendaylight.controller.sal.common.util.Rpcs +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import org.opendaylight.yangtools.concepts.CompositeObjectRegistration +import org.opendaylight.yangtools.concepts.ListenerRegistration +import org.opendaylight.yangtools.concepts.Path +import org.opendaylight.yangtools.concepts.util.ListenerRegistry +import org.opendaylight.yangtools.yang.common.RpcResult +import org.slf4j.LoggerFactory + +import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent + +abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // +DataReader, // +DataChangePublisher, // +DataProvisionService { + + private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); + + @Property + var ExecutorService executor; + + @Property + var AbstractDataReadRouter dataReadRouter; + + @Property + private val AtomicLong submittedTransactionsCount = new AtomicLong; + + @Property + private val AtomicLong failedTransactionsCount = new AtomicLong + + @Property + private val AtomicLong finishedTransactionsCount = new AtomicLong + + Multimap> listeners = HashMultimap.create(); + Multimap> commitHandlers = HashMultimap.create(); + + val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); + public new() { + } + + protected def /*Iterator>,D>>*/ affectedCommitHandlers( + HashSet

paths) { + return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // + .transformAndConcat[value] // + .transform[instance].toList() + } + + override final readConfigurationData(P path) { + return dataReadRouter.readConfigurationData(path); + } + + override final readOperationalData(P path) { + return dataReadRouter.readOperationalData(path); + } + + override final registerCommitHandler(P path, DataCommitHandler commitHandler) { + val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); + commitHandlers.put(path, registration) + LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onRegister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); + } + } + return registration; + } + + override final def registerDataChangeListener(P path, DCL listener) { + val reg = new DataChangeListenerRegistration(path, listener, this); + listeners.put(path, reg); + val initialConfig = dataReadRouter.readConfigurationData(path); + val initialOperational = dataReadRouter.readOperationalData(path); + val event = createInitialListenerEvent(path,initialConfig,initialOperational); + listener.onDataChanged(event); + return reg; + } + + final def registerDataReader(P path, DataReader reader) { + + val confReg = dataReadRouter.registerConfigurationReader(path, reader); + val dataReg = dataReadRouter.registerOperationalReader(path, reader); + + return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); + } + + override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { + val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); + + return ret; + } + + protected def DataChangeEvent createInitialListenerEvent(P path,D initialConfig,D initialOperational) { + return new InitialDataChangeEventImpl(initialConfig,initialOperational); + + } + + protected final def removeListener(DataChangeListenerRegistration registration) { + listeners.remove(registration.path, registration); + } + + protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { + commitHandlers.remove(registration.path, registration); + + LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onUnregister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); + } + } + } + + protected final def getActiveCommitHandlers() { + return commitHandlers.entries; + } + + protected def /*Iterator>,D>>*/ affectedListenersWithInitialState( + HashSet

paths) { + return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ + val operationalState = readOperationalData(key) + val configurationState = readConfigurationData(key) + return new ListenerStateCapture(key, value, operationalState, configurationState) + ].toList() + } + + protected def boolean isAffectedBy(P key, Set

paths) { + if (paths.contains(key)) { + return true; + } + for (path : paths) { + if (key.contains(path)) { + return true; + } + } + + return false; + } + + package final def Future> commit(AbstractDataTransaction transaction) { + checkNotNull(transaction); + transaction.changeStatus(TransactionStatus.SUBMITED); + val task = new TwoPhaseCommit(transaction, this); + submittedTransactionsCount.andIncrement; + return executor.submit(task); + } + +} + +@Data +package class ListenerStateCapture

, D, DCL extends DataChangeListener> { + + @Property + P path; + + @Property + Collection> listeners; + + @Property + D initialOperationalState; + + @Property + D initialConfigurationState; +} + +package class DataChangeListenerRegistration

, D, DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { + + AbstractDataBroker dataBroker; + + @Property + val P path; + + new(P path, DCL instance, AbstractDataBroker broker) { + super(instance) + dataBroker = broker; + _path = path; + } + + override protected removeRegistration() { + dataBroker.removeListener(this); + dataBroker = null; + } + +} + +package class DataCommitHandlerRegistrationImpl

, D> // +extends AbstractObjectRegistration> // +implements DataCommitHandlerRegistration { + + AbstractDataBroker dataBroker; + + @Property + val P path; + + new(P path, DataCommitHandler instance, AbstractDataBroker broker) { + super(instance) + dataBroker = broker; + _path = path; + } + + override protected removeRegistration() { + dataBroker.removeCommitHandler(this); + dataBroker = null; + } +} + +package class TwoPhaseCommit

, D, DCL extends DataChangeListener> implements Callable> { + + private static val log = LoggerFactory.getLogger(TwoPhaseCommit); + + val AbstractDataTransaction transaction; + val AbstractDataBroker dataBroker; + + new(AbstractDataTransaction transaction, AbstractDataBroker broker) { + this.transaction = transaction; + this.dataBroker = broker; + } + + override call() throws Exception { + + // get affected paths + val affectedPaths = new HashSet

(); + + affectedPaths.addAll(transaction.createdConfigurationData.keySet); + affectedPaths.addAll(transaction.updatedConfigurationData.keySet); + affectedPaths.addAll(transaction.removedConfigurationData); + + affectedPaths.addAll(transaction.createdOperationalData.keySet); + affectedPaths.addAll(transaction.updatedOperationalData.keySet); + affectedPaths.addAll(transaction.removedOperationalData); + + val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); + + val transactionId = transaction.identifier; + + log.info("Transaction: {} Started.",transactionId); + // requesting commits + val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); + val List> handlerTransactions = new ArrayList(); + try { + for (handler : commitHandlers) { + handlerTransactions.add(handler.requestCommit(transaction)); + } + } catch (Exception e) { + log.error("Transaction: {} Request Commit failed", transactionId,e); + dataBroker.failedTransactionsCount.andIncrement + return rollback(handlerTransactions, e); + } + val List> results = new ArrayList(); + try { + for (subtransaction : handlerTransactions) { + results.add(subtransaction.finish()); + } + listeners.publishDataChangeEvent(); + } catch (Exception e) { + log.error("Transaction: {} Finish Commit failed",transactionId, e); + dataBroker.failedTransactionsCount.andIncrement + return rollback(handlerTransactions, e); + } + log.info("Transaction: {} Finished successfully.",transactionId); + dataBroker.finishedTransactionsCount.andIncrement; + return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); + + } + + def void publishDataChangeEvent(ImmutableList> listeners) { + for (listenerSet : listeners) { + val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path); + val updatedOperational = dataBroker.readOperationalData(listenerSet.path); + + val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState, + listenerSet.initialOperationalState, updatedOperational, updatedConfiguration); + for (listener : listenerSet.listeners) { + try { + listener.instance.onDataChanged(changeEvent); + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + def rollback(List> transactions, Exception e) { + for (transaction : transactions) { + transaction.rollback() + } + + // FIXME return encountered error. + return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); + } +} + +public abstract class AbstractDataTransaction

, D> extends AbstractDataModification { + + private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction); + + @Property + private val Object identifier; + + var TransactionStatus status; + + var AbstractDataBroker broker; + + protected new(Object identifier,AbstractDataBroker dataBroker) { + super(dataBroker); + _identifier = identifier; + broker = dataBroker; + status = TransactionStatus.NEW; + LOG.debug("Transaction {} Allocated.", identifier); + + //listeners = new ListenerRegistry<>(); + } + + override commit() { + return broker.commit(this); + } + + override readConfigurationData(P path) { + val local = this.updatedConfigurationData.get(path); + if(local != null) { + return local; + } + + return broker.readConfigurationData(path); + } + + override readOperationalData(P path) { + val local = this.updatedOperationalData.get(path); + if(local != null) { + return local; + } + return broker.readOperationalData(path); + } + + override hashCode() { + return identifier.hashCode; + } + + override equals(Object obj) { + if (this === obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + val other = (obj as AbstractDataTransaction); + if (broker == null) { + if (other.broker != null) + return false; + } else if (!broker.equals(other.broker)) + return false; + if (identifier == null) { + if (other.identifier != null) + return false; + } else if (!identifier.equals(other.identifier)) + return false; + return true; + } + + override TransactionStatus getStatus() { + return status; + } + + protected abstract def void onStatusChange(TransactionStatus status); + + public def changeStatus(TransactionStatus status) { + LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status); + this.status = status; + onStatusChange(status); + } + +}