Fixed bug when Binding-Aware Data Change Listeners we're not triggered.
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend
deleted file mode 100644 (file)
index 7c6f52f..0000000
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.impl.service\r
-\r
-import com.google.common.collect.FluentIterable\r
-import com.google.common.collect.HashMultimap\r
-import com.google.common.collect.ImmutableList\r
-import com.google.common.collect.Multimap\r
-import java.util.ArrayList\r
-import java.util.Arrays\r
-import java.util.Collection\r
-import java.util.Collections\r
-import java.util.HashSet\r
-import java.util.List\r
-import java.util.Set\r
-import java.util.concurrent.Callable\r
-import java.util.concurrent.ExecutorService\r
-import java.util.concurrent.Future\r
-import java.util.concurrent.atomic.AtomicLong\r
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
-import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
-import org.opendaylight.controller.sal.common.util.Rpcs\r
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
-import org.opendaylight.yangtools.concepts.ListenerRegistration\r
-import org.opendaylight.yangtools.concepts.Path\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.yangtools.yang.common.RpcResult\r
-import org.slf4j.LoggerFactory\r
-\r
-import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
-import com.google.common.collect.Multimaps
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
-DataReader<P, D>, //\r
-DataChangePublisher<P, D, DCL>, //\r
-DataProvisionService<P, D> {\r
-\r
-    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
-\r
-    @Property\r
-    var ExecutorService executor;\r
-\r
-    @Property\r
-    var AbstractDataReadRouter<P, D> dataReadRouter;\r
-    \r
-    @Property\r
-    private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
-    \r
-    @Property\r
-    private val AtomicLong failedTransactionsCount = new AtomicLong\r
-    \r
-    @Property\r
-    private val AtomicLong finishedTransactionsCount = new AtomicLong\r
-\r
-    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-    
-    private val Lock registrationLock = new ReentrantLock;
-    \r
-    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
-    public new() {\r
-    }\r
-\r
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
-        HashSet<P> paths) {
-        return withLock(registrationLock) [|\r
-            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
-                .transformAndConcat[value] //\r
-                .transform[instance].toList()
-        ]\r
-    }\r
-\r
-    override final readConfigurationData(P path) {\r
-        return dataReadRouter.readConfigurationData(path);\r
-    }\r
-\r
-    override final readOperationalData(P path) {\r
-        return dataReadRouter.readOperationalData(path);\r
-    }
-    
-    private static def <T> withLock(Lock lock,Callable<T> method) {
-        lock.lock
-        try {
-            return method.call
-        } finally {
-            lock.unlock
-        }
-    } \r
-\r
-    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
-        return withLock(registrationLock) [|\r
-            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
-            commitHandlers.put(path, registration)\r
-            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
-            for(listener : commitHandlerRegistrationListeners) {\r
-                try {\r
-                    listener.instance.onRegister(registration);\r
-                } catch (Exception e) {\r
-                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
-                }\r
-            }
-            return registration;
-        ]\r
-    }\r
-\r
-    override final def registerDataChangeListener(P path, DCL listener) {\r
-        return withLock(registrationLock) [|
-            val reg = new DataChangeListenerRegistration(path, listener, this);\r
-            listeners.put(path, reg);\r
-            val initialConfig = dataReadRouter.readConfigurationData(path);\r
-            val initialOperational = dataReadRouter.readOperationalData(path);\r
-            val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
-            listener.onDataChanged(event);\r
-            return reg;
-        ]\r
-    }\r
-\r
-    final def registerDataReader(P path, DataReader<P, D> reader) {\r
-        return withLock(registrationLock) [|\r
-            val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
-            val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
-    \r
-            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
-        ]\r
-    }\r
-    \r
-    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
-        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
-        return ret;\r
-    }\r
-    \r
-    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
-        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
-        \r
-    }\r
-\r
-    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
-        return withLock(registrationLock) [|\r
-            listeners.remove(registration.path, registration);
-        ]\r
-    }\r
-\r
-    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
-        return withLock(registrationLock) [|
-            commitHandlers.remove(registration.path, registration);\r
-             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
-            for(listener : commitHandlerRegistrationListeners) {\r
-                try {\r
-                    listener.instance.onUnregister(registration);\r
-                } catch (Exception e) {\r
-                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
-                }\r
-            }
-            return null;
-        ]\r
-    }\r
-\r
-    protected final def getActiveCommitHandlers() {\r
-        return commitHandlers.entries;\r
-    }\r
-\r
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
-        HashSet<P> paths) {
-        return withLock(registrationLock) [|\r
-            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
-                val operationalState = readOperationalData(key)\r
-                val configurationState = readConfigurationData(key)\r
-                return new ListenerStateCapture(key, value, operationalState, configurationState)\r
-            ].toList()
-        ]\r
-    }\r
-\r
-    protected def boolean isAffectedBy(P key, Set<P> paths) {\r
-        if (paths.contains(key)) {\r
-            return true;\r
-        }\r
-        for (path : paths) {\r
-            if (key.contains(path)) {\r
-                return true;\r
-            }\r
-        }\r
-\r
-        return false;\r
-    }\r
-\r
-    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
-        checkNotNull(transaction);\r
-        transaction.changeStatus(TransactionStatus.SUBMITED);\r
-        val task = new TwoPhaseCommit(transaction, this);\r
-        submittedTransactionsCount.andIncrement;\r
-        return executor.submit(task);\r
-    }\r
-\r
-}\r
-\r
-@Data\r
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
-\r
-    @Property\r
-    P path;\r
-\r
-    @Property\r
-    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
-\r
-    @Property\r
-    D initialOperationalState;\r
-\r
-    @Property\r
-    D initialConfigurationState;\r
-}\r
-\r
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
-\r
-    AbstractDataBroker<P, D, DCL> dataBroker;\r
-\r
-    @Property\r
-    val P path;\r
-\r
-    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
-        super(instance)\r
-        dataBroker = broker;\r
-        _path = path;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        dataBroker.removeListener(this);\r
-        dataBroker = null;\r
-    }\r
-\r
-}\r
-\r
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
-implements DataCommitHandlerRegistration<P, D> {\r
-\r
-    AbstractDataBroker<P, D, ?> dataBroker;\r
-\r
-    @Property\r
-    val P path;\r
-\r
-    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
-        super(instance)\r
-        dataBroker = broker;\r
-        _path = path;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        dataBroker.removeCommitHandler(this);\r
-        dataBroker = null;\r
-    }\r
-}\r
-\r
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
-\r
-    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
-\r
-    val AbstractDataTransaction<P, D> transaction;\r
-    val AbstractDataBroker<P, D, DCL> dataBroker;\r
-    \r
-    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
-        this.transaction = transaction;\r
-        this.dataBroker = broker;\r
-    }\r
-\r
-    override call() throws Exception {\r
-\r
-        // get affected paths\r
-        val affectedPaths = new HashSet<P>();\r
-\r
-        affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
-        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
-        affectedPaths.addAll(transaction.removedConfigurationData);\r
-\r
-        affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
-        affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
-        affectedPaths.addAll(transaction.removedOperationalData);\r
-
-        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
-\r
-        val transactionId = transaction.identifier;\r
-\r
-        log.trace("Transaction: {} Started.",transactionId);\r
-        log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
-        // requesting commits\r
-        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
-        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
-        try {\r
-            for (handler : commitHandlers) {\r
-                handlerTransactions.add(handler.requestCommit(transaction));\r
-            }\r
-        } catch (Exception e) {\r
-            log.error("Transaction: {} Request Commit failed", transactionId,e);\r
-            dataBroker.failedTransactionsCount.andIncrement\r
-            transaction.changeStatus(TransactionStatus.FAILED)
-            return rollback(handlerTransactions, e);\r
-        }\r
-        val List<RpcResult<Void>> results = new ArrayList();\r
-        try {\r
-            for (subtransaction : handlerTransactions) {\r
-                results.add(subtransaction.finish());\r
-            }\r
-            listeners.publishDataChangeEvent();\r
-        } catch (Exception e) {\r
-            log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
-            dataBroker.failedTransactionsCount.andIncrement
-            transaction.changeStatus(TransactionStatus.FAILED)\r
-            return rollback(handlerTransactions, e);\r
-        }\r
-        log.trace("Transaction: {} Finished successfully.",transactionId);\r
-        dataBroker.finishedTransactionsCount.andIncrement;
-        transaction.changeStatus(TransactionStatus.COMMITED)\r
-        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
-\r
-    }\r
-\r
-    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
-        dataBroker.executor.submit [|\r
-            for (listenerSet : listeners) {
-                val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
-                val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
-                val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
-                    listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
-                for (listener : listenerSet.listeners) {
-                    try {
-                        listener.instance.onDataChanged(changeEvent);
-
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }        \r
-        ]\r
-    }\r
-\r
-    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
-        for (transaction : transactions) {\r
-            transaction.rollback()\r
-        }\r
-\r
-        // FIXME return encountered error.\r
-        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
-    }\r
-}\r
-\r
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
-\r
-    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
-
-    @Property\r
-    private val Object identifier;\r
-\r
-    var TransactionStatus status;\r
-\r
-    var AbstractDataBroker<P, D, ?> broker;\r
-\r
-    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
-        super(dataBroker);\r
-        _identifier = identifier;\r
-        broker = dataBroker;\r
-        status = TransactionStatus.NEW;\r
-        LOG.debug("Transaction {} Allocated.", identifier);
-\r
-    //listeners = new ListenerRegistry<>();\r
-    }\r
-\r
-    override commit() {\r
-        return broker.commit(this);\r
-    }\r
-\r
-    override readConfigurationData(P path) {\r
-        val local = this.updatedConfigurationData.get(path);\r
-        if(local != null) {\r
-            return local;\r
-        }\r
-        \r
-        return broker.readConfigurationData(path);\r
-    }\r
-\r
-    override readOperationalData(P path) {\r
-        val local = this.updatedOperationalData.get(path);\r
-        if(local != null) {\r
-            return local;\r
-        }\r
-        return broker.readOperationalData(path);\r
-    }\r
-\r
-    override hashCode() {\r
-        return identifier.hashCode;\r
-    }\r
-\r
-    override equals(Object obj) {\r
-        if (this === obj)\r
-            return true;\r
-        if (obj == null)\r
-            return false;\r
-        if (getClass() != obj.getClass())\r
-            return false;\r
-        val other = (obj as AbstractDataTransaction<P,D>);\r
-        if (broker == null) {\r
-            if (other.broker != null)\r
-                return false;\r
-        } else if (!broker.equals(other.broker))\r
-            return false;\r
-        if (identifier == null) {\r
-            if (other.identifier != null)\r
-                return false;\r
-        } else if (!identifier.equals(other.identifier))\r
-            return false;\r
-        return true;\r
-    }\r
-\r
-    override TransactionStatus getStatus() {\r
-        return status;\r
-    }\r
-\r
-    protected abstract def void onStatusChange(TransactionStatus status);\r
-\r
-    public def changeStatus(TransactionStatus status) {\r
-        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
-        this.status = status;\r
-        onStatusChange(status);\r
-    }\r
-\r
-}\r