X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=7c6f52f110fd1771650c9670c43a8136d8999a2b;hp=e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2;hb=3948bedd0129e44c0943bd77c91806425645cd72;hpb=3e7cfb454f0e9d2eab68aef8bf855746c4df5cdb diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index e3d2b567a7..7c6f52f110 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -1,371 +1,443 @@ -package org.opendaylight.controller.md.sal.common.impl.service - -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler -import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration -import org.opendaylight.yangtools.concepts.ListenerRegistration -import com.google.common.collect.Multimap -import static com.google.common.base.Preconditions.*; -import java.util.List -import com.google.common.collect.HashMultimap -import java.util.concurrent.ExecutorService -import java.util.concurrent.Callable -import org.opendaylight.yangtools.yang.common.RpcResult -import java.util.Collections -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction -import java.util.ArrayList -import org.opendaylight.yangtools.concepts.CompositeObjectRegistration -import java.util.Arrays -import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService -import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory -import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher -import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener -import org.opendaylight.controller.sal.common.util.Rpcs -import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification -import java.util.concurrent.Future -import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter -import org.opendaylight.yangtools.concepts.Path -import org.slf4j.LoggerFactory -import java.util.HashSet -import java.util.Map.Entry -import java.util.Iterator -import java.util.Collection -import com.google.common.collect.FluentIterable; -import java.util.Set -import com.google.common.collect.ImmutableList -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration -import org.opendaylight.controller.md.sal.common.api.RegistrationListener -import org.opendaylight.yangtools.concepts.util.ListenerRegistry -import java.util.concurrent.atomic.AtomicLong - -abstract class AbstractDataBroker
, D, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // -DataReader
, // -DataChangePublisher
, // -DataProvisionService
{ - - private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); - - @Property - var ExecutorService executor; - - @Property - var AbstractDataReadRouter
dataReadRouter; - - Multimap
> listeners = HashMultimap.create(); - Multimap
> commitHandlers = HashMultimap.create(); +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.impl.service + +import com.google.common.collect.FluentIterable +import com.google.common.collect.HashMultimap +import com.google.common.collect.ImmutableList +import com.google.common.collect.Multimap +import java.util.ArrayList +import java.util.Arrays +import java.util.Collection +import java.util.Collections +import java.util.HashSet +import java.util.List +import java.util.Set +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicLong +import org.opendaylight.controller.md.sal.common.api.RegistrationListener +import org.opendaylight.controller.md.sal.common.api.TransactionStatus +import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener +import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration +import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory +import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService +import org.opendaylight.controller.md.sal.common.api.data.DataReader +import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification +import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter +import org.opendaylight.controller.sal.common.util.Rpcs +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import org.opendaylight.yangtools.concepts.CompositeObjectRegistration +import org.opendaylight.yangtools.concepts.ListenerRegistration +import org.opendaylight.yangtools.concepts.Path +import org.opendaylight.yangtools.concepts.util.ListenerRegistry +import org.opendaylight.yangtools.yang.common.RpcResult +import org.slf4j.LoggerFactory + +import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent +import com.google.common.collect.Multimaps +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock + +abstract class AbstractDataBroker
, D, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // +DataReader
, // +DataChangePublisher
, // +DataProvisionService
{ + + private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); + + @Property + var ExecutorService executor; + + @Property + var AbstractDataReadRouter
dataReadRouter; + + @Property + private val AtomicLong submittedTransactionsCount = new AtomicLong; + + @Property + private val AtomicLong failedTransactionsCount = new AtomicLong + + @Property + private val AtomicLong finishedTransactionsCount = new AtomicLong + + Multimap
> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + Multimap
> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());
- val ListenerRegistry paths) {
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
- .transformAndConcat[value] //
- .transform[instance].toList()
+ return withLock(registrationLock) [|
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+ .transformAndConcat[value] //
+ .transform[instance].toList()
+ ]
+ }
+
+ override final readConfigurationData(P path) {
+ return dataReadRouter.readConfigurationData(path);
+ }
+
+ override final readOperationalData(P path) {
+ return dataReadRouter.readOperationalData(path);
}
-
- override final readConfigurationData(P path) {
- return dataReadRouter.readConfigurationData(path);
- }
-
- override final readOperationalData(P path) {
- return dataReadRouter.readOperationalData(path);
- }
-
+
+ private static def commitHandler) {
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
- commitHandlers.put(path, registration)
- LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onRegister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+ return withLock(registrationLock) [|
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
+ commitHandlers.put(path, registration)
+ LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onRegister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+ }
}
- }
- return registration;
- }
-
- override final def registerDataChangeListener(P path, DCL listener) {
- val reg = new DataChangeListenerRegistration(path, listener, this);
- listeners.put(path, reg);
- return reg;
- }
-
- final def registerDataReader(P path, DataReader reader) {
-
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);
-
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
- }
-
- override registerCommitHandlerListener(RegistrationListener reader) {
+ return withLock(registrationLock) [|
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ ]
+ }
+
+ override registerCommitHandlerListener(RegistrationListener createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
+ return new InitialDataChangeEventImpl (initialConfig,initialOperational);
+
+ }
+
protected final def removeListener(DataChangeListenerRegistration registration) {
- listeners.remove(registration.path, registration);
- }
-
- protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
- commitHandlers.remove(registration.path, registration);
-
- LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onUnregister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+ return withLock(registrationLock) [|
+ listeners.remove(registration.path, registration);
+ ]
+ }
+
+ protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
+ return withLock(registrationLock) [|
+ commitHandlers.remove(registration.path, registration);
+ LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onUnregister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+ }
}
- }
- }
-
- protected final def getActiveCommitHandlers() {
- return commitHandlers.entries;
- }
-
- protected def /*Iterator paths) {
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
- val operationalState = readOperationalData(key)
- val configurationState = readConfigurationData(key)
- return new ListenerStateCapture(key, value, operationalState, configurationState)
- ].toList()
- }
-
- protected def boolean isAffectedBy(P key, Set paths) {
- if (paths.contains(key)) {
- return true;
- }
- for (path : paths) {
- if (key.contains(path)) {
- return true;
- }
- }
-
- return false;
- }
-
- package final def Future transaction) {
- checkNotNull(transaction);
- transaction.changeStatus(TransactionStatus.SUBMITED);
- val task = new TwoPhaseCommit(transaction, this);
- return executor.submit(task);
- }
-
-}
-
-@Data
-package class ListenerStateCapture , D, DCL extends DataChangeListener > {
-
- @Property
- P path;
-
- @Property
- Collection , D, DCL extends DataChangeListener > extends AbstractObjectRegistration dataBroker;
-
- @Property
- val P path;
-
- new(P path, DCL instance, AbstractDataBroker broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeListener(this);
- dataBroker = null;
- }
-
-}
-
-package class DataCommitHandlerRegistrationImpl , D> //
-extends AbstractObjectRegistration {
-
- AbstractDataBroker dataBroker;
-
- @Property
- val P path;
-
- new(P path, DataCommitHandler instance, AbstractDataBroker broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeCommitHandler(this);
- dataBroker = null;
- }
-}
-
-package class TwoPhaseCommit , D, DCL extends DataChangeListener > implements Callable transaction;
- val AbstractDataBroker dataBroker;
-
- new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
- this.transaction = transaction;
- this.dataBroker = broker;
- }
-
- override call() throws Exception {
-
- // get affected paths
- val affectedPaths = new HashSet ();
-
- affectedPaths.addAll(transaction.createdConfigurationData.keySet);
- affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
- affectedPaths.addAll(transaction.removedConfigurationData);
-
- affectedPaths.addAll(transaction.createdOperationalData.keySet);
- affectedPaths.addAll(transaction.updatedOperationalData.keySet);
- affectedPaths.addAll(transaction.removedOperationalData);
-
- val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
-
- val transactionId = transaction.identifier;
-
- log.info("Transaction: {} Started.",transactionId);
- // requesting commits
- val Iterable paths) {
+ if (paths.contains(key)) {
+ return true;
+ }
+ for (path : paths) {
+ if (key.contains(path)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ package final def Future transaction) {
+ checkNotNull(transaction);
+ transaction.changeStatus(TransactionStatus.SUBMITED);
+ val task = new TwoPhaseCommit(transaction, this);
+ submittedTransactionsCount.andIncrement;
+ return executor.submit(task);
+ }
+
+}
+
+@Data
+package class ListenerStateCapture , D, DCL extends DataChangeListener > {
+
+ @Property
+ P path;
+
+ @Property
+ Collection , D, DCL extends DataChangeListener > extends AbstractObjectRegistration dataBroker;
+
+ @Property
+ val P path;
+
+ new(P path, DCL instance, AbstractDataBroker broker) {
+ super(instance)
+ dataBroker = broker;
+ _path = path;
+ }
+
+ override protected removeRegistration() {
+ dataBroker.removeListener(this);
+ dataBroker = null;
+ }
+
+}
+
+package class DataCommitHandlerRegistrationImpl , D> //
+extends AbstractObjectRegistration {
+
+ AbstractDataBroker dataBroker;
+
+ @Property
+ val P path;
+
+ new(P path, DataCommitHandler instance, AbstractDataBroker broker) {
+ super(instance)
+ dataBroker = broker;
+ _path = path;
+ }
+
+ override protected removeRegistration() {
+ dataBroker.removeCommitHandler(this);
+ dataBroker = null;
+ }
+}
+
+package class TwoPhaseCommit , D, DCL extends DataChangeListener > implements Callable transaction;
+ val AbstractDataBroker dataBroker;
+
+ new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
+ this.transaction = transaction;
+ this.dataBroker = broker;
+ }
+
+ override call() throws Exception {
+
+ // get affected paths
+ val affectedPaths = new HashSet ();
+
+ affectedPaths.addAll(transaction.createdConfigurationData.keySet);
+ affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
+ affectedPaths.addAll(transaction.removedConfigurationData);
+
+ affectedPaths.addAll(transaction.createdOperationalData.keySet);
+ affectedPaths.addAll(transaction.updatedOperationalData.keySet);
+ affectedPaths.addAll(transaction.removedOperationalData);
+
+ val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
+
+ val transactionId = transaction.identifier;
+
+ log.trace("Transaction: {} Started.",transactionId);
+ log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
+ // requesting commits
+ val Iterable , D> extends AbstractDataModification {
-
- @Property
- private val Object identifier;
-
- var TransactionStatus status;
-
- var AbstractDataBroker broker;
-
- protected new(Object identifier,AbstractDataBroker dataBroker) {
- super(dataBroker);
- _identifier = identifier;
- broker = dataBroker;
- status = TransactionStatus.NEW;
-
- //listeners = new ListenerRegistry<>();
- }
-
- override commit() {
- return broker.commit(this);
- }
-
- override readConfigurationData(P path) {
- return broker.readConfigurationData(path);
- }
-
- override readOperationalData(P path) {
- return broker.readOperationalData(path);
- }
-
- override hashCode() {
- return identifier.hashCode;
- }
-
- override equals(Object obj) {
- if (this === obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- val other = (obj as AbstractDataTransaction );
- if (broker == null) {
- if (other.broker != null)
- return false;
- } else if (!broker.equals(other.broker))
- return false;
- if (identifier == null) {
- if (other.identifier != null)
- return false;
- } else if (!identifier.equals(other.identifier))
- return false;
- return true;
- }
-
- override TransactionStatus getStatus() {
- return status;
- }
-
- protected abstract def void onStatusChange(TransactionStatus status);
-
- public def changeStatus(TransactionStatus status) {
- this.status = status;
- onStatusChange(status);
- }
-
-}
+ }
+ ]
+ }
+
+ def rollback(List , D> extends AbstractDataModification {
+
+ private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
+
+ @Property
+ private val Object identifier;
+
+ var TransactionStatus status;
+
+ var AbstractDataBroker broker;
+
+ protected new(Object identifier,AbstractDataBroker dataBroker) {
+ super(dataBroker);
+ _identifier = identifier;
+ broker = dataBroker;
+ status = TransactionStatus.NEW;
+ LOG.debug("Transaction {} Allocated.", identifier);
+
+ //listeners = new ListenerRegistry<>();
+ }
+
+ override commit() {
+ return broker.commit(this);
+ }
+
+ override readConfigurationData(P path) {
+ val local = this.updatedConfigurationData.get(path);
+ if(local != null) {
+ return local;
+ }
+
+ return broker.readConfigurationData(path);
+ }
+
+ override readOperationalData(P path) {
+ val local = this.updatedOperationalData.get(path);
+ if(local != null) {
+ return local;
+ }
+ return broker.readOperationalData(path);
+ }
+
+ override hashCode() {
+ return identifier.hashCode;
+ }
+
+ override equals(Object obj) {
+ if (this === obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ val other = (obj as AbstractDataTransaction );
+ if (broker == null) {
+ if (other.broker != null)
+ return false;
+ } else if (!broker.equals(other.broker))
+ return false;
+ if (identifier == null) {
+ if (other.identifier != null)
+ return false;
+ } else if (!identifier.equals(other.identifier))
+ return false;
+ return true;
+ }
+
+ override TransactionStatus getStatus() {
+ return status;
+ }
+
+ protected abstract def void onStatusChange(TransactionStatus status);
+
+ public def changeStatus(TransactionStatus status) {
+ LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
+ this.status = status;
+ onStatusChange(status);
+ }
+
+}