Merge "Fixed publishDataChangeEvent in 2phase commit"
authorEd Warnicke <eaw@cisco.com>
Fri, 7 Feb 2014 06:36:35 +0000 (06:36 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 7 Feb 2014 06:36:35 +0000 (06:36 +0000)
1  2 
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend

index 2c3b0188f48096d34ea45485fe34ec1a1711efe1,0c8f6109ed843ac84bdc5d4fef814e1e98518c7d..7c6f52f110fd1771650c9670c43a8136d8999a2b
@@@ -45,8 -45,6 +45,8 @@@ import org.slf4j.LoggerFactor
  \r
  import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
  import com.google.common.collect.Multimaps
 +import java.util.concurrent.locks.Lock
 +import java.util.concurrent.locks.ReentrantLock
  
  abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
  DataReader<P, D>, //\r
@@@ -72,20 -70,16 +72,20 @@@ DataProvisionService<P, D> 
  \r
      Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
      Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
 +    
 +    private val Lock registrationLock = new ReentrantLock;
      \r
      val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
      public new() {\r
      }\r
  \r
      protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
 -        HashSet<P> paths) {\r
 -        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
 -        .transformAndConcat[value] //\r
 -        .transform[instance].toList()\r
 +        HashSet<P> paths) {
 +        return withLock(registrationLock) [|\r
 +            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
 +                .transformAndConcat[value] //\r
 +                .transform[instance].toList()
 +        ]\r
      }\r
  \r
      override final readConfigurationData(P path) {\r
  \r
      override final readOperationalData(P path) {\r
          return dataReadRouter.readOperationalData(path);\r
 -    }\r
 -\r
 -    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
 -        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
 -        commitHandlers.put(path, registration)\r
 -        LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
 -        for(listener : commitHandlerRegistrationListeners) {\r
 -            try {\r
 -                listener.instance.onRegister(registration);\r
 -            } catch (Exception e) {\r
 -                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
 -            }\r
 -        }\r
 -        return registration;\r
 +    }
 +    
 +    private static def <T> withLock(Lock lock,Callable<T> method) {
 +        lock.lock
 +        try {
 +            return method.call
 +        } finally {
 +            lock.unlock
 +        }
 +    } \r
 +\r
 +    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
 +        return withLock(registrationLock) [|\r
 +            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
 +            commitHandlers.put(path, registration)\r
 +            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
 +            for(listener : commitHandlerRegistrationListeners) {\r
 +                try {\r
 +                    listener.instance.onRegister(registration);\r
 +                } catch (Exception e) {\r
 +                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
 +                }\r
 +            }
 +            return registration;
 +        ]\r
      }\r
  \r
      override final def registerDataChangeListener(P path, DCL listener) {\r
 -        val reg = new DataChangeListenerRegistration(path, listener, this);\r
 -        listeners.put(path, reg);\r
 -        val initialConfig = dataReadRouter.readConfigurationData(path);\r
 -        val initialOperational = dataReadRouter.readOperationalData(path);\r
 -        val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
 -        listener.onDataChanged(event);\r
 -        return reg;\r
 +        return withLock(registrationLock) [|
 +            val reg = new DataChangeListenerRegistration(path, listener, this);\r
 +            listeners.put(path, reg);\r
 +            val initialConfig = dataReadRouter.readConfigurationData(path);\r
 +            val initialOperational = dataReadRouter.readOperationalData(path);\r
 +            val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
 +            listener.onDataChanged(event);\r
 +            return reg;
 +        ]\r
      }\r
  \r
      final def registerDataReader(P path, DataReader<P, D> reader) {\r
 -\r
 -        val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
 -        val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
 -\r
 -        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));\r
 +        return withLock(registrationLock) [|\r
 +            val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
 +            val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
 +    \r
 +            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
 +        ]\r
      }\r
      \r
      override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
          val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
 -        \r
          return ret;\r
      }\r
      \r
          \r
      }\r
  \r
 -    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {\r
 -        listeners.remove(registration.path, registration);\r
 +    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
 +        return withLock(registrationLock) [|\r
 +            listeners.remove(registration.path, registration);
 +        ]\r
      }\r
  \r
      protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
 -        commitHandlers.remove(registration.path, registration);\r
 -        \r
 -         LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
 -        for(listener : commitHandlerRegistrationListeners) {\r
 -            try {\r
 -                listener.instance.onUnregister(registration);\r
 -            } catch (Exception e) {\r
 -                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
 -            }\r
 -        }\r
 +        return withLock(registrationLock) [|
 +            commitHandlers.remove(registration.path, registration);\r
 +             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
 +            for(listener : commitHandlerRegistrationListeners) {\r
 +                try {\r
 +                    listener.instance.onUnregister(registration);\r
 +                } catch (Exception e) {\r
 +                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
 +                }\r
 +            }
 +            return null;
 +        ]\r
      }\r
  \r
      protected final def getActiveCommitHandlers() {\r
      }\r
  \r
      protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
 -        HashSet<P> paths) {\r
 -        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
 -            val operationalState = readOperationalData(key)\r
 -            val configurationState = readConfigurationData(key)\r
 -            return new ListenerStateCapture(key, value, operationalState, configurationState)\r
 -        ].toList()\r
 +        HashSet<P> paths) {
 +        return withLock(registrationLock) [|\r
 +            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
 +                val operationalState = readOperationalData(key)\r
 +                val configurationState = readConfigurationData(key)\r
 +                return new ListenerStateCapture(key, value, operationalState, configurationState)\r
 +            ].toList()
 +        ]\r
      }\r
  \r
      protected def boolean isAffectedBy(P key, Set<P> paths) {\r
@@@ -292,13 -267,12 +292,13 @@@ package class TwoPhaseCommit<P extends 
          affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
          affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
          affectedPaths.addAll(transaction.removedOperationalData);\r
 -\r
 +
          val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
  \r
          val transactionId = transaction.identifier;\r
  \r
          log.trace("Transaction: {} Started.",transactionId);\r
 +        log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
          // requesting commits\r
          val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
          val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
          } catch (Exception e) {\r
              log.error("Transaction: {} Request Commit failed", transactionId,e);\r
              dataBroker.failedTransactionsCount.andIncrement\r
 +            transaction.changeStatus(TransactionStatus.FAILED)
              return rollback(handlerTransactions, e);\r
          }\r
          val List<RpcResult<Void>> results = new ArrayList();\r
              listeners.publishDataChangeEvent();\r
          } catch (Exception e) {\r
              log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
 -            dataBroker.failedTransactionsCount.andIncrement\r
 +            dataBroker.failedTransactionsCount.andIncrement
 +            transaction.changeStatus(TransactionStatus.FAILED)\r
              return rollback(handlerTransactions, e);\r
          }\r
          log.trace("Transaction: {} Finished successfully.",transactionId);\r
 -        dataBroker.finishedTransactionsCount.andIncrement;\r
 +        dataBroker.finishedTransactionsCount.andIncrement;
 +        transaction.changeStatus(TransactionStatus.COMMITED)\r
          return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
  \r
      }\r
  \r
      def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
-         for (listenerSet : listeners) {\r
-             val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);\r
-             val updatedOperational = dataBroker.readOperationalData(listenerSet.path);\r
\r
-             val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,\r
-                 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);\r
-             for (listener : listenerSet.listeners) {\r
-                 try {\r
-                     listener.instance.onDataChanged(changeEvent);\r
\r
-                 } catch (Exception e) {\r
-                     e.printStackTrace();\r
-                 }\r
-             }\r
-         }\r
+         dataBroker.executor.submit [|\r
+             for (listenerSet : listeners) {
+                 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+                 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+                 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+                     listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+                 for (listener : listenerSet.listeners) {
+                     try {
+                         listener.instance.onDataChanged(changeEvent);
+                     } catch (Exception e) {
+                         e.printStackTrace();
+                     }
+                 }
+             }        \r
+         ]\r
      }\r
  \r
      def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r