X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=2c3b0188f48096d34ea45485fe34ec1a1711efe1;hb=ab26d9cc92e2ab41016e473c469dfdef885ecf6c;hp=f90465f925a28bdeacf153fa9aeb0940ded58411;hpb=6b98de000257414b2ae9b1db708c0f7962a0f033;p=controller.git diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index f90465f925..2c3b0188f4 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.md.sal.common.impl.service import com.google.common.collect.FluentIterable @@ -37,6 +44,9 @@ import org.opendaylight.yangtools.yang.common.RpcResult import org.slf4j.LoggerFactory import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent +import com.google.common.collect.Multimaps +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock abstract class AbstractDataBroker
, D, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // DataReader
, // @@ -60,18 +70,22 @@ DataProvisionService
{ @Property private val AtomicLong finishedTransactionsCount = new AtomicLong - Multimap
> listeners = HashMultimap.create(); - Multimap
> commitHandlers = HashMultimap.create(); + Multimap
> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + Multimap
> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+
+ private val Lock registrationLock = new ReentrantLock;
val ListenerRegistry paths) {
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
- .transformAndConcat[value] //
- .transform[instance].toList()
+ HashSet paths) {
+ return withLock(registrationLock) [|
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+ .transformAndConcat[value] //
+ .transform[instance].toList()
+ ]
}
override final readConfigurationData(P path) {
@@ -80,43 +94,56 @@ DataProvisionService {
override final readOperationalData(P path) {
return dataReadRouter.readOperationalData(path);
- }
-
- override final registerCommitHandler(P path, DataCommitHandler commitHandler) {
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
- commitHandlers.put(path, registration)
- LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onRegister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
- }
- }
- return registration;
+ }
+
+ private static def commitHandler) {
+ return withLock(registrationLock) [|
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
+ commitHandlers.put(path, registration)
+ LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onRegister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+ }
+ }
+ return registration;
+ ]
}
override final def registerDataChangeListener(P path, DCL listener) {
- val reg = new DataChangeListenerRegistration(path, listener, this);
- listeners.put(path, reg);
- val initialConfig = dataReadRouter.readConfigurationData(path);
- val initialOperational = dataReadRouter.readOperationalData(path);
- val event = createInitialListenerEvent(path,initialConfig,initialOperational);
- listener.onDataChanged(event);
- return reg;
+ return withLock(registrationLock) [|
+ val reg = new DataChangeListenerRegistration(path, listener, this);
+ listeners.put(path, reg);
+ val initialConfig = dataReadRouter.readConfigurationData(path);
+ val initialOperational = dataReadRouter.readOperationalData(path);
+ val event = createInitialListenerEvent(path,initialConfig,initialOperational);
+ listener.onDataChanged(event);
+ return reg;
+ ]
}
final def registerDataReader(P path, DataReader reader) {
-
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);
-
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ return withLock(registrationLock) [|
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ ]
}
override registerCommitHandlerListener(RegistrationListener {
}
- protected final def removeListener(DataChangeListenerRegistration registration) {
- listeners.remove(registration.path, registration);
+ protected final def removeListener(DataChangeListenerRegistration registration) {
+ return withLock(registrationLock) [|
+ listeners.remove(registration.path, registration);
+ ]
}
protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
- commitHandlers.remove(registration.path, registration);
-
- LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onUnregister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
- }
- }
+ return withLock(registrationLock) [|
+ commitHandlers.remove(registration.path, registration);
+ LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onUnregister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+ }
+ }
+ return null;
+ ]
}
protected final def getActiveCommitHandlers() {
@@ -147,12 +178,14 @@ DataProvisionService {
}
protected def /*Iterator paths) {
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
- val operationalState = readOperationalData(key)
- val configurationState = readConfigurationData(key)
- return new ListenerStateCapture(key, value, operationalState, configurationState)
- ].toList()
+ HashSet paths) {
+ return withLock(registrationLock) [|
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
+ val operationalState = readOperationalData(key)
+ val configurationState = readConfigurationData(key)
+ return new ListenerStateCapture(key, value, operationalState, configurationState)
+ ].toList()
+ ]
}
protected def boolean isAffectedBy(P key, Set paths) {
@@ -259,12 +292,13 @@ package class TwoPhaseCommit , D, DCL extends DataChangeListene
affectedPaths.addAll(transaction.createdOperationalData.keySet);
affectedPaths.addAll(transaction.updatedOperationalData.keySet);
affectedPaths.addAll(transaction.removedOperationalData);
-
+
val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
val transactionId = transaction.identifier;
- log.info("Transaction: {} Started.",transactionId);
+ log.trace("Transaction: {} Started.",transactionId);
+ log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
// requesting commits
val Iterable , D, DCL extends DataChangeListene
} catch (Exception e) {
log.error("Transaction: {} Request Commit failed", transactionId,e);
dataBroker.failedTransactionsCount.andIncrement
+ transaction.changeStatus(TransactionStatus.FAILED)
return rollback(handlerTransactions, e);
}
val List , D, DCL extends DataChangeListene
listeners.publishDataChangeEvent();
} catch (Exception e) {
log.error("Transaction: {} Finish Commit failed",transactionId, e);
- dataBroker.failedTransactionsCount.andIncrement
+ dataBroker.failedTransactionsCount.andIncrement
+ transaction.changeStatus(TransactionStatus.FAILED)
return rollback(handlerTransactions, e);
}
- log.info("Transaction: {} Finished successfully.",transactionId);
- dataBroker.finishedTransactionsCount.andIncrement;
+ log.trace("Transaction: {} Finished successfully.",transactionId);
+ dataBroker.finishedTransactionsCount.andIncrement;
+ transaction.changeStatus(TransactionStatus.COMMITED)
return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
}