Fixed two-phase commit bug when change originated in DOM Broker
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
index 74c4e0a148640a2278deee8a87bf67c4c27eb334..e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2 100644 (file)
@@ -34,12 +34,18 @@ import java.util.Collection
 import com.google.common.collect.FluentIterable;
 import java.util.Set
 import com.google.common.collect.ImmutableList
 import com.google.common.collect.FluentIterable;
 import java.util.Set
 import com.google.common.collect.ImmutableList
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry
+import java.util.concurrent.atomic.AtomicLong
 
 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
 DataReader<P, D>, //
 DataChangePublisher<P, D, DCL>, //
 DataProvisionService<P, D> {
 
 
 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
 DataReader<P, D>, //
 DataChangePublisher<P, D, DCL>, //
 DataProvisionService<P, D> {
 
+    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
+
     @Property
     var ExecutorService executor;
 
     @Property
     var ExecutorService executor;
 
@@ -47,17 +53,17 @@ DataProvisionService<P, D> {
     var AbstractDataReadRouter<P, D> dataReadRouter;
 
     Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
     var AbstractDataReadRouter<P, D> dataReadRouter;
 
     Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
-    Multimap<P, DataCommitHandlerRegistration<P, D>> commitHandlers = HashMultimap.create();
-
+    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
+    
+    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
     public new() {
     }
 
     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
         HashSet<P> paths) {
     public new() {
     }
 
     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
         HashSet<P> paths) {
-        return FluentIterable.from(commitHandlers.asMap.entrySet)
-            .filter[key.isAffectedBy(paths)] //
-            .transformAndConcat [value] //
-            .transform[instance].toList()
+        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+        .transformAndConcat[value] //
+        .transform[instance].toList()
     }
 
     override final readConfigurationData(P path) {
     }
 
     override final readConfigurationData(P path) {
@@ -69,8 +75,16 @@ DataProvisionService<P, D> {
     }
 
     override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
     }
 
     override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
-        val registration = new DataCommitHandlerRegistration(path, commitHandler, this);
+        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
         commitHandlers.put(path, registration)
         commitHandlers.put(path, registration)
+        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
+        for(listener : commitHandlerRegistrationListeners) {
+            try {
+                listener.instance.onRegister(registration);
+            } catch (Exception e) {
+                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+            }
+        }
         return registration;
     }
 
         return registration;
     }
 
@@ -87,13 +101,29 @@ DataProvisionService<P, D> {
 
         return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
     }
 
         return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
     }
+    
+    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
+        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
+        
+        return ret;
+    }
+    
 
     protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
         listeners.remove(registration.path, registration);
     }
 
 
     protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
         listeners.remove(registration.path, registration);
     }
 
-    protected final def removeCommitHandler(DataCommitHandlerRegistration<P, D> registration) {
+    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
         commitHandlers.remove(registration.path, registration);
         commitHandlers.remove(registration.path, registration);
+        
+         LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+        for(listener : commitHandlerRegistrationListeners) {
+            try {
+                listener.instance.onUnregister(registration);
+            } catch (Exception e) {
+                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+            }
+        }
     }
 
     protected final def getActiveCommitHandlers() {
     }
 
     protected final def getActiveCommitHandlers() {
@@ -132,7 +162,7 @@ DataProvisionService<P, D> {
 }
 
 @Data
 }
 
 @Data
-package class ListenerStateCapture<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> {
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
 
     @Property
     P path;
 
     @Property
     P path;
@@ -167,7 +197,9 @@ package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends D
 
 }
 
 
 }
 
-package class DataCommitHandlerRegistration<P extends Path<P>, D> extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
+implements DataCommitHandlerRegistration<P, D> {
 
     AbstractDataBroker<P, D, ?> dataBroker;
 
 
     AbstractDataBroker<P, D, ?> dataBroker;
 
@@ -184,10 +216,9 @@ package class DataCommitHandlerRegistration<P extends Path<P>, D> extends Abstra
         dataBroker.removeCommitHandler(this);
         dataBroker = null;
     }
         dataBroker.removeCommitHandler(this);
         dataBroker = null;
     }
-
 }
 
 }
 
-package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
 
     private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
 
 
     private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
 
@@ -214,15 +245,18 @@ package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener
 
         val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
 
 
         val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
 
+        val transactionId = transaction.identifier;
+
+        log.info("Transaction: {} Started.",transactionId);
         // requesting commits
         // requesting commits
-        val Iterable<DataCommitHandler<P, D>> commitHandlers =   dataBroker.affectedCommitHandlers(affectedPaths);
+        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
         try {
             for (handler : commitHandlers) {
                 handlerTransactions.add(handler.requestCommit(transaction));
             }
         } catch (Exception e) {
         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
         try {
             for (handler : commitHandlers) {
                 handlerTransactions.add(handler.requestCommit(transaction));
             }
         } catch (Exception e) {
-            log.error("Request Commit failded", e);
+            log.error("Transaction: {} Request Commit failed", transactionId,e);
             return rollback(handlerTransactions, e);
         }
         val List<RpcResult<Void>> results = new ArrayList();
             return rollback(handlerTransactions, e);
         }
         val List<RpcResult<Void>> results = new ArrayList();
@@ -232,25 +266,25 @@ package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener
             }
             listeners.publishDataChangeEvent();
         } catch (Exception e) {
             }
             listeners.publishDataChangeEvent();
         } catch (Exception e) {
-            log.error("Finish Commit failed", e);
+            log.error("Transaction: {} Finish Commit failed",transactionId, e);
             return rollback(handlerTransactions, e);
         }
             return rollback(handlerTransactions, e);
         }
-
-        
+        log.info("Transaction: {} Finished succesfully.",transactionId);
         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
 
     }
         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
 
     }
-    
-    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D,DCL>> listeners) {
-        for(listenerSet : listeners) {
+
+    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        for (listenerSet : listeners) {
             val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
             val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
             val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
             val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-            
-            val changeEvent = new DataChangeEventImpl(transaction,listenerSet.initialConfigurationState,listenerSet.initialOperationalState,updatedOperational,updatedConfiguration);
-            for(listener : listenerSet.listeners) {
+
+            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+            for (listener : listenerSet.listeners) {
                 try {
                     listener.instance.onDataChanged(changeEvent);
                 try {
                     listener.instance.onDataChanged(changeEvent);
-                    
+
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -267,6 +301,7 @@ package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener
         return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
     }
 }
         return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
     }
 }
+
 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
 
     @Property
 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
 
     @Property
@@ -276,9 +311,9 @@ public abstract class AbstractDataTransaction<P extends Path<P>, D> extends Abst
 
     var AbstractDataBroker<P, D, ?> broker;
 
 
     var AbstractDataBroker<P, D, ?> broker;
 
-    protected new(AbstractDataBroker<P, D, ?> dataBroker) {
+    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
         super(dataBroker);
         super(dataBroker);
-        _identifier = new Object();
+        _identifier = identifier;
         broker = dataBroker;
         status = TransactionStatus.NEW;
 
         broker = dataBroker;
         status = TransactionStatus.NEW;