X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=7c6f52f110fd1771650c9670c43a8136d8999a2b;hp=0c8f6109ed843ac84bdc5d4fef814e1e98518c7d;hb=3948bedd0129e44c0943bd77c91806425645cd72;hpb=ba4f2e7197ba694576dda2c163f0b2edc3552c8f diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index 0c8f6109ed..7c6f52f110 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent import com.google.common.collect.Multimaps +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock abstract class AbstractDataBroker
, D, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // DataReader
, // @@ -70,16 +72,20 @@ DataProvisionService
{ Multimap
> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create()); Multimap
> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+
+ private val Lock registrationLock = new ReentrantLock;
val ListenerRegistry paths) {
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
- .transformAndConcat[value] //
- .transform[instance].toList()
+ HashSet paths) {
+ return withLock(registrationLock) [|
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+ .transformAndConcat[value] //
+ .transform[instance].toList()
+ ]
}
override final readConfigurationData(P path) {
@@ -88,43 +94,56 @@ DataProvisionService {
override final readOperationalData(P path) {
return dataReadRouter.readOperationalData(path);
- }
-
- override final registerCommitHandler(P path, DataCommitHandler commitHandler) {
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
- commitHandlers.put(path, registration)
- LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onRegister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
- }
- }
- return registration;
+ }
+
+ private static def commitHandler) {
+ return withLock(registrationLock) [|
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
+ commitHandlers.put(path, registration)
+ LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onRegister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+ }
+ }
+ return registration;
+ ]
}
override final def registerDataChangeListener(P path, DCL listener) {
- val reg = new DataChangeListenerRegistration(path, listener, this);
- listeners.put(path, reg);
- val initialConfig = dataReadRouter.readConfigurationData(path);
- val initialOperational = dataReadRouter.readOperationalData(path);
- val event = createInitialListenerEvent(path,initialConfig,initialOperational);
- listener.onDataChanged(event);
- return reg;
+ return withLock(registrationLock) [|
+ val reg = new DataChangeListenerRegistration(path, listener, this);
+ listeners.put(path, reg);
+ val initialConfig = dataReadRouter.readConfigurationData(path);
+ val initialOperational = dataReadRouter.readOperationalData(path);
+ val event = createInitialListenerEvent(path,initialConfig,initialOperational);
+ listener.onDataChanged(event);
+ return reg;
+ ]
}
final def registerDataReader(P path, DataReader reader) {
-
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);
-
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ return withLock(registrationLock) [|
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ ]
}
override registerCommitHandlerListener(RegistrationListener {
}
- protected final def removeListener(DataChangeListenerRegistration registration) {
- listeners.remove(registration.path, registration);
+ protected final def removeListener(DataChangeListenerRegistration registration) {
+ return withLock(registrationLock) [|
+ listeners.remove(registration.path, registration);
+ ]
}
protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
- commitHandlers.remove(registration.path, registration);
-
- LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onUnregister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
- }
- }
+ return withLock(registrationLock) [|
+ commitHandlers.remove(registration.path, registration);
+ LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onUnregister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+ }
+ }
+ return null;
+ ]
}
protected final def getActiveCommitHandlers() {
@@ -155,12 +178,14 @@ DataProvisionService {
}
protected def /*Iterator paths) {
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
- val operationalState = readOperationalData(key)
- val configurationState = readConfigurationData(key)
- return new ListenerStateCapture(key, value, operationalState, configurationState)
- ].toList()
+ HashSet paths) {
+ return withLock(registrationLock) [|
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
+ val operationalState = readOperationalData(key)
+ val configurationState = readConfigurationData(key)
+ return new ListenerStateCapture(key, value, operationalState, configurationState)
+ ].toList()
+ ]
}
protected def boolean isAffectedBy(P key, Set paths) {
@@ -267,12 +292,13 @@ package class TwoPhaseCommit , D, DCL extends DataChangeListene
affectedPaths.addAll(transaction.createdOperationalData.keySet);
affectedPaths.addAll(transaction.updatedOperationalData.keySet);
affectedPaths.addAll(transaction.removedOperationalData);
-
+
val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
val transactionId = transaction.identifier;
log.trace("Transaction: {} Started.",transactionId);
+ log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
// requesting commits
val Iterable , D, DCL extends DataChangeListene
} catch (Exception e) {
log.error("Transaction: {} Request Commit failed", transactionId,e);
dataBroker.failedTransactionsCount.andIncrement
+ transaction.changeStatus(TransactionStatus.FAILED)
return rollback(handlerTransactions, e);
}
val List , D, DCL extends DataChangeListene
listeners.publishDataChangeEvent();
} catch (Exception e) {
log.error("Transaction: {} Finish Commit failed",transactionId, e);
- dataBroker.failedTransactionsCount.andIncrement
+ dataBroker.failedTransactionsCount.andIncrement
+ transaction.changeStatus(TransactionStatus.FAILED)
return rollback(handlerTransactions, e);
}
log.trace("Transaction: {} Finished successfully.",transactionId);
- dataBroker.finishedTransactionsCount.andIncrement;
+ dataBroker.finishedTransactionsCount.andIncrement;
+ transaction.changeStatus(TransactionStatus.COMMITED)
return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
}