From ab26d9cc92e2ab41016e473c469dfdef885ecf6c Mon Sep 17 00:00:00 2001 From: Tony Tkacik Date: Tue, 28 Jan 2014 16:21:58 +0100 Subject: [PATCH] Added explicit locking of the data change listener / commit handler registration - This locking fixes ConcurrentModificationException in cases when transaction processing is computing affected commit handlers and data change listeners. Change-Id: Icdd9a2ea36102f681a5d82390c3b9d942560b878 Signed-off-by: Tony Tkacik --- .../impl/service/AbstractDataBroker.xtend | 133 +++++++++++------- 1 file changed, 81 insertions(+), 52 deletions(-) diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index 927975ca53..2c3b0188f4 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent import com.google.common.collect.Multimaps +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // DataReader, // @@ -70,16 +72,20 @@ DataProvisionService { Multimap> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create()); Multimap> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + + private val Lock registrationLock = new ReentrantLock; val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); public new() { } protected def /*Iterator>,D>>*/ affectedCommitHandlers( - HashSet

paths) { - return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // - .transformAndConcat[value] // - .transform[instance].toList() + HashSet

paths) { + return withLock(registrationLock) [| + return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // + .transformAndConcat[value] // + .transform[instance].toList() + ] } override final readConfigurationData(P path) { @@ -88,43 +94,56 @@ DataProvisionService { override final readOperationalData(P path) { return dataReadRouter.readOperationalData(path); - } - - override final registerCommitHandler(P path, DataCommitHandler commitHandler) { - val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); - commitHandlers.put(path, registration) - LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onRegister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); - } - } - return registration; + } + + private static def withLock(Lock lock,Callable method) { + lock.lock + try { + return method.call + } finally { + lock.unlock + } + } + + override final registerCommitHandler(P path, DataCommitHandler commitHandler) { + return withLock(registrationLock) [| + val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); + commitHandlers.put(path, registration) + LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onRegister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); + } + } + return registration; + ] } override final def registerDataChangeListener(P path, DCL listener) { - val reg = new DataChangeListenerRegistration(path, listener, this); - listeners.put(path, reg); - val initialConfig = dataReadRouter.readConfigurationData(path); - val initialOperational = dataReadRouter.readOperationalData(path); - val event = createInitialListenerEvent(path,initialConfig,initialOperational); - listener.onDataChanged(event); - return reg; + return withLock(registrationLock) [| + val reg = new DataChangeListenerRegistration(path, listener, this); + listeners.put(path, reg); + val initialConfig = dataReadRouter.readConfigurationData(path); + val initialOperational = dataReadRouter.readOperationalData(path); + val event = createInitialListenerEvent(path,initialConfig,initialOperational); + listener.onDataChanged(event); + return reg; + ] } final def registerDataReader(P path, DataReader reader) { - - val confReg = dataReadRouter.registerConfigurationReader(path, reader); - val dataReg = dataReadRouter.registerOperationalReader(path, reader); - - return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); + return withLock(registrationLock) [| + val confReg = dataReadRouter.registerConfigurationReader(path, reader); + val dataReg = dataReadRouter.registerOperationalReader(path, reader); + + return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); + ] } override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); - return ret; } @@ -133,21 +152,25 @@ DataProvisionService { } - protected final def removeListener(DataChangeListenerRegistration registration) { - listeners.remove(registration.path, registration); + protected final def removeListener(DataChangeListenerRegistration registration) { + return withLock(registrationLock) [| + listeners.remove(registration.path, registration); + ] } protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { - commitHandlers.remove(registration.path, registration); - - LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onUnregister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); - } - } + return withLock(registrationLock) [| + commitHandlers.remove(registration.path, registration); + LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onUnregister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); + } + } + return null; + ] } protected final def getActiveCommitHandlers() { @@ -155,12 +178,14 @@ DataProvisionService { } protected def /*Iterator>,D>>*/ affectedListenersWithInitialState( - HashSet

paths) { - return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ - val operationalState = readOperationalData(key) - val configurationState = readConfigurationData(key) - return new ListenerStateCapture(key, value, operationalState, configurationState) - ].toList() + HashSet

paths) { + return withLock(registrationLock) [| + return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ + val operationalState = readOperationalData(key) + val configurationState = readConfigurationData(key) + return new ListenerStateCapture(key, value, operationalState, configurationState) + ].toList() + ] } protected def boolean isAffectedBy(P key, Set

paths) { @@ -267,12 +292,13 @@ package class TwoPhaseCommit

, D, DCL extends DataChangeListene affectedPaths.addAll(transaction.createdOperationalData.keySet); affectedPaths.addAll(transaction.updatedOperationalData.keySet); affectedPaths.addAll(transaction.removedOperationalData); - + val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); val transactionId = transaction.identifier; log.trace("Transaction: {} Started.",transactionId); + log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths); // requesting commits val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); val List> handlerTransactions = new ArrayList(); @@ -283,6 +309,7 @@ package class TwoPhaseCommit

, D, DCL extends DataChangeListene } catch (Exception e) { log.error("Transaction: {} Request Commit failed", transactionId,e); dataBroker.failedTransactionsCount.andIncrement + transaction.changeStatus(TransactionStatus.FAILED) return rollback(handlerTransactions, e); } val List> results = new ArrayList(); @@ -293,11 +320,13 @@ package class TwoPhaseCommit

, D, DCL extends DataChangeListene listeners.publishDataChangeEvent(); } catch (Exception e) { log.error("Transaction: {} Finish Commit failed",transactionId, e); - dataBroker.failedTransactionsCount.andIncrement + dataBroker.failedTransactionsCount.andIncrement + transaction.changeStatus(TransactionStatus.FAILED) return rollback(handlerTransactions, e); } log.trace("Transaction: {} Finished successfully.",transactionId); - dataBroker.finishedTransactionsCount.andIncrement; + dataBroker.finishedTransactionsCount.andIncrement; + transaction.changeStatus(TransactionStatus.COMMITED) return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); } -- 2.36.6