\r
import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
import com.google.common.collect.Multimaps
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
DataReader<P, D>, //\r
\r
Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
+
+ private val Lock registrationLock = new ReentrantLock;
\r
val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
public new() {\r
}\r
\r
protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
- HashSet<P> paths) {\r
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
- .transformAndConcat[value] //\r
- .transform[instance].toList()\r
+ HashSet<P> paths) {
+ return withLock(registrationLock) [|\r
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
+ .transformAndConcat[value] //\r
+ .transform[instance].toList()
+ ]\r
}\r
\r
override final readConfigurationData(P path) {\r
\r
override final readOperationalData(P path) {\r
return dataReadRouter.readOperationalData(path);\r
- }\r
-\r
- override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
- commitHandlers.put(path, registration)\r
- LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
- for(listener : commitHandlerRegistrationListeners) {\r
- try {\r
- listener.instance.onRegister(registration);\r
- } catch (Exception e) {\r
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
- }\r
- }\r
- return registration;\r
+ }
+
+ private static def <T> withLock(Lock lock,Callable<T> method) {
+ lock.lock
+ try {
+ return method.call
+ } finally {
+ lock.unlock
+ }
+ } \r
+\r
+ override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
+ return withLock(registrationLock) [|\r
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
+ commitHandlers.put(path, registration)\r
+ LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+ for(listener : commitHandlerRegistrationListeners) {\r
+ try {\r
+ listener.instance.onRegister(registration);\r
+ } catch (Exception e) {\r
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
+ }\r
+ }
+ return registration;
+ ]\r
}\r
\r
override final def registerDataChangeListener(P path, DCL listener) {\r
- val reg = new DataChangeListenerRegistration(path, listener, this);\r
- listeners.put(path, reg);\r
- val initialConfig = dataReadRouter.readConfigurationData(path);\r
- val initialOperational = dataReadRouter.readOperationalData(path);\r
- val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
- listener.onDataChanged(event);\r
- return reg;\r
+ return withLock(registrationLock) [|
+ val reg = new DataChangeListenerRegistration(path, listener, this);\r
+ listeners.put(path, reg);\r
+ val initialConfig = dataReadRouter.readConfigurationData(path);\r
+ val initialOperational = dataReadRouter.readOperationalData(path);\r
+ val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
+ listener.onDataChanged(event);\r
+ return reg;
+ ]\r
}\r
\r
final def registerDataReader(P path, DataReader<P, D> reader) {\r
-\r
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
-\r
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));\r
+ return withLock(registrationLock) [|\r
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
+ \r
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ ]\r
}\r
\r
override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
- \r
return ret;\r
}\r
\r
\r
}\r
\r
- protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {\r
- listeners.remove(registration.path, registration);\r
+ protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
+ return withLock(registrationLock) [|\r
+ listeners.remove(registration.path, registration);
+ ]\r
}\r
\r
protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
- commitHandlers.remove(registration.path, registration);\r
- \r
- LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
- for(listener : commitHandlerRegistrationListeners) {\r
- try {\r
- listener.instance.onUnregister(registration);\r
- } catch (Exception e) {\r
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
- }\r
- }\r
+ return withLock(registrationLock) [|
+ commitHandlers.remove(registration.path, registration);\r
+ LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
+ for(listener : commitHandlerRegistrationListeners) {\r
+ try {\r
+ listener.instance.onUnregister(registration);\r
+ } catch (Exception e) {\r
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
+ }\r
+ }
+ return null;
+ ]\r
}\r
\r
protected final def getActiveCommitHandlers() {\r
}\r
\r
protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
- HashSet<P> paths) {\r
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
- val operationalState = readOperationalData(key)\r
- val configurationState = readConfigurationData(key)\r
- return new ListenerStateCapture(key, value, operationalState, configurationState)\r
- ].toList()\r
+ HashSet<P> paths) {
+ return withLock(registrationLock) [|\r
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+ val operationalState = readOperationalData(key)\r
+ val configurationState = readConfigurationData(key)\r
+ return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+ ].toList()
+ ]\r
}\r
\r
protected def boolean isAffectedBy(P key, Set<P> paths) {\r
affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
affectedPaths.addAll(transaction.removedOperationalData);\r
-\r
+
val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
\r
val transactionId = transaction.identifier;\r
\r
log.trace("Transaction: {} Started.",transactionId);\r
+ log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
// requesting commits\r
val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
} catch (Exception e) {\r
log.error("Transaction: {} Request Commit failed", transactionId,e);\r
dataBroker.failedTransactionsCount.andIncrement\r
+ transaction.changeStatus(TransactionStatus.FAILED)
return rollback(handlerTransactions, e);\r
}\r
val List<RpcResult<Void>> results = new ArrayList();\r
listeners.publishDataChangeEvent();\r
} catch (Exception e) {\r
log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
- dataBroker.failedTransactionsCount.andIncrement\r
+ dataBroker.failedTransactionsCount.andIncrement
+ transaction.changeStatus(TransactionStatus.FAILED)\r
return rollback(handlerTransactions, e);\r
}\r
log.trace("Transaction: {} Finished successfully.",transactionId);\r
- dataBroker.finishedTransactionsCount.andIncrement;\r
+ dataBroker.finishedTransactionsCount.andIncrement;
+ transaction.changeStatus(TransactionStatus.COMMITED)\r
return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
\r
}\r