Fixed publishDataChangeEvent in 2phase commit
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
index f90465f925a28bdeacf153fa9aeb0940ded58411..0c8f6109ed843ac84bdc5d4fef814e1e98518c7d 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.common.impl.service\r
 \r
 import com.google.common.collect.FluentIterable\r
@@ -37,6 +44,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult
 import org.slf4j.LoggerFactory\r
 \r
 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
+import com.google.common.collect.Multimaps
 
 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
 DataReader<P, D>, //\r
@@ -60,8 +68,8 @@ DataProvisionService<P, D> {
     @Property\r
     private val AtomicLong finishedTransactionsCount = new AtomicLong\r
 \r
-    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();\r
-    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();\r
+    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
+    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
     \r
     val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
     public new() {\r
@@ -85,7 +93,7 @@ DataProvisionService<P, D> {
     override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
         val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
         commitHandlers.put(path, registration)\r
-        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+        LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
         for(listener : commitHandlerRegistrationListeners) {\r
             try {\r
                 listener.instance.onRegister(registration);\r
@@ -132,7 +140,7 @@ DataProvisionService<P, D> {
     protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
         commitHandlers.remove(registration.path, registration);\r
         \r
-         LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
+         LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
         for(listener : commitHandlerRegistrationListeners) {\r
             try {\r
                 listener.instance.onUnregister(registration);\r
@@ -264,7 +272,7 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
 \r
         val transactionId = transaction.identifier;\r
 \r
-        log.info("Transaction: {} Started.",transactionId);\r
+        log.trace("Transaction: {} Started.",transactionId);\r
         // requesting commits\r
         val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
@@ -288,28 +296,30 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
             dataBroker.failedTransactionsCount.andIncrement\r
             return rollback(handlerTransactions, e);\r
         }\r
-        log.info("Transaction: {} Finished successfully.",transactionId);\r
+        log.trace("Transaction: {} Finished successfully.",transactionId);\r
         dataBroker.finishedTransactionsCount.andIncrement;\r
         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
 \r
     }\r
 \r
     def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
-        for (listenerSet : listeners) {\r
-            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);\r
-            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);\r
-\r
-            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,\r
-                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);\r
-            for (listener : listenerSet.listeners) {\r
-                try {\r
-                    listener.instance.onDataChanged(changeEvent);\r
-\r
-                } catch (Exception e) {\r
-                    e.printStackTrace();\r
-                }\r
-            }\r
-        }\r
+        dataBroker.executor.submit [|\r
+            for (listenerSet : listeners) {
+                val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+                val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+
+                val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+                    listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+                for (listener : listenerSet.listeners) {
+                    try {
+                        listener.instance.onDataChanged(changeEvent);
+
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }        \r
+        ]\r
     }\r
 \r
     def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r