Merge "Fixed publishDataChangeEvent in 2phase commit"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
index 32e59b869e20de484e4f1c27028698e1d8b05a49..7c6f52f110fd1771650c9670c43a8136d8999a2b 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.common.impl.service\r
 \r
 import com.google.common.collect.FluentIterable\r
@@ -37,6 +44,9 @@ import org.opendaylight.yangtools.yang.common.RpcResult
 import org.slf4j.LoggerFactory\r
 \r
 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
+import com.google.common.collect.Multimaps
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
 
 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
 DataReader<P, D>, //\r
@@ -60,18 +70,22 @@ DataProvisionService<P, D> {
     @Property\r
     private val AtomicLong finishedTransactionsCount = new AtomicLong\r
 \r
-    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();\r
-    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();\r
+    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
+    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
+    
+    private val Lock registrationLock = new ReentrantLock;
     \r
     val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
     public new() {\r
     }\r
 \r
     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
-        HashSet<P> paths) {\r
-        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
-        .transformAndConcat[value] //\r
-        .transform[instance].toList()\r
+        HashSet<P> paths) {
+        return withLock(registrationLock) [|\r
+            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
+                .transformAndConcat[value] //\r
+                .transform[instance].toList()
+        ]\r
     }\r
 \r
     override final readConfigurationData(P path) {\r
@@ -80,43 +94,56 @@ DataProvisionService<P, D> {
 \r
     override final readOperationalData(P path) {\r
         return dataReadRouter.readOperationalData(path);\r
-    }\r
-\r
-    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
-        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
-        commitHandlers.put(path, registration)\r
-        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);\r
-        for(listener : commitHandlerRegistrationListeners) {\r
-            try {\r
-                listener.instance.onRegister(registration);\r
-            } catch (Exception e) {\r
-                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
-            }\r
-        }\r
-        return registration;\r
+    }
+    
+    private static def <T> withLock(Lock lock,Callable<T> method) {
+        lock.lock
+        try {
+            return method.call
+        } finally {
+            lock.unlock
+        }
+    } \r
+\r
+    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
+        return withLock(registrationLock) [|\r
+            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
+            commitHandlers.put(path, registration)\r
+            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+            for(listener : commitHandlerRegistrationListeners) {\r
+                try {\r
+                    listener.instance.onRegister(registration);\r
+                } catch (Exception e) {\r
+                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
+                }\r
+            }
+            return registration;
+        ]\r
     }\r
 \r
     override final def registerDataChangeListener(P path, DCL listener) {\r
-        val reg = new DataChangeListenerRegistration(path, listener, this);\r
-        listeners.put(path, reg);\r
-        val initialConfig = dataReadRouter.readConfigurationData(path);\r
-        val initialOperational = dataReadRouter.readOperationalData(path);\r
-        val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
-        listener.onDataChanged(event);\r
-        return reg;\r
+        return withLock(registrationLock) [|
+            val reg = new DataChangeListenerRegistration(path, listener, this);\r
+            listeners.put(path, reg);\r
+            val initialConfig = dataReadRouter.readConfigurationData(path);\r
+            val initialOperational = dataReadRouter.readOperationalData(path);\r
+            val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
+            listener.onDataChanged(event);\r
+            return reg;
+        ]\r
     }\r
 \r
     final def registerDataReader(P path, DataReader<P, D> reader) {\r
-\r
-        val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
-        val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
-\r
-        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));\r
+        return withLock(registrationLock) [|\r
+            val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
+            val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
+    \r
+            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+        ]\r
     }\r
     \r
     override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
         val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
-        \r
         return ret;\r
     }\r
     \r
@@ -125,21 +152,25 @@ DataProvisionService<P, D> {
         \r
     }\r
 \r
-    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {\r
-        listeners.remove(registration.path, registration);\r
+    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
+        return withLock(registrationLock) [|\r
+            listeners.remove(registration.path, registration);
+        ]\r
     }\r
 \r
     protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
-        commitHandlers.remove(registration.path, registration);\r
-        \r
-         LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
-        for(listener : commitHandlerRegistrationListeners) {\r
-            try {\r
-                listener.instance.onUnregister(registration);\r
-            } catch (Exception e) {\r
-                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
-            }\r
-        }\r
+        return withLock(registrationLock) [|
+            commitHandlers.remove(registration.path, registration);\r
+             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
+            for(listener : commitHandlerRegistrationListeners) {\r
+                try {\r
+                    listener.instance.onUnregister(registration);\r
+                } catch (Exception e) {\r
+                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
+                }\r
+            }
+            return null;
+        ]\r
     }\r
 \r
     protected final def getActiveCommitHandlers() {\r
@@ -147,12 +178,14 @@ DataProvisionService<P, D> {
     }\r
 \r
     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
-        HashSet<P> paths) {\r
-        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
-            val operationalState = readOperationalData(key)\r
-            val configurationState = readConfigurationData(key)\r
-            return new ListenerStateCapture(key, value, operationalState, configurationState)\r
-        ].toList()\r
+        HashSet<P> paths) {
+        return withLock(registrationLock) [|\r
+            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+                val operationalState = readOperationalData(key)\r
+                val configurationState = readConfigurationData(key)\r
+                return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+            ].toList()
+        ]\r
     }\r
 \r
     protected def boolean isAffectedBy(P key, Set<P> paths) {\r
@@ -259,12 +292,13 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
         affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
         affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
         affectedPaths.addAll(transaction.removedOperationalData);\r
-\r
+
         val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
 \r
         val transactionId = transaction.identifier;\r
 \r
-        log.info("Transaction: {} Started.",transactionId);\r
+        log.trace("Transaction: {} Started.",transactionId);\r
+        log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
         // requesting commits\r
         val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
@@ -275,6 +309,7 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
         } catch (Exception e) {\r
             log.error("Transaction: {} Request Commit failed", transactionId,e);\r
             dataBroker.failedTransactionsCount.andIncrement\r
+            transaction.changeStatus(TransactionStatus.FAILED)
             return rollback(handlerTransactions, e);\r
         }\r
         val List<RpcResult<Void>> results = new ArrayList();\r
@@ -285,31 +320,35 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
             listeners.publishDataChangeEvent();\r
         } catch (Exception e) {\r
             log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
-            dataBroker.failedTransactionsCount.andIncrement\r
+            dataBroker.failedTransactionsCount.andIncrement
+            transaction.changeStatus(TransactionStatus.FAILED)\r
             return rollback(handlerTransactions, e);\r
         }\r
-        log.info("Transaction: {} Finished successfully.",transactionId);\r
-        dataBroker.finishedTransactionsCount.andIncrement;\r
+        log.trace("Transaction: {} Finished successfully.",transactionId);\r
+        dataBroker.finishedTransactionsCount.andIncrement;
+        transaction.changeStatus(TransactionStatus.COMMITED)\r
         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
 \r
     }\r
 \r
     def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
-        for (listenerSet : listeners) {\r
-            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);\r
-            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);\r
-\r
-            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,\r
-                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);\r
-            for (listener : listenerSet.listeners) {\r
-                try {\r
-                    listener.instance.onDataChanged(changeEvent);\r
-\r
-                } catch (Exception e) {\r
-                    e.printStackTrace();\r
-                }\r
-            }\r
-        }\r
+        dataBroker.executor.submit [|\r
+            for (listenerSet : listeners) {
+                val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+                val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+
+                val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+                    listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+                for (listener : listenerSet.listeners) {
+                    try {
+                        listener.instance.onDataChanged(changeEvent);
+
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }        \r
+        ]\r
     }\r
 \r
     def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
@@ -324,6 +363,8 @@ package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListene
 \r
 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
 \r
+    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
+
     @Property\r
     private val Object identifier;\r
 \r
@@ -336,6 +377,7 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
         _identifier = identifier;\r
         broker = dataBroker;\r
         status = TransactionStatus.NEW;\r
+        LOG.debug("Transaction {} Allocated.", identifier);
 \r
     //listeners = new ListenerRegistry<>();\r
     }\r
@@ -393,6 +435,7 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
     protected abstract def void onStatusChange(TransactionStatus status);\r
 \r
     public def changeStatus(TransactionStatus status) {\r
+        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
         this.status = status;\r
         onStatusChange(status);\r
     }\r