X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=f90465f925a28bdeacf153fa9aeb0940ded58411;hb=6b98de000257414b2ae9b1db708c0f7962a0f033;hp=e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2;hpb=f062dc05cc7caaf0c1811856370f1c9e2f1e5c34;p=controller.git diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index e3d2b567a7..f90465f925 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -1,371 +1,404 @@ -package org.opendaylight.controller.md.sal.common.impl.service - -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler -import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration -import org.opendaylight.yangtools.concepts.ListenerRegistration -import com.google.common.collect.Multimap -import static com.google.common.base.Preconditions.*; -import java.util.List -import com.google.common.collect.HashMultimap -import java.util.concurrent.ExecutorService -import java.util.concurrent.Callable -import org.opendaylight.yangtools.yang.common.RpcResult -import java.util.Collections -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction -import java.util.ArrayList -import org.opendaylight.yangtools.concepts.CompositeObjectRegistration -import java.util.Arrays -import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService -import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory -import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher -import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener -import org.opendaylight.controller.sal.common.util.Rpcs -import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification -import java.util.concurrent.Future -import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter -import org.opendaylight.yangtools.concepts.Path -import org.slf4j.LoggerFactory -import java.util.HashSet -import java.util.Map.Entry -import java.util.Iterator -import java.util.Collection -import com.google.common.collect.FluentIterable; -import java.util.Set -import com.google.common.collect.ImmutableList -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration -import org.opendaylight.controller.md.sal.common.api.RegistrationListener -import org.opendaylight.yangtools.concepts.util.ListenerRegistry -import java.util.concurrent.atomic.AtomicLong - -abstract class AbstractDataBroker
, D, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // -DataReader
, // -DataChangePublisher
, // -DataProvisionService
{ - - private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); - - @Property - var ExecutorService executor; - - @Property - var AbstractDataReadRouter
dataReadRouter; - - Multimap
> listeners = HashMultimap.create(); - Multimap
> commitHandlers = HashMultimap.create();
-
- val ListenerRegistry paths) {
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
- .transformAndConcat[value] //
- .transform[instance].toList()
- }
-
- override final readConfigurationData(P path) {
- return dataReadRouter.readConfigurationData(path);
- }
-
- override final readOperationalData(P path) {
- return dataReadRouter.readOperationalData(path);
- }
-
- override final registerCommitHandler(P path, DataCommitHandler commitHandler) {
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
- commitHandlers.put(path, registration)
- LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onRegister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
- }
- }
- return registration;
- }
-
- override final def registerDataChangeListener(P path, DCL listener) {
- val reg = new DataChangeListenerRegistration(path, listener, this);
- listeners.put(path, reg);
- return reg;
- }
-
- final def registerDataReader(P path, DataReader reader) {
-
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);
-
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
- }
-
- override registerCommitHandlerListener(RegistrationListener registration) {
- listeners.remove(registration.path, registration);
- }
-
- protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
- commitHandlers.remove(registration.path, registration);
-
- LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onUnregister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
- }
- }
- }
-
- protected final def getActiveCommitHandlers() {
- return commitHandlers.entries;
- }
-
- protected def /*Iterator paths) {
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
- val operationalState = readOperationalData(key)
- val configurationState = readConfigurationData(key)
- return new ListenerStateCapture(key, value, operationalState, configurationState)
- ].toList()
- }
-
- protected def boolean isAffectedBy(P key, Set paths) {
- if (paths.contains(key)) {
- return true;
- }
- for (path : paths) {
- if (key.contains(path)) {
- return true;
- }
- }
-
- return false;
- }
-
- package final def Future transaction) {
- checkNotNull(transaction);
- transaction.changeStatus(TransactionStatus.SUBMITED);
- val task = new TwoPhaseCommit(transaction, this);
- return executor.submit(task);
- }
-
-}
-
-@Data
-package class ListenerStateCapture , D, DCL extends DataChangeListener > {
-
- @Property
- P path;
-
- @Property
- Collection , D, DCL extends DataChangeListener > extends AbstractObjectRegistration dataBroker;
-
- @Property
- val P path;
-
- new(P path, DCL instance, AbstractDataBroker broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeListener(this);
- dataBroker = null;
- }
-
-}
-
-package class DataCommitHandlerRegistrationImpl , D> //
-extends AbstractObjectRegistration {
-
- AbstractDataBroker dataBroker;
-
- @Property
- val P path;
-
- new(P path, DataCommitHandler instance, AbstractDataBroker broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeCommitHandler(this);
- dataBroker = null;
- }
-}
-
-package class TwoPhaseCommit , D, DCL extends DataChangeListener > implements Callable transaction;
- val AbstractDataBroker dataBroker;
-
- new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
- this.transaction = transaction;
- this.dataBroker = broker;
- }
-
- override call() throws Exception {
-
- // get affected paths
- val affectedPaths = new HashSet ();
-
- affectedPaths.addAll(transaction.createdConfigurationData.keySet);
- affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
- affectedPaths.addAll(transaction.removedConfigurationData);
-
- affectedPaths.addAll(transaction.createdOperationalData.keySet);
- affectedPaths.addAll(transaction.updatedOperationalData.keySet);
- affectedPaths.addAll(transaction.removedOperationalData);
-
- val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
-
- val transactionId = transaction.identifier;
-
- log.info("Transaction: {} Started.",transactionId);
- // requesting commits
- val Iterable , D> extends AbstractDataModification {
-
- @Property
- private val Object identifier;
-
- var TransactionStatus status;
-
- var AbstractDataBroker broker;
-
- protected new(Object identifier,AbstractDataBroker dataBroker) {
- super(dataBroker);
- _identifier = identifier;
- broker = dataBroker;
- status = TransactionStatus.NEW;
-
- //listeners = new ListenerRegistry<>();
- }
-
- override commit() {
- return broker.commit(this);
- }
-
- override readConfigurationData(P path) {
- return broker.readConfigurationData(path);
- }
-
- override readOperationalData(P path) {
- return broker.readOperationalData(path);
- }
-
- override hashCode() {
- return identifier.hashCode;
- }
-
- override equals(Object obj) {
- if (this === obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- val other = (obj as AbstractDataTransaction );
- if (broker == null) {
- if (other.broker != null)
- return false;
- } else if (!broker.equals(other.broker))
- return false;
- if (identifier == null) {
- if (other.identifier != null)
- return false;
- } else if (!identifier.equals(other.identifier))
- return false;
- return true;
- }
-
- override TransactionStatus getStatus() {
- return status;
- }
-
- protected abstract def void onStatusChange(TransactionStatus status);
-
- public def changeStatus(TransactionStatus status) {
- this.status = status;
- onStatusChange(status);
- }
-
-}
+package org.opendaylight.controller.md.sal.common.impl.service
+
+import com.google.common.collect.FluentIterable
+import com.google.common.collect.HashMultimap
+import com.google.common.collect.ImmutableList
+import com.google.common.collect.Multimap
+import java.util.ArrayList
+import java.util.Arrays
+import java.util.Collection
+import java.util.Collections
+import java.util.HashSet
+import java.util.List
+import java.util.Set
+import java.util.concurrent.Callable
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Future
+import java.util.concurrent.atomic.AtomicLong
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
+import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
+import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
+import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
+import org.opendaylight.controller.md.sal.common.api.data.DataReader
+import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
+import org.opendaylight.controller.sal.common.util.Rpcs
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
+import org.opendaylight.yangtools.concepts.ListenerRegistration
+import org.opendaylight.yangtools.concepts.Path
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry
+import org.opendaylight.yangtools.yang.common.RpcResult
+import org.slf4j.LoggerFactory
+
+import static com.google.common.base.Preconditions.*
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
+
+abstract class AbstractDataBroker , D, DCL extends DataChangeListener > implements DataModificationTransactionFactory , //
+DataReader , //
+DataChangePublisher , //
+DataProvisionService {
+
+ private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
+
+ @Property
+ var ExecutorService executor;
+
+ @Property
+ var AbstractDataReadRouter dataReadRouter;
+
+ @Property
+ private val AtomicLong submittedTransactionsCount = new AtomicLong;
+
+ @Property
+ private val AtomicLong failedTransactionsCount = new AtomicLong
+
+ @Property
+ private val AtomicLong finishedTransactionsCount = new AtomicLong
+
+ Multimap > listeners = HashMultimap.create();
+ Multimap > commitHandlers = HashMultimap.create();
+
+ val ListenerRegistry paths) {
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+ .transformAndConcat[value] //
+ .transform[instance].toList()
+ }
+
+ override final readConfigurationData(P path) {
+ return dataReadRouter.readConfigurationData(path);
+ }
+
+ override final readOperationalData(P path) {
+ return dataReadRouter.readOperationalData(path);
+ }
+
+ override final registerCommitHandler(P path, DataCommitHandler commitHandler) {
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
+ commitHandlers.put(path, registration)
+ LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onRegister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+ }
+ }
+ return registration;
+ }
+
+ override final def registerDataChangeListener(P path, DCL listener) {
+ val reg = new DataChangeListenerRegistration(path, listener, this);
+ listeners.put(path, reg);
+ val initialConfig = dataReadRouter.readConfigurationData(path);
+ val initialOperational = dataReadRouter.readOperationalData(path);
+ val event = createInitialListenerEvent(path,initialConfig,initialOperational);
+ listener.onDataChanged(event);
+ return reg;
+ }
+
+ final def registerDataReader(P path, DataReader reader) {
+
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ }
+
+ override registerCommitHandlerListener(RegistrationListener createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
+ return new InitialDataChangeEventImpl (initialConfig,initialOperational);
+
+ }
+
+ protected final def removeListener(DataChangeListenerRegistration registration) {
+ listeners.remove(registration.path, registration);
+ }
+
+ protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
+ commitHandlers.remove(registration.path, registration);
+
+ LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onUnregister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+ }
+ }
+ }
+
+ protected final def getActiveCommitHandlers() {
+ return commitHandlers.entries;
+ }
+
+ protected def /*Iterator paths) {
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
+ val operationalState = readOperationalData(key)
+ val configurationState = readConfigurationData(key)
+ return new ListenerStateCapture(key, value, operationalState, configurationState)
+ ].toList()
+ }
+
+ protected def boolean isAffectedBy(P key, Set paths) {
+ if (paths.contains(key)) {
+ return true;
+ }
+ for (path : paths) {
+ if (key.contains(path)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ package final def Future transaction) {
+ checkNotNull(transaction);
+ transaction.changeStatus(TransactionStatus.SUBMITED);
+ val task = new TwoPhaseCommit(transaction, this);
+ submittedTransactionsCount.andIncrement;
+ return executor.submit(task);
+ }
+
+}
+
+@Data
+package class ListenerStateCapture , D, DCL extends DataChangeListener > {
+
+ @Property
+ P path;
+
+ @Property
+ Collection , D, DCL extends DataChangeListener > extends AbstractObjectRegistration dataBroker;
+
+ @Property
+ val P path;
+
+ new(P path, DCL instance, AbstractDataBroker broker) {
+ super(instance)
+ dataBroker = broker;
+ _path = path;
+ }
+
+ override protected removeRegistration() {
+ dataBroker.removeListener(this);
+ dataBroker = null;
+ }
+
+}
+
+package class DataCommitHandlerRegistrationImpl , D> //
+extends AbstractObjectRegistration {
+
+ AbstractDataBroker dataBroker;
+
+ @Property
+ val P path;
+
+ new(P path, DataCommitHandler instance, AbstractDataBroker broker) {
+ super(instance)
+ dataBroker = broker;
+ _path = path;
+ }
+
+ override protected removeRegistration() {
+ dataBroker.removeCommitHandler(this);
+ dataBroker = null;
+ }
+}
+
+package class TwoPhaseCommit , D, DCL extends DataChangeListener > implements Callable transaction;
+ val AbstractDataBroker dataBroker;
+
+ new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
+ this.transaction = transaction;
+ this.dataBroker = broker;
+ }
+
+ override call() throws Exception {
+
+ // get affected paths
+ val affectedPaths = new HashSet ();
+
+ affectedPaths.addAll(transaction.createdConfigurationData.keySet);
+ affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
+ affectedPaths.addAll(transaction.removedConfigurationData);
+
+ affectedPaths.addAll(transaction.createdOperationalData.keySet);
+ affectedPaths.addAll(transaction.updatedOperationalData.keySet);
+ affectedPaths.addAll(transaction.removedOperationalData);
+
+ val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
+
+ val transactionId = transaction.identifier;
+
+ log.info("Transaction: {} Started.",transactionId);
+ // requesting commits
+ val Iterable , D> extends AbstractDataModification {
+
+ private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
+
+ @Property
+ private val Object identifier;
+
+ var TransactionStatus status;
+
+ var AbstractDataBroker broker;
+
+ protected new(Object identifier,AbstractDataBroker dataBroker) {
+ super(dataBroker);
+ _identifier = identifier;
+ broker = dataBroker;
+ status = TransactionStatus.NEW;
+ LOG.debug("Transaction {} Allocated.", identifier);
+
+ //listeners = new ListenerRegistry<>();
+ }
+
+ override commit() {
+ return broker.commit(this);
+ }
+
+ override readConfigurationData(P path) {
+ val local = this.updatedConfigurationData.get(path);
+ if(local != null) {
+ return local;
+ }
+
+ return broker.readConfigurationData(path);
+ }
+
+ override readOperationalData(P path) {
+ val local = this.updatedOperationalData.get(path);
+ if(local != null) {
+ return local;
+ }
+ return broker.readOperationalData(path);
+ }
+
+ override hashCode() {
+ return identifier.hashCode;
+ }
+
+ override equals(Object obj) {
+ if (this === obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ val other = (obj as AbstractDataTransaction );
+ if (broker == null) {
+ if (other.broker != null)
+ return false;
+ } else if (!broker.equals(other.broker))
+ return false;
+ if (identifier == null) {
+ if (other.identifier != null)
+ return false;
+ } else if (!identifier.equals(other.identifier))
+ return false;
+ return true;
+ }
+
+ override TransactionStatus getStatus() {
+ return status;
+ }
+
+ protected abstract def void onStatusChange(TransactionStatus status);
+
+ public def changeStatus(TransactionStatus status) {
+ LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
+ this.status = status;
+ onStatusChange(status);
+ }
+
+}