- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
- val operationalState = readOperationalData(key)
- val configurationState = readConfigurationData(key)
- return new ListenerStateCapture(key, value, operationalState, configurationState)
- ].toList()
- }
-
- protected def boolean isAffectedBy(P key, Set<P> paths) {
- if (paths.contains(key)) {
- return true;
- }
- for (path : paths) {
- if (key.contains(path)) {
- return true;
- }
- }
-
- return false;
- }
-
- package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
- checkNotNull(transaction);
- transaction.changeStatus(TransactionStatus.SUBMITED);
- val task = new TwoPhaseCommit(transaction, this);
- return executor.submit(task);
- }
-
-}
-
-@Data
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
-
- @Property
- P path;
-
- @Property
- Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
-
- @Property
- D initialOperationalState;
-
- @Property
- D initialConfigurationState;
-}
-
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
-
- AbstractDataBroker<P, D, DCL> dataBroker;
-
- @Property
- val P path;
-
- new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeListener(this);
- dataBroker = null;
- }
-
-}
-
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
-implements DataCommitHandlerRegistration<P, D> {
-
- AbstractDataBroker<P, D, ?> dataBroker;
-
- @Property
- val P path;
-
- new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeCommitHandler(this);
- dataBroker = null;
- }
-}
-
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
-
- private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
-
- val AbstractDataTransaction<P, D> transaction;
- val AbstractDataBroker<P, D, DCL> dataBroker;
-
- new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
- this.transaction = transaction;
- this.dataBroker = broker;
- }
-
- override call() throws Exception {
-
- // get affected paths
- val affectedPaths = new HashSet<P>();
-
- affectedPaths.addAll(transaction.createdConfigurationData.keySet);
- affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
- affectedPaths.addAll(transaction.removedConfigurationData);
-
- affectedPaths.addAll(transaction.createdOperationalData.keySet);
- affectedPaths.addAll(transaction.updatedOperationalData.keySet);
- affectedPaths.addAll(transaction.removedOperationalData);
-
- val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
-
- val transactionId = transaction.identifier;
-
- log.info("Transaction: {} Started.",transactionId);
- // requesting commits
- val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
- val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
- try {
- for (handler : commitHandlers) {
- handlerTransactions.add(handler.requestCommit(transaction));
- }
- } catch (Exception e) {
- log.error("Transaction: {} Request Commit failed", transactionId,e);
- return rollback(handlerTransactions, e);
- }
- val List<RpcResult<Void>> results = new ArrayList();
- try {
- for (subtransaction : handlerTransactions) {
- results.add(subtransaction.finish());
- }
- listeners.publishDataChangeEvent();
- } catch (Exception e) {
- log.error("Transaction: {} Finish Commit failed",transactionId, e);
- return rollback(handlerTransactions, e);
- }
- log.info("Transaction: {} Finished succesfully.",transactionId);
- return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
-
- }
-
- def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
- for (listenerSet : listeners) {
- val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
- val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
- val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
- listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
- for (listener : listenerSet.listeners) {
- try {
- listener.instance.onDataChanged(changeEvent);
-
- } catch (Exception e) {
- e.printStackTrace();
+ return withLock(registrationLock) [|\r
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+ val operationalState = readOperationalData(key)\r
+ val configurationState = readConfigurationData(key)\r
+ return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+ ].toList()
+ ]\r
+ }\r
+\r
+ protected def boolean isAffectedBy(P key, Set<P> paths) {\r
+ if (paths.contains(key)) {\r
+ return true;\r
+ }\r
+ for (path : paths) {\r
+ if (key.contains(path)) {\r
+ return true;\r
+ }\r
+ }\r
+\r
+ return false;\r
+ }\r
+\r
+ package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
+ checkNotNull(transaction);\r
+ transaction.changeStatus(TransactionStatus.SUBMITED);\r
+ val task = new TwoPhaseCommit(transaction, this);\r
+ submittedTransactionsCount.andIncrement;\r
+ return executor.submit(task);\r
+ }\r
+\r
+}\r
+\r
+@Data\r
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
+\r
+ @Property\r
+ P path;\r
+\r
+ @Property\r
+ Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
+\r
+ @Property\r
+ D initialOperationalState;\r
+\r
+ @Property\r
+ D initialConfigurationState;\r
+}\r
+\r
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
+\r
+ AbstractDataBroker<P, D, DCL> dataBroker;\r
+\r
+ @Property\r
+ val P path;\r
+\r
+ new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
+ super(instance)\r
+ dataBroker = broker;\r
+ _path = path;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ dataBroker.removeListener(this);\r
+ dataBroker = null;\r
+ }\r
+\r
+}\r
+\r
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
+implements DataCommitHandlerRegistration<P, D> {\r
+\r
+ AbstractDataBroker<P, D, ?> dataBroker;\r
+\r
+ @Property\r
+ val P path;\r
+\r
+ new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
+ super(instance)\r
+ dataBroker = broker;\r
+ _path = path;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ dataBroker.removeCommitHandler(this);\r
+ dataBroker = null;\r
+ }\r
+}\r
+\r
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
+\r
+ private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
+\r
+ val AbstractDataTransaction<P, D> transaction;\r
+ val AbstractDataBroker<P, D, DCL> dataBroker;\r
+ \r
+ new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
+ this.transaction = transaction;\r
+ this.dataBroker = broker;\r
+ }\r
+\r
+ override call() throws Exception {\r
+\r
+ // get affected paths\r
+ val affectedPaths = new HashSet<P>();\r
+\r
+ affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
+ affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
+ affectedPaths.addAll(transaction.removedConfigurationData);\r
+\r
+ affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
+ affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
+ affectedPaths.addAll(transaction.removedOperationalData);\r
+
+ val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
+\r
+ val transactionId = transaction.identifier;\r
+\r
+ log.trace("Transaction: {} Started.",transactionId);\r
+ log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
+ // requesting commits\r
+ val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
+ val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
+ try {\r
+ for (handler : commitHandlers) {\r
+ handlerTransactions.add(handler.requestCommit(transaction));\r
+ }\r
+ } catch (Exception e) {\r
+ log.error("Transaction: {} Request Commit failed", transactionId,e);\r
+ dataBroker.failedTransactionsCount.andIncrement\r
+ transaction.changeStatus(TransactionStatus.FAILED)
+ return rollback(handlerTransactions, e);\r
+ }\r
+ val List<RpcResult<Void>> results = new ArrayList();\r
+ try {\r
+ for (subtransaction : handlerTransactions) {\r
+ results.add(subtransaction.finish());\r
+ }\r
+ listeners.publishDataChangeEvent();\r
+ } catch (Exception e) {\r
+ log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
+ dataBroker.failedTransactionsCount.andIncrement
+ transaction.changeStatus(TransactionStatus.FAILED)\r
+ return rollback(handlerTransactions, e);\r
+ }\r
+ log.trace("Transaction: {} Finished successfully.",transactionId);\r
+ dataBroker.finishedTransactionsCount.andIncrement;
+ transaction.changeStatus(TransactionStatus.COMMITED)\r
+ return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
+\r
+ }\r
+\r
+ def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
+ dataBroker.executor.submit [|\r
+ for (listenerSet : listeners) {
+ val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+ val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+
+ val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+ listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+ for (listener : listenerSet.listeners) {
+ try {
+ listener.instance.onDataChanged(changeEvent);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }