+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.impl.service\r
-\r
-import com.google.common.collect.FluentIterable\r
-import com.google.common.collect.HashMultimap\r
-import com.google.common.collect.ImmutableList\r
-import com.google.common.collect.Multimap\r
-import java.util.ArrayList\r
-import java.util.Arrays\r
-import java.util.Collection\r
-import java.util.Collections\r
-import java.util.HashSet\r
-import java.util.List\r
-import java.util.Set\r
-import java.util.concurrent.Callable\r
-import java.util.concurrent.ExecutorService\r
-import java.util.concurrent.Future\r
-import java.util.concurrent.atomic.AtomicLong\r
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
-import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
-import org.opendaylight.controller.sal.common.util.Rpcs\r
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
-import org.opendaylight.yangtools.concepts.ListenerRegistration\r
-import org.opendaylight.yangtools.concepts.Path\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.yangtools.yang.common.RpcResult\r
-import org.slf4j.LoggerFactory\r
-\r
-import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
-import com.google.common.collect.Multimaps
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
-DataReader<P, D>, //\r
-DataChangePublisher<P, D, DCL>, //\r
-DataProvisionService<P, D> {\r
-\r
- private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
-\r
- @Property\r
- var ExecutorService executor;\r
-\r
- @Property\r
- var AbstractDataReadRouter<P, D> dataReadRouter;\r
- \r
- @Property\r
- private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
- \r
- @Property\r
- private val AtomicLong failedTransactionsCount = new AtomicLong\r
- \r
- @Property\r
- private val AtomicLong finishedTransactionsCount = new AtomicLong\r
-\r
- Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
- Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-
- private val Lock registrationLock = new ReentrantLock;
- \r
- val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
- public new() {\r
- }\r
-\r
- protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
- HashSet<P> paths) {
- return withLock(registrationLock) [|\r
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
- .transformAndConcat[value] //\r
- .transform[instance].toList()
- ]\r
- }\r
-\r
- override final readConfigurationData(P path) {\r
- return dataReadRouter.readConfigurationData(path);\r
- }\r
-\r
- override final readOperationalData(P path) {\r
- return dataReadRouter.readOperationalData(path);\r
- }
-
- private static def <T> withLock(Lock lock,Callable<T> method) {
- lock.lock
- try {
- return method.call
- } finally {
- lock.unlock
- }
- } \r
-\r
- override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
- return withLock(registrationLock) [|\r
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
- commitHandlers.put(path, registration)\r
- LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
- for(listener : commitHandlerRegistrationListeners) {\r
- try {\r
- listener.instance.onRegister(registration);\r
- } catch (Exception e) {\r
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
- }\r
- }
- return registration;
- ]\r
- }\r
-\r
- override final def registerDataChangeListener(P path, DCL listener) {\r
- return withLock(registrationLock) [|
- val reg = new DataChangeListenerRegistration(path, listener, this);\r
- listeners.put(path, reg);\r
- val initialConfig = dataReadRouter.readConfigurationData(path);\r
- val initialOperational = dataReadRouter.readOperationalData(path);\r
- val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
- listener.onDataChanged(event);\r
- return reg;
- ]\r
- }\r
-\r
- final def registerDataReader(P path, DataReader<P, D> reader) {\r
- return withLock(registrationLock) [|\r
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
- \r
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
- ]\r
- }\r
- \r
- override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
- val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
- return ret;\r
- }\r
- \r
- protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
- return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
- \r
- }\r
-\r
- protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
- return withLock(registrationLock) [|\r
- listeners.remove(registration.path, registration);
- ]\r
- }\r
-\r
- protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
- return withLock(registrationLock) [|
- commitHandlers.remove(registration.path, registration);\r
- LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
- for(listener : commitHandlerRegistrationListeners) {\r
- try {\r
- listener.instance.onUnregister(registration);\r
- } catch (Exception e) {\r
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
- }\r
- }
- return null;
- ]\r
- }\r
-\r
- protected final def getActiveCommitHandlers() {\r
- return commitHandlers.entries;\r
- }\r
-\r
- protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
- HashSet<P> paths) {
- return withLock(registrationLock) [|\r
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
- val operationalState = readOperationalData(key)\r
- val configurationState = readConfigurationData(key)\r
- return new ListenerStateCapture(key, value, operationalState, configurationState)\r
- ].toList()
- ]\r
- }\r
-\r
- protected def boolean isAffectedBy(P key, Set<P> paths) {\r
- if (paths.contains(key)) {\r
- return true;\r
- }\r
- for (path : paths) {\r
- if (key.contains(path)) {\r
- return true;\r
- }\r
- }\r
-\r
- return false;\r
- }\r
-\r
- package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
- checkNotNull(transaction);\r
- transaction.changeStatus(TransactionStatus.SUBMITED);\r
- val task = new TwoPhaseCommit(transaction, this);\r
- submittedTransactionsCount.andIncrement;\r
- return executor.submit(task);\r
- }\r
-\r
-}\r
-\r
-@Data\r
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
-\r
- @Property\r
- P path;\r
-\r
- @Property\r
- Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
-\r
- @Property\r
- D initialOperationalState;\r
-\r
- @Property\r
- D initialConfigurationState;\r
-}\r
-\r
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
-\r
- AbstractDataBroker<P, D, DCL> dataBroker;\r
-\r
- @Property\r
- val P path;\r
-\r
- new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
- super(instance)\r
- dataBroker = broker;\r
- _path = path;\r
- }\r
-\r
- override protected removeRegistration() {\r
- dataBroker.removeListener(this);\r
- dataBroker = null;\r
- }\r
-\r
-}\r
-\r
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
-implements DataCommitHandlerRegistration<P, D> {\r
-\r
- AbstractDataBroker<P, D, ?> dataBroker;\r
-\r
- @Property\r
- val P path;\r
-\r
- new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
- super(instance)\r
- dataBroker = broker;\r
- _path = path;\r
- }\r
-\r
- override protected removeRegistration() {\r
- dataBroker.removeCommitHandler(this);\r
- dataBroker = null;\r
- }\r
-}\r
-\r
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
-\r
- private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
-\r
- val AbstractDataTransaction<P, D> transaction;\r
- val AbstractDataBroker<P, D, DCL> dataBroker;\r
- \r
- new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
- this.transaction = transaction;\r
- this.dataBroker = broker;\r
- }\r
-\r
- override call() throws Exception {\r
-\r
- // get affected paths\r
- val affectedPaths = new HashSet<P>();\r
-\r
- affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
- affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
- affectedPaths.addAll(transaction.removedConfigurationData);\r
-\r
- affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
- affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
- affectedPaths.addAll(transaction.removedOperationalData);\r
-
- val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
-\r
- val transactionId = transaction.identifier;\r
-\r
- log.trace("Transaction: {} Started.",transactionId);\r
- log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
- // requesting commits\r
- val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
- val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
- try {\r
- for (handler : commitHandlers) {\r
- handlerTransactions.add(handler.requestCommit(transaction));\r
- }\r
- } catch (Exception e) {\r
- log.error("Transaction: {} Request Commit failed", transactionId,e);\r
- dataBroker.failedTransactionsCount.andIncrement\r
- transaction.changeStatus(TransactionStatus.FAILED)
- return rollback(handlerTransactions, e);\r
- }\r
- val List<RpcResult<Void>> results = new ArrayList();\r
- try {\r
- for (subtransaction : handlerTransactions) {\r
- results.add(subtransaction.finish());\r
- }\r
- listeners.publishDataChangeEvent();\r
- } catch (Exception e) {\r
- log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
- dataBroker.failedTransactionsCount.andIncrement
- transaction.changeStatus(TransactionStatus.FAILED)\r
- return rollback(handlerTransactions, e);\r
- }\r
- log.trace("Transaction: {} Finished successfully.",transactionId);\r
- dataBroker.finishedTransactionsCount.andIncrement;
- transaction.changeStatus(TransactionStatus.COMMITED)\r
- return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
-\r
- }\r
-\r
- def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
- dataBroker.executor.submit [|\r
- for (listenerSet : listeners) {
- val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
- val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
- val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
- listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
- for (listener : listenerSet.listeners) {
- try {
- listener.instance.onDataChanged(changeEvent);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } \r
- ]\r
- }\r
-\r
- def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
- for (transaction : transactions) {\r
- transaction.rollback()\r
- }\r
-\r
- // FIXME return encountered error.\r
- return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
- }\r
-}\r
-\r
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
-\r
- private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
-
- @Property\r
- private val Object identifier;\r
-\r
- var TransactionStatus status;\r
-\r
- var AbstractDataBroker<P, D, ?> broker;\r
-\r
- protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
- super(dataBroker);\r
- _identifier = identifier;\r
- broker = dataBroker;\r
- status = TransactionStatus.NEW;\r
- LOG.debug("Transaction {} Allocated.", identifier);
-\r
- //listeners = new ListenerRegistry<>();\r
- }\r
-\r
- override commit() {\r
- return broker.commit(this);\r
- }\r
-\r
- override readConfigurationData(P path) {\r
- val local = this.updatedConfigurationData.get(path);\r
- if(local != null) {\r
- return local;\r
- }\r
- \r
- return broker.readConfigurationData(path);\r
- }\r
-\r
- override readOperationalData(P path) {\r
- val local = this.updatedOperationalData.get(path);\r
- if(local != null) {\r
- return local;\r
- }\r
- return broker.readOperationalData(path);\r
- }\r
-\r
- override hashCode() {\r
- return identifier.hashCode;\r
- }\r
-\r
- override equals(Object obj) {\r
- if (this === obj)\r
- return true;\r
- if (obj == null)\r
- return false;\r
- if (getClass() != obj.getClass())\r
- return false;\r
- val other = (obj as AbstractDataTransaction<P,D>);\r
- if (broker == null) {\r
- if (other.broker != null)\r
- return false;\r
- } else if (!broker.equals(other.broker))\r
- return false;\r
- if (identifier == null) {\r
- if (other.identifier != null)\r
- return false;\r
- } else if (!identifier.equals(other.identifier))\r
- return false;\r
- return true;\r
- }\r
-\r
- override TransactionStatus getStatus() {\r
- return status;\r
- }\r
-\r
- protected abstract def void onStatusChange(TransactionStatus status);\r
-\r
- public def changeStatus(TransactionStatus status) {\r
- LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
- this.status = status;\r
- onStatusChange(status);\r
- }\r
-\r
-}\r