Merge changes I0f752636,Idd154499,Ic35fa3e8
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
index b878071183e14937a63a1ce8981bcfaa5785566d..e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2 100644 (file)
@@ -27,25 +27,43 @@ import java.util.concurrent.Future
 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
 import org.opendaylight.yangtools.concepts.Path
 import org.slf4j.LoggerFactory
-
-abstract class AbstractDataBroker<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> implements 
-DataModificationTransactionFactory<P, D>, //
+import java.util.HashSet
+import java.util.Map.Entry
+import java.util.Iterator
+import java.util.Collection
+import com.google.common.collect.FluentIterable;
+import java.util.Set
+import com.google.common.collect.ImmutableList
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry
+import java.util.concurrent.atomic.AtomicLong
+
+abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
 DataReader<P, D>, //
 DataChangePublisher<P, D, DCL>, //
-DataProvisionService<P,D> {
+DataProvisionService<P, D> {
+
+    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
 
     @Property
     var ExecutorService executor;
 
     @Property
-    var AbstractDataReadRouter<P,D> dataReadRouter;
-
-    Multimap<P, DataChangeListenerRegistration<P,D,DCL>> listeners = HashMultimap.create();
-    Multimap<P, DataCommitHandlerRegistration<P,D>> commitHandlers = HashMultimap.create();
-
+    var AbstractDataReadRouter<P, D> dataReadRouter;
 
+    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
+    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
+    
+    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
     public new() {
-        
+    }
+
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
+        HashSet<P> paths) {
+        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+        .transformAndConcat[value] //
+        .transform[instance].toList()
     }
 
     override final readConfigurationData(P path) {
@@ -56,11 +74,18 @@ DataProvisionService<P,D> {
         return dataReadRouter.readOperationalData(path);
     }
 
-    override final registerCommitHandler(P path,
-        DataCommitHandler<P, D> commitHandler) {
-            val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
-            commitHandlers.put(path,registration)
-            return registration;
+    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
+        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
+        commitHandlers.put(path, registration)
+        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
+        for(listener : commitHandlerRegistrationListeners) {
+            try {
+                listener.instance.onRegister(registration);
+            } catch (Exception e) {
+                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+            }
+        }
+        return registration;
     }
 
     override final def registerDataChangeListener(P path, DCL listener) {
@@ -69,27 +94,65 @@ DataProvisionService<P,D> {
         return reg;
     }
 
-     final def registerDataReader(P path,DataReader<P,D> reader) {
-        
-        val confReg = dataReadRouter.registerConfigurationReader(path,reader);
-        val dataReg = dataReadRouter.registerOperationalReader(path,reader);
+    final def registerDataReader(P path, DataReader<P, D> reader) {
+
+        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+        val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+    }
+    
+    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
+        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
         
-        return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
+        return ret;
     }
+    
 
-    protected  final def removeListener(DataChangeListenerRegistration<P,D,DCL> registration) {
+    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
         listeners.remove(registration.path, registration);
     }
 
-    protected  final def removeCommitHandler(DataCommitHandlerRegistration<P,D> registration) {
+    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
         commitHandlers.remove(registration.path, registration);
+        
+         LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+        for(listener : commitHandlerRegistrationListeners) {
+            try {
+                listener.instance.onUnregister(registration);
+            } catch (Exception e) {
+                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+            }
+        }
     }
-    
-    protected  final def getActiveCommitHandlers() {
-        return commitHandlers.entries.map[ value.instance].toSet
+
+    protected final def getActiveCommitHandlers() {
+        return commitHandlers.entries;
+    }
+
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
+        HashSet<P> paths) {
+        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
+            val operationalState = readOperationalData(key)
+            val configurationState = readConfigurationData(key)
+            return new ListenerStateCapture(key, value, operationalState, configurationState)
+        ].toList()
+    }
+
+    protected def boolean isAffectedBy(P key, Set<P> paths) {
+        if (paths.contains(key)) {
+            return true;
+        }
+        for (path : paths) {
+            if (key.contains(path)) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
-    package final def Future<RpcResult<TransactionStatus>>  commit(AbstractDataTransaction<P,D> transaction) {
+    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
         checkNotNull(transaction);
         transaction.changeStatus(TransactionStatus.SUBMITED);
         val task = new TwoPhaseCommit(transaction, this);
@@ -98,14 +161,30 @@ DataProvisionService<P,D> {
 
 }
 
-package class DataChangeListenerRegistration<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+@Data
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
+
+    @Property
+    P path;
+
+    @Property
+    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
+
+    @Property
+    D initialOperationalState;
+
+    @Property
+    D initialConfigurationState;
+}
+
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
 
-    AbstractDataBroker<P,D,DCL> dataBroker;
+    AbstractDataBroker<P, D, DCL> dataBroker;
 
     @Property
     val P path;
 
-    new(P path, DCL instance, AbstractDataBroker<P,D,DCL> broker) {
+    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
         super(instance)
         dataBroker = broker;
         _path = path;
@@ -118,16 +197,16 @@ package class DataChangeListenerRegistration<P extends Path<P>,D,DCL extends Dat
 
 }
 
-package class DataCommitHandlerRegistration<P extends Path<P>,D>
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
+implements DataCommitHandlerRegistration<P, D> {
 
-    AbstractDataBroker<P,D,?> dataBroker;
+    AbstractDataBroker<P, D, ?> dataBroker;
 
     @Property
     val P path;
 
-    new(P path, DataCommitHandler<P, D> instance,
-        AbstractDataBroker<P,D,?> broker) {
+    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
         super(instance)
         dataBroker = broker;
         _path = path;
@@ -137,52 +216,87 @@ extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
         dataBroker.removeCommitHandler(this);
         dataBroker = null;
     }
-
 }
 
-package class TwoPhaseCommit<P extends Path<P>,D> implements Callable<RpcResult<TransactionStatus>> {
-    
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
+
     private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
 
-    val AbstractDataTransaction<P,D> transaction;
-    val AbstractDataBroker<P,D,?> dataBroker;
+    val AbstractDataTransaction<P, D> transaction;
+    val AbstractDataBroker<P, D, DCL> dataBroker;
 
-    new(AbstractDataTransaction<P,D> transaction, AbstractDataBroker<P,D,?> broker) {
+    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
         this.transaction = transaction;
         this.dataBroker = broker;
     }
 
     override call() throws Exception {
 
-        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.activeCommitHandlers;
+        // get affected paths
+        val affectedPaths = new HashSet<P>();
+
+        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
+        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
+        affectedPaths.addAll(transaction.removedConfigurationData);
+
+        affectedPaths.addAll(transaction.createdOperationalData.keySet);
+        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
+        affectedPaths.addAll(transaction.removedOperationalData);
+
+        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
 
+        val transactionId = transaction.identifier;
+
+        log.info("Transaction: {} Started.",transactionId);
         // requesting commits
+        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
         try {
             for (handler : commitHandlers) {
                 handlerTransactions.add(handler.requestCommit(transaction));
             }
         } catch (Exception e) {
-            log.error("Request Commit failded",e);
-            return rollback(handlerTransactions,e);
+            log.error("Transaction: {} Request Commit failed", transactionId,e);
+            return rollback(handlerTransactions, e);
         }
         val List<RpcResult<Void>> results = new ArrayList();
         try {
             for (subtransaction : handlerTransactions) {
                 results.add(subtransaction.finish());
             }
+            listeners.publishDataChangeEvent();
         } catch (Exception e) {
-            log.error("Finish Commit failed",e);
-            return rollback(handlerTransactions,e);
+            log.error("Transaction: {} Finish Commit failed",transactionId, e);
+            return rollback(handlerTransactions, e);
         }
-
+        log.info("Transaction: {} Finished succesfully.",transactionId);
         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
+
     }
 
-    def rollback(List<DataCommitTransaction<P, D>> transactions,Exception e) {
+    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        for (listenerSet : listeners) {
+            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+
+            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+            for (listener : listenerSet.listeners) {
+                try {
+                    listener.instance.onDataChanged(changeEvent);
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
         for (transaction : transactions) {
             transaction.rollback()
         }
+
         // FIXME return encountered error.
         return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
     }
@@ -193,21 +307,20 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
     @Property
     private val Object identifier;
 
-    
     var TransactionStatus status;
-    
-    
+
     var AbstractDataBroker<P, D, ?> broker;
 
-    protected new (AbstractDataBroker<P,D,?> dataBroker) {
+    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
         super(dataBroker);
-        _identifier = new Object();
+        _identifier = identifier;
         broker = dataBroker;
         status = TransactionStatus.NEW;
-        //listeners = new ListenerRegistry<>();
+
+    //listeners = new ListenerRegistry<>();
     }
 
-    override  commit() {
+    override commit() {
         return broker.commit(this);
     }
 
@@ -230,7 +343,7 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
             return false;
         if (getClass() != obj.getClass())
             return false;
-        val other = (obj as AbstractDataTransaction<P,D>) ;
+        val other = (obj as AbstractDataTransaction<P,D>);
         if (broker == null) {
             if (other.broker != null)
                 return false;
@@ -248,12 +361,11 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
         return status;
     }
 
-    
     protected abstract def void onStatusChange(TransactionStatus status);
-    
+
     public def changeStatus(TransactionStatus status) {
         this.status = status;
         onStatusChange(status);
     }
-    
+
 }