- }
- }
-
- def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
- for (transaction : transactions) {
- transaction.rollback()
- }
-
- // FIXME return encountered error.
- return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
- }
-}
-
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
-
- @Property
- private val Object identifier;
-
- var TransactionStatus status;
-
- var AbstractDataBroker<P, D, ?> broker;
-
- protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
- super(dataBroker);
- _identifier = identifier;
- broker = dataBroker;
- status = TransactionStatus.NEW;
-
- //listeners = new ListenerRegistry<>();
- }
-
- override commit() {
- return broker.commit(this);
- }
-
- override readConfigurationData(P path) {
- return broker.readConfigurationData(path);
- }
-
- override readOperationalData(P path) {
- return broker.readOperationalData(path);
- }
-
- override hashCode() {
- return identifier.hashCode;
- }
-
- override equals(Object obj) {
- if (this === obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- val other = (obj as AbstractDataTransaction<P,D>);
- if (broker == null) {
- if (other.broker != null)
- return false;
- } else if (!broker.equals(other.broker))
- return false;
- if (identifier == null) {
- if (other.identifier != null)
- return false;
- } else if (!identifier.equals(other.identifier))
- return false;
- return true;
- }
-
- override TransactionStatus getStatus() {
- return status;
- }
-
- protected abstract def void onStatusChange(TransactionStatus status);
-
- public def changeStatus(TransactionStatus status) {
- this.status = status;
- onStatusChange(status);
- }
-
-}
+ return null;
+ ]\r
+ }\r
+\r
+ protected final def getActiveCommitHandlers() {\r
+ return commitHandlers.entries;\r
+ }\r
+\r
+ protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
+ HashSet<P> paths) {
+ return withLock(registrationLock) [|\r
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+ val operationalState = readOperationalData(key)\r
+ val configurationState = readConfigurationData(key)\r
+ return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+ ].toList()
+ ]\r
+ }\r
+\r
+ protected def boolean isAffectedBy(P key, Set<P> paths) {\r
+ if (paths.contains(key)) {\r
+ return true;\r
+ }\r
+ for (path : paths) {\r
+ if (key.contains(path)) {\r
+ return true;\r
+ }\r
+ }\r
+\r
+ return false;\r
+ }\r
+\r
+ package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
+ checkNotNull(transaction);\r
+ transaction.changeStatus(TransactionStatus.SUBMITED);\r
+ val task = new TwoPhaseCommit(transaction, this);\r
+ submittedTransactionsCount.andIncrement;\r
+ return executor.submit(task);\r
+ }\r
+\r
+}\r
+\r
+@Data\r
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
+\r
+ @Property\r
+ P path;\r
+\r
+ @Property\r
+ Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
+\r
+ @Property\r
+ D initialOperationalState;\r
+\r
+ @Property\r
+ D initialConfigurationState;\r
+}\r
+\r
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
+\r
+ AbstractDataBroker<P, D, DCL> dataBroker;\r
+\r
+ @Property\r
+ val P path;\r
+\r
+ new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
+ super(instance)\r
+ dataBroker = broker;\r
+ _path = path;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ dataBroker.removeListener(this);\r
+ dataBroker = null;\r
+ }\r
+\r
+}\r
+\r
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
+implements DataCommitHandlerRegistration<P, D> {\r
+\r
+ AbstractDataBroker<P, D, ?> dataBroker;\r
+\r
+ @Property\r
+ val P path;\r
+\r
+ new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
+ super(instance)\r
+ dataBroker = broker;\r
+ _path = path;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ dataBroker.removeCommitHandler(this);\r
+ dataBroker = null;\r
+ }\r
+}\r
+\r
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
+\r
+ private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
+\r
+ val AbstractDataTransaction<P, D> transaction;\r
+ val AbstractDataBroker<P, D, DCL> dataBroker;\r
+ \r
+ new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
+ this.transaction = transaction;\r
+ this.dataBroker = broker;\r
+ }\r
+\r
+ override call() throws Exception {\r
+\r
+ // get affected paths\r
+ val affectedPaths = new HashSet<P>();\r
+\r
+ affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
+ affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
+ affectedPaths.addAll(transaction.removedConfigurationData);\r
+\r
+ affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
+ affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
+ affectedPaths.addAll(transaction.removedOperationalData);\r
+
+ val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
+\r
+ val transactionId = transaction.identifier;\r
+\r
+ log.trace("Transaction: {} Started.",transactionId);\r
+ log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
+ // requesting commits\r
+ val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
+ val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
+ try {\r
+ for (handler : commitHandlers) {\r
+ handlerTransactions.add(handler.requestCommit(transaction));\r
+ }\r
+ } catch (Exception e) {\r
+ log.error("Transaction: {} Request Commit failed", transactionId,e);\r
+ dataBroker.failedTransactionsCount.andIncrement\r
+ transaction.changeStatus(TransactionStatus.FAILED)
+ return rollback(handlerTransactions, e);\r
+ }\r
+ val List<RpcResult<Void>> results = new ArrayList();\r
+ try {\r
+ for (subtransaction : handlerTransactions) {\r
+ results.add(subtransaction.finish());\r
+ }\r
+ listeners.publishDataChangeEvent();\r
+ } catch (Exception e) {\r
+ log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
+ dataBroker.failedTransactionsCount.andIncrement
+ transaction.changeStatus(TransactionStatus.FAILED)\r
+ return rollback(handlerTransactions, e);\r
+ }\r
+ log.trace("Transaction: {} Finished successfully.",transactionId);\r
+ dataBroker.finishedTransactionsCount.andIncrement;
+ transaction.changeStatus(TransactionStatus.COMMITED)\r
+ return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
+\r
+ }\r
+\r
+ def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
+ for (listenerSet : listeners) {\r
+ val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);\r
+ val updatedOperational = dataBroker.readOperationalData(listenerSet.path);\r
+\r
+ val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,\r
+ listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);\r
+ for (listener : listenerSet.listeners) {\r
+ try {\r
+ listener.instance.onDataChanged(changeEvent);\r
+\r
+ } catch (Exception e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
+ for (transaction : transactions) {\r
+ transaction.rollback()\r
+ }\r
+\r
+ // FIXME return encountered error.\r
+ return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
+ }\r
+}\r
+\r
+public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
+\r
+ private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
+
+ @Property\r
+ private val Object identifier;\r
+\r
+ var TransactionStatus status;\r
+\r
+ var AbstractDataBroker<P, D, ?> broker;\r
+\r
+ protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
+ super(dataBroker);\r
+ _identifier = identifier;\r
+ broker = dataBroker;\r
+ status = TransactionStatus.NEW;\r
+ LOG.debug("Transaction {} Allocated.", identifier);
+\r
+ //listeners = new ListenerRegistry<>();\r
+ }\r
+\r
+ override commit() {\r
+ return broker.commit(this);\r
+ }\r
+\r
+ override readConfigurationData(P path) {\r
+ val local = this.updatedConfigurationData.get(path);\r
+ if(local != null) {\r
+ return local;\r
+ }\r
+ \r
+ return broker.readConfigurationData(path);\r
+ }\r
+\r
+ override readOperationalData(P path) {\r
+ val local = this.updatedOperationalData.get(path);\r
+ if(local != null) {\r
+ return local;\r
+ }\r
+ return broker.readOperationalData(path);\r
+ }\r
+\r
+ override hashCode() {\r
+ return identifier.hashCode;\r
+ }\r
+\r
+ override equals(Object obj) {\r
+ if (this === obj)\r
+ return true;\r
+ if (obj == null)\r
+ return false;\r
+ if (getClass() != obj.getClass())\r
+ return false;\r
+ val other = (obj as AbstractDataTransaction<P,D>);\r
+ if (broker == null) {\r
+ if (other.broker != null)\r
+ return false;\r
+ } else if (!broker.equals(other.broker))\r
+ return false;\r
+ if (identifier == null) {\r
+ if (other.identifier != null)\r
+ return false;\r
+ } else if (!identifier.equals(other.identifier))\r
+ return false;\r
+ return true;\r
+ }\r
+\r
+ override TransactionStatus getStatus() {\r
+ return status;\r
+ }\r
+\r
+ protected abstract def void onStatusChange(TransactionStatus status);\r
+\r
+ public def changeStatus(TransactionStatus status) {\r
+ LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
+ this.status = status;\r
+ onStatusChange(status);\r
+ }\r
+\r
+}\r