Fix for Bug 144, Bug 147, 148 - improved codec
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
index b878071183e14937a63a1ce8981bcfaa5785566d..74c4e0a148640a2278deee8a87bf67c4c27eb334 100644 (file)
@@ -27,25 +27,37 @@ import java.util.concurrent.Future
 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
 import org.opendaylight.yangtools.concepts.Path
 import org.slf4j.LoggerFactory
-
-abstract class AbstractDataBroker<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> implements 
-DataModificationTransactionFactory<P, D>, //
+import java.util.HashSet
+import java.util.Map.Entry
+import java.util.Iterator
+import java.util.Collection
+import com.google.common.collect.FluentIterable;
+import java.util.Set
+import com.google.common.collect.ImmutableList
+
+abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
 DataReader<P, D>, //
 DataChangePublisher<P, D, DCL>, //
-DataProvisionService<P,D> {
+DataProvisionService<P, D> {
 
     @Property
     var ExecutorService executor;
 
     @Property
-    var AbstractDataReadRouter<P,D> dataReadRouter;
-
-    Multimap<P, DataChangeListenerRegistration<P,D,DCL>> listeners = HashMultimap.create();
-    Multimap<P, DataCommitHandlerRegistration<P,D>> commitHandlers = HashMultimap.create();
+    var AbstractDataReadRouter<P, D> dataReadRouter;
 
+    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
+    Multimap<P, DataCommitHandlerRegistration<P, D>> commitHandlers = HashMultimap.create();
 
     public new() {
-        
+    }
+
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
+        HashSet<P> paths) {
+        return FluentIterable.from(commitHandlers.asMap.entrySet)
+            .filter[key.isAffectedBy(paths)] //
+            .transformAndConcat [value] //
+            .transform[instance].toList()
     }
 
     override final readConfigurationData(P path) {
@@ -56,11 +68,10 @@ DataProvisionService<P,D> {
         return dataReadRouter.readOperationalData(path);
     }
 
-    override final registerCommitHandler(P path,
-        DataCommitHandler<P, D> commitHandler) {
-            val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
-            commitHandlers.put(path,registration)
-            return registration;
+    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
+        val registration = new DataCommitHandlerRegistration(path, commitHandler, this);
+        commitHandlers.put(path, registration)
+        return registration;
     }
 
     override final def registerDataChangeListener(P path, DCL listener) {
@@ -69,27 +80,49 @@ DataProvisionService<P,D> {
         return reg;
     }
 
-     final def registerDataReader(P path,DataReader<P,D> reader) {
-        
-        val confReg = dataReadRouter.registerConfigurationReader(path,reader);
-        val dataReg = dataReadRouter.registerOperationalReader(path,reader);
-        
-        return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
+    final def registerDataReader(P path, DataReader<P, D> reader) {
+
+        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+        val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
     }
 
-    protected  final def removeListener(DataChangeListenerRegistration<P,D,DCL> registration) {
+    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
         listeners.remove(registration.path, registration);
     }
 
-    protected  final def removeCommitHandler(DataCommitHandlerRegistration<P,D> registration) {
+    protected final def removeCommitHandler(DataCommitHandlerRegistration<P, D> registration) {
         commitHandlers.remove(registration.path, registration);
     }
-    
-    protected  final def getActiveCommitHandlers() {
-        return commitHandlers.entries.map[ value.instance].toSet
+
+    protected final def getActiveCommitHandlers() {
+        return commitHandlers.entries;
     }
 
-    package final def Future<RpcResult<TransactionStatus>>  commit(AbstractDataTransaction<P,D> transaction) {
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
+        HashSet<P> paths) {
+        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
+            val operationalState = readOperationalData(key)
+            val configurationState = readConfigurationData(key)
+            return new ListenerStateCapture(key, value, operationalState, configurationState)
+        ].toList()
+    }
+
+    protected def boolean isAffectedBy(P key, Set<P> paths) {
+        if (paths.contains(key)) {
+            return true;
+        }
+        for (path : paths) {
+            if (key.contains(path)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
         checkNotNull(transaction);
         transaction.changeStatus(TransactionStatus.SUBMITED);
         val task = new TwoPhaseCommit(transaction, this);
@@ -98,14 +131,30 @@ DataProvisionService<P,D> {
 
 }
 
-package class DataChangeListenerRegistration<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+@Data
+package class ListenerStateCapture<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> {
 
-    AbstractDataBroker<P,D,DCL> dataBroker;
+    @Property
+    P path;
+
+    @Property
+    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
+
+    @Property
+    D initialOperationalState;
+
+    @Property
+    D initialConfigurationState;
+}
+
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+
+    AbstractDataBroker<P, D, DCL> dataBroker;
 
     @Property
     val P path;
 
-    new(P path, DCL instance, AbstractDataBroker<P,D,DCL> broker) {
+    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
         super(instance)
         dataBroker = broker;
         _path = path;
@@ -118,16 +167,14 @@ package class DataChangeListenerRegistration<P extends Path<P>,D,DCL extends Dat
 
 }
 
-package class DataCommitHandlerRegistration<P extends Path<P>,D>
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
+package class DataCommitHandlerRegistration<P extends Path<P>, D> extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
 
-    AbstractDataBroker<P,D,?> dataBroker;
+    AbstractDataBroker<P, D, ?> dataBroker;
 
     @Property
     val P path;
 
-    new(P path, DataCommitHandler<P, D> instance,
-        AbstractDataBroker<P,D,?> broker) {
+    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
         super(instance)
         dataBroker = broker;
         _path = path;
@@ -140,74 +187,105 @@ extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
 
 }
 
-package class TwoPhaseCommit<P extends Path<P>,D> implements Callable<RpcResult<TransactionStatus>> {
-    
+package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
+
     private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
 
-    val AbstractDataTransaction<P,D> transaction;
-    val AbstractDataBroker<P,D,?> dataBroker;
+    val AbstractDataTransaction<P, D> transaction;
+    val AbstractDataBroker<P, D, DCL> dataBroker;
 
-    new(AbstractDataTransaction<P,D> transaction, AbstractDataBroker<P,D,?> broker) {
+    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
         this.transaction = transaction;
         this.dataBroker = broker;
     }
 
     override call() throws Exception {
 
-        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.activeCommitHandlers;
+        // get affected paths
+        val affectedPaths = new HashSet<P>();
+
+        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
+        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
+        affectedPaths.addAll(transaction.removedConfigurationData);
+
+        affectedPaths.addAll(transaction.createdOperationalData.keySet);
+        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
+        affectedPaths.addAll(transaction.removedOperationalData);
+
+        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
 
         // requesting commits
+        val Iterable<DataCommitHandler<P, D>> commitHandlers =   dataBroker.affectedCommitHandlers(affectedPaths);
         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
         try {
             for (handler : commitHandlers) {
                 handlerTransactions.add(handler.requestCommit(transaction));
             }
         } catch (Exception e) {
-            log.error("Request Commit failded",e);
-            return rollback(handlerTransactions,e);
+            log.error("Request Commit failded", e);
+            return rollback(handlerTransactions, e);
         }
         val List<RpcResult<Void>> results = new ArrayList();
         try {
             for (subtransaction : handlerTransactions) {
                 results.add(subtransaction.finish());
             }
+            listeners.publishDataChangeEvent();
         } catch (Exception e) {
-            log.error("Finish Commit failed",e);
-            return rollback(handlerTransactions,e);
+            log.error("Finish Commit failed", e);
+            return rollback(handlerTransactions, e);
         }
 
+        
         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
+
+    }
+    
+    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D,DCL>> listeners) {
+        for(listenerSet : listeners) {
+            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+            
+            val changeEvent = new DataChangeEventImpl(transaction,listenerSet.initialConfigurationState,listenerSet.initialOperationalState,updatedOperational,updatedConfiguration);
+            for(listener : listenerSet.listeners) {
+                try {
+                    listener.instance.onDataChanged(changeEvent);
+                    
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
-    def rollback(List<DataCommitTransaction<P, D>> transactions,Exception e) {
+    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
         for (transaction : transactions) {
             transaction.rollback()
         }
+
         // FIXME return encountered error.
         return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
     }
 }
-
 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
 
     @Property
     private val Object identifier;
 
-    
     var TransactionStatus status;
-    
-    
+
     var AbstractDataBroker<P, D, ?> broker;
 
-    protected new (AbstractDataBroker<P,D,?> dataBroker) {
+    protected new(AbstractDataBroker<P, D, ?> dataBroker) {
         super(dataBroker);
         _identifier = new Object();
         broker = dataBroker;
         status = TransactionStatus.NEW;
-        //listeners = new ListenerRegistry<>();
+
+    //listeners = new ListenerRegistry<>();
     }
 
-    override  commit() {
+    override commit() {
         return broker.commit(this);
     }
 
@@ -230,7 +308,7 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
             return false;
         if (getClass() != obj.getClass())
             return false;
-        val other = (obj as AbstractDataTransaction<P,D>) ;
+        val other = (obj as AbstractDataTransaction<P,D>);
         if (broker == null) {
             if (other.broker != null)
                 return false;
@@ -248,12 +326,11 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
         return status;
     }
 
-    
     protected abstract def void onStatusChange(TransactionStatus status);
-    
+
     public def changeStatus(TransactionStatus status) {
         this.status = status;
         onStatusChange(status);
     }
-    
+
 }