Merge "Fixed publishDataChangeEvent in 2phase commit"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
index 0c8f610..7c6f52f 100644 (file)
@@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory
 \r
 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
 import com.google.common.collect.Multimaps
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
 
 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
 DataReader<P, D>, //\r
@@ -70,16 +72,20 @@ DataProvisionService<P, D> {
 \r
     Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
     Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
+    
+    private val Lock registrationLock = new ReentrantLock;
     \r
     val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
     public new() {\r
     }\r
 \r
     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
-        HashSet<P> paths) {\r
-        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
-        .transformAndConcat[value] //\r
-        .transform[instance].toList()\r
+        HashSet<P> paths) {
+        return withLock(registrationLock) [|\r
+            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
+                .transformAndConcat[value] //\r
+                .transform[instance].toList()
+        ]\r
     }\r
 \r
     override final readConfigurationData(P path) {\r
@@ -88,43 +94,56 @@ DataProvisionService<P, D> {
 \r
     override final readOperationalData(P path) {\r
         return dataReadRouter.readOperationalData(path);\r
-    }\r
-\r
-    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
-        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
-        commitHandlers.put(path, registration)\r
-        LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
-        for(listener : commitHandlerRegistrationListeners) {\r
-            try {\r
-                listener.instance.onRegister(registration);\r
-            } catch (Exception e) {\r
-                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
-            }\r
-        }\r
-        return registration;\r
+    }
+    
+    private static def <T> withLock(Lock lock,Callable<T> method) {
+        lock.lock
+        try {
+            return method.call
+        } finally {
+            lock.unlock
+        }
+    } \r
+\r
+    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
+        return withLock(registrationLock) [|\r
+            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
+            commitHandlers.put(path, registration)\r
+            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+            for(listener : commitHandlerRegistrationListeners) {\r
+                try {\r
+                    listener.instance.onRegister(registration);\r
+                } catch (Exception e) {\r
+                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
+                }\r
+            }
+            return registration;
+        ]\r
     }\r
 \r
     override final def registerDataChangeListener(P path, DCL listener) {\r
-        val reg = new DataChangeListenerRegistration(path, listener, this);\r
-        listeners.put(path, reg);\r
-        val initialConfig = dataReadRouter.readConfigurationData(path);\r
-        val initialOperational = dataReadRouter.readOperationalData(path);\r
-        val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
-        listener.onDataChanged(event);\r
-        return reg;\r
+        return withLock(registrationLock) [|
+            val reg = new DataChangeListenerRegistration(path, listener, this);\r
+            listeners.put(path, reg);\r
+            val initialConfig = dataReadRouter.readConfigurationData(path);\r
+            val initialOperational = dataReadRouter.readOperationalData(path);\r
+            val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
+            listener.onDataChanged(event);\r
+            return reg;
+        ]\r
     }\r
 \r
     final def registerDataReader(P path, DataReader<P, D> reader) {\r
-\r
-        val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
-        val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
-\r
-        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));\r
+        return withLock(registrationLock) [|\r
+            val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
+            val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
+    \r
+            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+        ]\r
     }\r
     \r
     override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
         val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
-        \r
         return ret;\r
     }\r
     \r
@@ -133,21 +152,25 @@ DataProvisionService<P, D> {
         \r
     }\r
 \r
-    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {\r
-        listeners.remove(registration.path, registration);\r
+    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
+        return withLock(registrationLock) [|\r
+            listeners.remove(registration.path, registration);
+        ]\r
     }\r
 \r
     protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
-        commitHandlers.remove(registration.path, registration);\r
-        \r
-         LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
-        for(listener : commitHandlerRegistrationListeners) {\r
-            try {\r
-                listener.instance.onUnregister(registration);\r
-            } catch (Exception e) {\r
-                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
-            }\r
-        }\r
+        return withLock(registrationLock) [|
+            commitHandlers.remove(registration.path, registration);\r
+             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
+            for(listener : commitHandlerRegistrationListeners) {\r
+                try {\r
+                    listener.instance.onUnregister(registration);\r
+                } catch (Exception e) {\r
+                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
+                }\r
+            }
+            return null;
+        ]\r
     }\r
 \r
     protected final def getActiveCommitHandlers() {\r
@@ -155,12 +178,14 @@ DataProvisionService<P, D> {
     }\r
 \r
     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
-        HashSet<P> paths) {\r
-        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
-            val operationalState = readOperationalData(key)\r
-            val configurationState = readConfigurationData(key)\r
-            return new ListenerStateCapture(key, value, operationalState, configurationState)\r
-        ].toList()\r
+        HashSet<P> paths) {
+        return withLock(registrationLock) [|\r
+            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+                val operationalState = readOperationalData(key)\r
+                val configurationState = readConfigurationData(key)\r
+                return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+            ].toList()
+        ]\r
     }\r
 \r
     protected def boolean isAffectedBy(P key, Set<P> paths) {\r
@@ -267,12 +292,13 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
         affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
         affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
         affectedPaths.addAll(transaction.removedOperationalData);\r
-\r
+
         val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
 \r
         val transactionId = transaction.identifier;\r
 \r
         log.trace("Transaction: {} Started.",transactionId);\r
+        log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
         // requesting commits\r
         val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
@@ -283,6 +309,7 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
         } catch (Exception e) {\r
             log.error("Transaction: {} Request Commit failed", transactionId,e);\r
             dataBroker.failedTransactionsCount.andIncrement\r
+            transaction.changeStatus(TransactionStatus.FAILED)
             return rollback(handlerTransactions, e);\r
         }\r
         val List<RpcResult<Void>> results = new ArrayList();\r
@@ -293,11 +320,13 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
             listeners.publishDataChangeEvent();\r
         } catch (Exception e) {\r
             log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
-            dataBroker.failedTransactionsCount.andIncrement\r
+            dataBroker.failedTransactionsCount.andIncrement
+            transaction.changeStatus(TransactionStatus.FAILED)\r
             return rollback(handlerTransactions, e);\r
         }\r
         log.trace("Transaction: {} Finished successfully.",transactionId);\r
-        dataBroker.finishedTransactionsCount.andIncrement;\r
+        dataBroker.finishedTransactionsCount.andIncrement;
+        transaction.changeStatus(TransactionStatus.COMMITED)\r
         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
 \r
     }\r

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.