X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2;hp=b878071183e14937a63a1ce8981bcfaa5785566d;hb=d6e3e28bf86638685e55289d6cd9cb749838a75e;hpb=582da55f82ee5d83af2e7a327044c62ef3a76285 diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index b878071183..e3d2b567a7 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -27,25 +27,43 @@ import java.util.concurrent.Future import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter import org.opendaylight.yangtools.concepts.Path import org.slf4j.LoggerFactory - -abstract class AbstractDataBroker

,D,DCL extends DataChangeListener> implements -DataModificationTransactionFactory, // +import java.util.HashSet +import java.util.Map.Entry +import java.util.Iterator +import java.util.Collection +import com.google.common.collect.FluentIterable; +import java.util.Set +import com.google.common.collect.ImmutableList +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration +import org.opendaylight.controller.md.sal.common.api.RegistrationListener +import org.opendaylight.yangtools.concepts.util.ListenerRegistry +import java.util.concurrent.atomic.AtomicLong + +abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // DataReader, // DataChangePublisher, // -DataProvisionService { +DataProvisionService { + + private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); @Property var ExecutorService executor; @Property - var AbstractDataReadRouter dataReadRouter; - - Multimap> listeners = HashMultimap.create(); - Multimap> commitHandlers = HashMultimap.create(); - + var AbstractDataReadRouter dataReadRouter; + Multimap> listeners = HashMultimap.create(); + Multimap> commitHandlers = HashMultimap.create(); + + val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); public new() { - + } + + protected def /*Iterator>,D>>*/ affectedCommitHandlers( + HashSet

paths) { + return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // + .transformAndConcat[value] // + .transform[instance].toList() } override final readConfigurationData(P path) { @@ -56,11 +74,18 @@ DataProvisionService { return dataReadRouter.readOperationalData(path); } - override final registerCommitHandler(P path, - DataCommitHandler commitHandler) { - val registration = new DataCommitHandlerRegistration(path,commitHandler,this); - commitHandlers.put(path,registration) - return registration; + override final registerCommitHandler(P path, DataCommitHandler commitHandler) { + val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); + commitHandlers.put(path, registration) + LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onRegister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); + } + } + return registration; } override final def registerDataChangeListener(P path, DCL listener) { @@ -69,27 +94,65 @@ DataProvisionService { return reg; } - final def registerDataReader(P path,DataReader reader) { - - val confReg = dataReadRouter.registerConfigurationReader(path,reader); - val dataReg = dataReadRouter.registerOperationalReader(path,reader); + final def registerDataReader(P path, DataReader reader) { + + val confReg = dataReadRouter.registerConfigurationReader(path, reader); + val dataReg = dataReadRouter.registerOperationalReader(path, reader); + + return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); + } + + override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { + val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); - return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg)); + return ret; } + - protected final def removeListener(DataChangeListenerRegistration registration) { + protected final def removeListener(DataChangeListenerRegistration registration) { listeners.remove(registration.path, registration); } - protected final def removeCommitHandler(DataCommitHandlerRegistration registration) { + protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { commitHandlers.remove(registration.path, registration); + + LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onUnregister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); + } + } } - - protected final def getActiveCommitHandlers() { - return commitHandlers.entries.map[ value.instance].toSet + + protected final def getActiveCommitHandlers() { + return commitHandlers.entries; + } + + protected def /*Iterator>,D>>*/ affectedListenersWithInitialState( + HashSet

paths) { + return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ + val operationalState = readOperationalData(key) + val configurationState = readConfigurationData(key) + return new ListenerStateCapture(key, value, operationalState, configurationState) + ].toList() + } + + protected def boolean isAffectedBy(P key, Set

paths) { + if (paths.contains(key)) { + return true; + } + for (path : paths) { + if (key.contains(path)) { + return true; + } + } + + return false; } - package final def Future> commit(AbstractDataTransaction transaction) { + package final def Future> commit(AbstractDataTransaction transaction) { checkNotNull(transaction); transaction.changeStatus(TransactionStatus.SUBMITED); val task = new TwoPhaseCommit(transaction, this); @@ -98,14 +161,30 @@ DataProvisionService { } -package class DataChangeListenerRegistration

,D,DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { +@Data +package class ListenerStateCapture

, D, DCL extends DataChangeListener> { + + @Property + P path; + + @Property + Collection> listeners; + + @Property + D initialOperationalState; + + @Property + D initialConfigurationState; +} + +package class DataChangeListenerRegistration

, D, DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { - AbstractDataBroker dataBroker; + AbstractDataBroker dataBroker; @Property val P path; - new(P path, DCL instance, AbstractDataBroker broker) { + new(P path, DCL instance, AbstractDataBroker broker) { super(instance) dataBroker = broker; _path = path; @@ -118,16 +197,16 @@ package class DataChangeListenerRegistration

,D,DCL extends Dat } -package class DataCommitHandlerRegistration

,D> -extends AbstractObjectRegistration> { +package class DataCommitHandlerRegistrationImpl

, D> // +extends AbstractObjectRegistration> // +implements DataCommitHandlerRegistration { - AbstractDataBroker dataBroker; + AbstractDataBroker dataBroker; @Property val P path; - new(P path, DataCommitHandler instance, - AbstractDataBroker broker) { + new(P path, DataCommitHandler instance, AbstractDataBroker broker) { super(instance) dataBroker = broker; _path = path; @@ -137,52 +216,87 @@ extends AbstractObjectRegistration> { dataBroker.removeCommitHandler(this); dataBroker = null; } - } -package class TwoPhaseCommit

,D> implements Callable> { - +package class TwoPhaseCommit

, D, DCL extends DataChangeListener> implements Callable> { + private static val log = LoggerFactory.getLogger(TwoPhaseCommit); - val AbstractDataTransaction transaction; - val AbstractDataBroker dataBroker; + val AbstractDataTransaction transaction; + val AbstractDataBroker dataBroker; - new(AbstractDataTransaction transaction, AbstractDataBroker broker) { + new(AbstractDataTransaction transaction, AbstractDataBroker broker) { this.transaction = transaction; this.dataBroker = broker; } override call() throws Exception { - val Iterable> commitHandlers = dataBroker.activeCommitHandlers; + // get affected paths + val affectedPaths = new HashSet

(); + + affectedPaths.addAll(transaction.createdConfigurationData.keySet); + affectedPaths.addAll(transaction.updatedConfigurationData.keySet); + affectedPaths.addAll(transaction.removedConfigurationData); + + affectedPaths.addAll(transaction.createdOperationalData.keySet); + affectedPaths.addAll(transaction.updatedOperationalData.keySet); + affectedPaths.addAll(transaction.removedOperationalData); + + val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); + val transactionId = transaction.identifier; + + log.info("Transaction: {} Started.",transactionId); // requesting commits + val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); val List> handlerTransactions = new ArrayList(); try { for (handler : commitHandlers) { handlerTransactions.add(handler.requestCommit(transaction)); } } catch (Exception e) { - log.error("Request Commit failded",e); - return rollback(handlerTransactions,e); + log.error("Transaction: {} Request Commit failed", transactionId,e); + return rollback(handlerTransactions, e); } val List> results = new ArrayList(); try { for (subtransaction : handlerTransactions) { results.add(subtransaction.finish()); } + listeners.publishDataChangeEvent(); } catch (Exception e) { - log.error("Finish Commit failed",e); - return rollback(handlerTransactions,e); + log.error("Transaction: {} Finish Commit failed",transactionId, e); + return rollback(handlerTransactions, e); } - + log.info("Transaction: {} Finished succesfully.",transactionId); return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); + } - def rollback(List> transactions,Exception e) { + def void publishDataChangeEvent(ImmutableList> listeners) { + for (listenerSet : listeners) { + val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path); + val updatedOperational = dataBroker.readOperationalData(listenerSet.path); + + val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState, + listenerSet.initialOperationalState, updatedOperational, updatedConfiguration); + for (listener : listenerSet.listeners) { + try { + listener.instance.onDataChanged(changeEvent); + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + def rollback(List> transactions, Exception e) { for (transaction : transactions) { transaction.rollback() } + // FIXME return encountered error. return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); } @@ -193,21 +307,20 @@ public abstract class AbstractDataTransaction

, D> extends Abst @Property private val Object identifier; - var TransactionStatus status; - - + var AbstractDataBroker broker; - protected new (AbstractDataBroker dataBroker) { + protected new(Object identifier,AbstractDataBroker dataBroker) { super(dataBroker); - _identifier = new Object(); + _identifier = identifier; broker = dataBroker; status = TransactionStatus.NEW; - //listeners = new ListenerRegistry<>(); + + //listeners = new ListenerRegistry<>(); } - override commit() { + override commit() { return broker.commit(this); } @@ -230,7 +343,7 @@ public abstract class AbstractDataTransaction

, D> extends Abst return false; if (getClass() != obj.getClass()) return false; - val other = (obj as AbstractDataTransaction) ; + val other = (obj as AbstractDataTransaction); if (broker == null) { if (other.broker != null) return false; @@ -248,12 +361,11 @@ public abstract class AbstractDataTransaction

, D> extends Abst return status; } - protected abstract def void onStatusChange(TransactionStatus status); - + public def changeStatus(TransactionStatus status) { this.status = status; onStatusChange(status); } - + }