X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;fp=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2;hp=74c4e0a148640a2278deee8a87bf67c4c27eb334;hb=cf4834ff659cecdeb08a247679dfbf6b10f4ea73;hpb=a9e05354c351d3d88457892e28e2f01993d53142 diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index 74c4e0a148..e3d2b567a7 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -34,12 +34,18 @@ import java.util.Collection import com.google.common.collect.FluentIterable; import java.util.Set import com.google.common.collect.ImmutableList +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration +import org.opendaylight.controller.md.sal.common.api.RegistrationListener +import org.opendaylight.yangtools.concepts.util.ListenerRegistry +import java.util.concurrent.atomic.AtomicLong abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // DataReader, // DataChangePublisher, // DataProvisionService { + private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); + @Property var ExecutorService executor; @@ -47,17 +53,17 @@ DataProvisionService { var AbstractDataReadRouter dataReadRouter; Multimap> listeners = HashMultimap.create(); - Multimap> commitHandlers = HashMultimap.create(); - + Multimap> commitHandlers = HashMultimap.create(); + + val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); public new() { } protected def /*Iterator>,D>>*/ affectedCommitHandlers( HashSet

paths) { - return FluentIterable.from(commitHandlers.asMap.entrySet) - .filter[key.isAffectedBy(paths)] // - .transformAndConcat [value] // - .transform[instance].toList() + return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // + .transformAndConcat[value] // + .transform[instance].toList() } override final readConfigurationData(P path) { @@ -69,8 +75,16 @@ DataProvisionService { } override final registerCommitHandler(P path, DataCommitHandler commitHandler) { - val registration = new DataCommitHandlerRegistration(path, commitHandler, this); + val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); commitHandlers.put(path, registration) + LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onRegister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); + } + } return registration; } @@ -87,13 +101,29 @@ DataProvisionService { return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); } + + override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { + val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); + + return ret; + } + protected final def removeListener(DataChangeListenerRegistration registration) { listeners.remove(registration.path, registration); } - protected final def removeCommitHandler(DataCommitHandlerRegistration registration) { + protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { commitHandlers.remove(registration.path, registration); + + LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onUnregister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); + } + } } protected final def getActiveCommitHandlers() { @@ -132,7 +162,7 @@ DataProvisionService { } @Data -package class ListenerStateCapture

, D,DCL extends DataChangeListener> { +package class ListenerStateCapture

, D, DCL extends DataChangeListener> { @Property P path; @@ -167,7 +197,9 @@ package class DataChangeListenerRegistration

, D, DCL extends D } -package class DataCommitHandlerRegistration

, D> extends AbstractObjectRegistration> { +package class DataCommitHandlerRegistrationImpl

, D> // +extends AbstractObjectRegistration> // +implements DataCommitHandlerRegistration { AbstractDataBroker dataBroker; @@ -184,10 +216,9 @@ package class DataCommitHandlerRegistration

, D> extends Abstra dataBroker.removeCommitHandler(this); dataBroker = null; } - } -package class TwoPhaseCommit

, D,DCL extends DataChangeListener> implements Callable> { +package class TwoPhaseCommit

, D, DCL extends DataChangeListener> implements Callable> { private static val log = LoggerFactory.getLogger(TwoPhaseCommit); @@ -214,15 +245,18 @@ package class TwoPhaseCommit

, D,DCL extends DataChangeListener val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); + val transactionId = transaction.identifier; + + log.info("Transaction: {} Started.",transactionId); // requesting commits - val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); + val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); val List> handlerTransactions = new ArrayList(); try { for (handler : commitHandlers) { handlerTransactions.add(handler.requestCommit(transaction)); } } catch (Exception e) { - log.error("Request Commit failded", e); + log.error("Transaction: {} Request Commit failed", transactionId,e); return rollback(handlerTransactions, e); } val List> results = new ArrayList(); @@ -232,25 +266,25 @@ package class TwoPhaseCommit

, D,DCL extends DataChangeListener } listeners.publishDataChangeEvent(); } catch (Exception e) { - log.error("Finish Commit failed", e); + log.error("Transaction: {} Finish Commit failed",transactionId, e); return rollback(handlerTransactions, e); } - - + log.info("Transaction: {} Finished succesfully.",transactionId); return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); } - - def void publishDataChangeEvent(ImmutableList> listeners) { - for(listenerSet : listeners) { + + def void publishDataChangeEvent(ImmutableList> listeners) { + for (listenerSet : listeners) { val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path); val updatedOperational = dataBroker.readOperationalData(listenerSet.path); - - val changeEvent = new DataChangeEventImpl(transaction,listenerSet.initialConfigurationState,listenerSet.initialOperationalState,updatedOperational,updatedConfiguration); - for(listener : listenerSet.listeners) { + + val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState, + listenerSet.initialOperationalState, updatedOperational, updatedConfiguration); + for (listener : listenerSet.listeners) { try { listener.instance.onDataChanged(changeEvent); - + } catch (Exception e) { e.printStackTrace(); } @@ -267,6 +301,7 @@ package class TwoPhaseCommit

, D,DCL extends DataChangeListener return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); } } + public abstract class AbstractDataTransaction

, D> extends AbstractDataModification { @Property @@ -276,9 +311,9 @@ public abstract class AbstractDataTransaction

, D> extends Abst var AbstractDataBroker broker; - protected new(AbstractDataBroker dataBroker) { + protected new(Object identifier,AbstractDataBroker dataBroker) { super(dataBroker); - _identifier = new Object(); + _identifier = identifier; broker = dataBroker; status = TransactionStatus.NEW;