Merge "Fixed publishDataChangeEvent in 2phase commit"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
index e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2..7c6f52f110fd1771650c9670c43a8136d8999a2b 100644 (file)
-package org.opendaylight.controller.md.sal.common.impl.service
-
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
-import org.opendaylight.yangtools.concepts.ListenerRegistration
-import com.google.common.collect.Multimap
-import static com.google.common.base.Preconditions.*;
-import java.util.List
-import com.google.common.collect.HashMultimap
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Callable
-import org.opendaylight.yangtools.yang.common.RpcResult
-import java.util.Collections
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
-import java.util.ArrayList
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
-import java.util.Arrays
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
-import org.opendaylight.controller.sal.common.util.Rpcs
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
-import java.util.concurrent.Future
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
-import org.opendaylight.yangtools.concepts.Path
-import org.slf4j.LoggerFactory
-import java.util.HashSet
-import java.util.Map.Entry
-import java.util.Iterator
-import java.util.Collection
-import com.google.common.collect.FluentIterable;
-import java.util.Set
-import com.google.common.collect.ImmutableList
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry
-import java.util.concurrent.atomic.AtomicLong
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
-DataReader<P, D>, //
-DataChangePublisher<P, D, DCL>, //
-DataProvisionService<P, D> {
-
-    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
-
-    @Property
-    var ExecutorService executor;
-
-    @Property
-    var AbstractDataReadRouter<P, D> dataReadRouter;
-
-    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
-    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service\r
+\r
+import com.google.common.collect.FluentIterable\r
+import com.google.common.collect.HashMultimap\r
+import com.google.common.collect.ImmutableList\r
+import com.google.common.collect.Multimap\r
+import java.util.ArrayList\r
+import java.util.Arrays\r
+import java.util.Collection\r
+import java.util.Collections\r
+import java.util.HashSet\r
+import java.util.List\r
+import java.util.Set\r
+import java.util.concurrent.Callable\r
+import java.util.concurrent.ExecutorService\r
+import java.util.concurrent.Future\r
+import java.util.concurrent.atomic.AtomicLong\r
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
+import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
+import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
+import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
+import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
+import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
+import org.opendaylight.controller.sal.common.util.Rpcs\r
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
+import org.opendaylight.yangtools.concepts.ListenerRegistration\r
+import org.opendaylight.yangtools.concepts.Path\r
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
+import org.opendaylight.yangtools.yang.common.RpcResult\r
+import org.slf4j.LoggerFactory\r
+\r
+import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
+import com.google.common.collect.Multimaps
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
+
+abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
+DataReader<P, D>, //\r
+DataChangePublisher<P, D, DCL>, //\r
+DataProvisionService<P, D> {\r
+\r
+    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
+\r
+    @Property\r
+    var ExecutorService executor;\r
+\r
+    @Property\r
+    var AbstractDataReadRouter<P, D> dataReadRouter;\r
+    \r
+    @Property\r
+    private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
+    \r
+    @Property\r
+    private val AtomicLong failedTransactionsCount = new AtomicLong\r
+    \r
+    @Property\r
+    private val AtomicLong finishedTransactionsCount = new AtomicLong\r
+\r
+    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
+    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
     
-    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
-    public new() {
-    }
-
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
+    private val Lock registrationLock = new ReentrantLock;
+    \r
+    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
+    public new() {\r
+    }\r
+\r
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
         HashSet<P> paths) {
-        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
-        .transformAndConcat[value] //
-        .transform[instance].toList()
+        return withLock(registrationLock) [|\r
+            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
+                .transformAndConcat[value] //\r
+                .transform[instance].toList()
+        ]\r
+    }\r
+\r
+    override final readConfigurationData(P path) {\r
+        return dataReadRouter.readConfigurationData(path);\r
+    }\r
+\r
+    override final readOperationalData(P path) {\r
+        return dataReadRouter.readOperationalData(path);\r
     }
-
-    override final readConfigurationData(P path) {
-        return dataReadRouter.readConfigurationData(path);
-    }
-
-    override final readOperationalData(P path) {
-        return dataReadRouter.readOperationalData(path);
-    }
-
+    
+    private static def <T> withLock(Lock lock,Callable<T> method) {
+        lock.lock
+        try {
+            return method.call
+        } finally {
+            lock.unlock
+        }
+    } \r
+\r
     override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
-        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
-        commitHandlers.put(path, registration)
-        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
-        for(listener : commitHandlerRegistrationListeners) {
-            try {
-                listener.instance.onRegister(registration);
-            } catch (Exception e) {
-                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+        return withLock(registrationLock) [|\r
+            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
+            commitHandlers.put(path, registration)\r
+            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+            for(listener : commitHandlerRegistrationListeners) {\r
+                try {\r
+                    listener.instance.onRegister(registration);\r
+                } catch (Exception e) {\r
+                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
+                }\r
             }
-        }
-        return registration;
-    }
-
-    override final def registerDataChangeListener(P path, DCL listener) {
-        val reg = new DataChangeListenerRegistration(path, listener, this);
-        listeners.put(path, reg);
-        return reg;
-    }
-
-    final def registerDataReader(P path, DataReader<P, D> reader) {
-
-        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
-        val dataReg = dataReadRouter.registerOperationalReader(path, reader);
-
-        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
-    }
-    
-    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
-        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
-        
-        return ret;
-    }
-    
-
+            return registration;
+        ]\r
+    }\r
+\r
+    override final def registerDataChangeListener(P path, DCL listener) {\r
+        return withLock(registrationLock) [|
+            val reg = new DataChangeListenerRegistration(path, listener, this);\r
+            listeners.put(path, reg);\r
+            val initialConfig = dataReadRouter.readConfigurationData(path);\r
+            val initialOperational = dataReadRouter.readOperationalData(path);\r
+            val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
+            listener.onDataChanged(event);\r
+            return reg;
+        ]\r
+    }\r
+\r
+    final def registerDataReader(P path, DataReader<P, D> reader) {\r
+        return withLock(registrationLock) [|\r
+            val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
+            val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
+    \r
+            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+        ]\r
+    }\r
+    \r
+    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
+        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
+        return ret;\r
+    }\r
+    \r
+    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
+        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
+        \r
+    }\r
+\r
     protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
-        listeners.remove(registration.path, registration);
-    }
-
-    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
-        commitHandlers.remove(registration.path, registration);
-        
-         LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
-        for(listener : commitHandlerRegistrationListeners) {
-            try {
-                listener.instance.onUnregister(registration);
-            } catch (Exception e) {
-                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+        return withLock(registrationLock) [|\r
+            listeners.remove(registration.path, registration);
+        ]\r
+    }\r
+\r
+    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
+        return withLock(registrationLock) [|
+            commitHandlers.remove(registration.path, registration);\r
+             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
+            for(listener : commitHandlerRegistrationListeners) {\r
+                try {\r
+                    listener.instance.onUnregister(registration);\r
+                } catch (Exception e) {\r
+                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
+                }\r
             }
-        }
-    }
-
-    protected final def getActiveCommitHandlers() {
-        return commitHandlers.entries;
-    }
-
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
+            return null;
+        ]\r
+    }\r
+\r
+    protected final def getActiveCommitHandlers() {\r
+        return commitHandlers.entries;\r
+    }\r
+\r
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
         HashSet<P> paths) {
-        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
-            val operationalState = readOperationalData(key)
-            val configurationState = readConfigurationData(key)
-            return new ListenerStateCapture(key, value, operationalState, configurationState)
-        ].toList()
-    }
-
-    protected def boolean isAffectedBy(P key, Set<P> paths) {
-        if (paths.contains(key)) {
-            return true;
-        }
-        for (path : paths) {
-            if (key.contains(path)) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
-        checkNotNull(transaction);
-        transaction.changeStatus(TransactionStatus.SUBMITED);
-        val task = new TwoPhaseCommit(transaction, this);
-        return executor.submit(task);
-    }
-
-}
-
-@Data
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
-
-    @Property
-    P path;
-
-    @Property
-    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
-
-    @Property
-    D initialOperationalState;
-
-    @Property
-    D initialConfigurationState;
-}
-
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
-
-    AbstractDataBroker<P, D, DCL> dataBroker;
-
-    @Property
-    val P path;
-
-    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
-        super(instance)
-        dataBroker = broker;
-        _path = path;
-    }
-
-    override protected removeRegistration() {
-        dataBroker.removeListener(this);
-        dataBroker = null;
-    }
-
-}
-
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
-implements DataCommitHandlerRegistration<P, D> {
-
-    AbstractDataBroker<P, D, ?> dataBroker;
-
-    @Property
-    val P path;
-
-    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
-        super(instance)
-        dataBroker = broker;
-        _path = path;
-    }
-
-    override protected removeRegistration() {
-        dataBroker.removeCommitHandler(this);
-        dataBroker = null;
-    }
-}
-
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
-
-    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
-
-    val AbstractDataTransaction<P, D> transaction;
-    val AbstractDataBroker<P, D, DCL> dataBroker;
-
-    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
-        this.transaction = transaction;
-        this.dataBroker = broker;
-    }
-
-    override call() throws Exception {
-
-        // get affected paths
-        val affectedPaths = new HashSet<P>();
-
-        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
-        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
-        affectedPaths.addAll(transaction.removedConfigurationData);
-
-        affectedPaths.addAll(transaction.createdOperationalData.keySet);
-        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
-        affectedPaths.addAll(transaction.removedOperationalData);
-
-        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
-
-        val transactionId = transaction.identifier;
-
-        log.info("Transaction: {} Started.",transactionId);
-        // requesting commits
-        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
-        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
-        try {
-            for (handler : commitHandlers) {
-                handlerTransactions.add(handler.requestCommit(transaction));
-            }
-        } catch (Exception e) {
-            log.error("Transaction: {} Request Commit failed", transactionId,e);
-            return rollback(handlerTransactions, e);
-        }
-        val List<RpcResult<Void>> results = new ArrayList();
-        try {
-            for (subtransaction : handlerTransactions) {
-                results.add(subtransaction.finish());
-            }
-            listeners.publishDataChangeEvent();
-        } catch (Exception e) {
-            log.error("Transaction: {} Finish Commit failed",transactionId, e);
-            return rollback(handlerTransactions, e);
-        }
-        log.info("Transaction: {} Finished succesfully.",transactionId);
-        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
-
-    }
-
-    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
-        for (listenerSet : listeners) {
-            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
-            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
-            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
-                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
-            for (listener : listenerSet.listeners) {
-                try {
-                    listener.instance.onDataChanged(changeEvent);
-
-                } catch (Exception e) {
-                    e.printStackTrace();
+        return withLock(registrationLock) [|\r
+            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+                val operationalState = readOperationalData(key)\r
+                val configurationState = readConfigurationData(key)\r
+                return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+            ].toList()
+        ]\r
+    }\r
+\r
+    protected def boolean isAffectedBy(P key, Set<P> paths) {\r
+        if (paths.contains(key)) {\r
+            return true;\r
+        }\r
+        for (path : paths) {\r
+            if (key.contains(path)) {\r
+                return true;\r
+            }\r
+        }\r
+\r
+        return false;\r
+    }\r
+\r
+    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
+        checkNotNull(transaction);\r
+        transaction.changeStatus(TransactionStatus.SUBMITED);\r
+        val task = new TwoPhaseCommit(transaction, this);\r
+        submittedTransactionsCount.andIncrement;\r
+        return executor.submit(task);\r
+    }\r
+\r
+}\r
+\r
+@Data\r
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
+\r
+    @Property\r
+    P path;\r
+\r
+    @Property\r
+    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
+\r
+    @Property\r
+    D initialOperationalState;\r
+\r
+    @Property\r
+    D initialConfigurationState;\r
+}\r
+\r
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
+\r
+    AbstractDataBroker<P, D, DCL> dataBroker;\r
+\r
+    @Property\r
+    val P path;\r
+\r
+    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
+        super(instance)\r
+        dataBroker = broker;\r
+        _path = path;\r
+    }\r
+\r
+    override protected removeRegistration() {\r
+        dataBroker.removeListener(this);\r
+        dataBroker = null;\r
+    }\r
+\r
+}\r
+\r
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
+implements DataCommitHandlerRegistration<P, D> {\r
+\r
+    AbstractDataBroker<P, D, ?> dataBroker;\r
+\r
+    @Property\r
+    val P path;\r
+\r
+    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
+        super(instance)\r
+        dataBroker = broker;\r
+        _path = path;\r
+    }\r
+\r
+    override protected removeRegistration() {\r
+        dataBroker.removeCommitHandler(this);\r
+        dataBroker = null;\r
+    }\r
+}\r
+\r
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
+\r
+    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
+\r
+    val AbstractDataTransaction<P, D> transaction;\r
+    val AbstractDataBroker<P, D, DCL> dataBroker;\r
+    \r
+    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
+        this.transaction = transaction;\r
+        this.dataBroker = broker;\r
+    }\r
+\r
+    override call() throws Exception {\r
+\r
+        // get affected paths\r
+        val affectedPaths = new HashSet<P>();\r
+\r
+        affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
+        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
+        affectedPaths.addAll(transaction.removedConfigurationData);\r
+\r
+        affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
+        affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
+        affectedPaths.addAll(transaction.removedOperationalData);\r
+
+        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
+\r
+        val transactionId = transaction.identifier;\r
+\r
+        log.trace("Transaction: {} Started.",transactionId);\r
+        log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
+        // requesting commits\r
+        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
+        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
+        try {\r
+            for (handler : commitHandlers) {\r
+                handlerTransactions.add(handler.requestCommit(transaction));\r
+            }\r
+        } catch (Exception e) {\r
+            log.error("Transaction: {} Request Commit failed", transactionId,e);\r
+            dataBroker.failedTransactionsCount.andIncrement\r
+            transaction.changeStatus(TransactionStatus.FAILED)
+            return rollback(handlerTransactions, e);\r
+        }\r
+        val List<RpcResult<Void>> results = new ArrayList();\r
+        try {\r
+            for (subtransaction : handlerTransactions) {\r
+                results.add(subtransaction.finish());\r
+            }\r
+            listeners.publishDataChangeEvent();\r
+        } catch (Exception e) {\r
+            log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
+            dataBroker.failedTransactionsCount.andIncrement
+            transaction.changeStatus(TransactionStatus.FAILED)\r
+            return rollback(handlerTransactions, e);\r
+        }\r
+        log.trace("Transaction: {} Finished successfully.",transactionId);\r
+        dataBroker.finishedTransactionsCount.andIncrement;
+        transaction.changeStatus(TransactionStatus.COMMITED)\r
+        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
+\r
+    }\r
+\r
+    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
+        dataBroker.executor.submit [|\r
+            for (listenerSet : listeners) {
+                val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+                val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+
+                val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+                    listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+                for (listener : listenerSet.listeners) {
+                    try {
+                        listener.instance.onDataChanged(changeEvent);
+
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
                 }
-            }
-        }
-    }
-
-    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
-        for (transaction : transactions) {
-            transaction.rollback()
-        }
-
-        // FIXME return encountered error.
-        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
-    }
-}
-
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
-
-    @Property
-    private val Object identifier;
-
-    var TransactionStatus status;
-
-    var AbstractDataBroker<P, D, ?> broker;
-
-    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
-        super(dataBroker);
-        _identifier = identifier;
-        broker = dataBroker;
-        status = TransactionStatus.NEW;
-
-    //listeners = new ListenerRegistry<>();
-    }
-
-    override commit() {
-        return broker.commit(this);
-    }
-
-    override readConfigurationData(P path) {
-        return broker.readConfigurationData(path);
-    }
-
-    override readOperationalData(P path) {
-        return broker.readOperationalData(path);
-    }
-
-    override hashCode() {
-        return identifier.hashCode;
-    }
-
-    override equals(Object obj) {
-        if (this === obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        val other = (obj as AbstractDataTransaction<P,D>);
-        if (broker == null) {
-            if (other.broker != null)
-                return false;
-        } else if (!broker.equals(other.broker))
-            return false;
-        if (identifier == null) {
-            if (other.identifier != null)
-                return false;
-        } else if (!identifier.equals(other.identifier))
-            return false;
-        return true;
-    }
-
-    override TransactionStatus getStatus() {
-        return status;
-    }
-
-    protected abstract def void onStatusChange(TransactionStatus status);
-
-    public def changeStatus(TransactionStatus status) {
-        this.status = status;
-        onStatusChange(status);
-    }
-
-}
+            }        \r
+        ]\r
+    }\r
+\r
+    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
+        for (transaction : transactions) {\r
+            transaction.rollback()\r
+        }\r
+\r
+        // FIXME return encountered error.\r
+        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
+    }\r
+}\r
+\r
+public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
+\r
+    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
+
+    @Property\r
+    private val Object identifier;\r
+\r
+    var TransactionStatus status;\r
+\r
+    var AbstractDataBroker<P, D, ?> broker;\r
+\r
+    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
+        super(dataBroker);\r
+        _identifier = identifier;\r
+        broker = dataBroker;\r
+        status = TransactionStatus.NEW;\r
+        LOG.debug("Transaction {} Allocated.", identifier);
+\r
+    //listeners = new ListenerRegistry<>();\r
+    }\r
+\r
+    override commit() {\r
+        return broker.commit(this);\r
+    }\r
+\r
+    override readConfigurationData(P path) {\r
+        val local = this.updatedConfigurationData.get(path);\r
+        if(local != null) {\r
+            return local;\r
+        }\r
+        \r
+        return broker.readConfigurationData(path);\r
+    }\r
+\r
+    override readOperationalData(P path) {\r
+        val local = this.updatedOperationalData.get(path);\r
+        if(local != null) {\r
+            return local;\r
+        }\r
+        return broker.readOperationalData(path);\r
+    }\r
+\r
+    override hashCode() {\r
+        return identifier.hashCode;\r
+    }\r
+\r
+    override equals(Object obj) {\r
+        if (this === obj)\r
+            return true;\r
+        if (obj == null)\r
+            return false;\r
+        if (getClass() != obj.getClass())\r
+            return false;\r
+        val other = (obj as AbstractDataTransaction<P,D>);\r
+        if (broker == null) {\r
+            if (other.broker != null)\r
+                return false;\r
+        } else if (!broker.equals(other.broker))\r
+            return false;\r
+        if (identifier == null) {\r
+            if (other.identifier != null)\r
+                return false;\r
+        } else if (!identifier.equals(other.identifier))\r
+            return false;\r
+        return true;\r
+    }\r
+\r
+    override TransactionStatus getStatus() {\r
+        return status;\r
+    }\r
+\r
+    protected abstract def void onStatusChange(TransactionStatus status);\r
+\r
+    public def changeStatus(TransactionStatus status) {\r
+        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
+        this.status = status;\r
+        onStatusChange(status);\r
+    }\r
+\r
+}\r