X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-common-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fcommon%2Fimpl%2Fservice%2FAbstractDataBroker.xtend;h=e3d2b567a71f42b202bf028db6f0d8dcd5f70ae2;hb=f20add00faf71465ad092144689450c105f9bde3;hp=b878071183e14937a63a1ce8981bcfaa5785566d;hpb=350dbdeb0a3d942ba532ada1d1931baf591bec5b;p=controller.git diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index b878071183..e3d2b567a7 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -27,25 +27,43 @@ import java.util.concurrent.Future import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter import org.opendaylight.yangtools.concepts.Path import org.slf4j.LoggerFactory - -abstract class AbstractDataBroker
,D,DCL extends DataChangeListener
> implements -DataModificationTransactionFactory
, // +import java.util.HashSet +import java.util.Map.Entry +import java.util.Iterator +import java.util.Collection +import com.google.common.collect.FluentIterable; +import java.util.Set +import com.google.common.collect.ImmutableList +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration +import org.opendaylight.controller.md.sal.common.api.RegistrationListener +import org.opendaylight.yangtools.concepts.util.ListenerRegistry +import java.util.concurrent.atomic.AtomicLong + +abstract class AbstractDataBroker
, D, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // DataReader
, // DataChangePublisher
, // -DataProvisionService
{ +DataProvisionService
{ + + private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); @Property var ExecutorService executor; @Property - var AbstractDataReadRouter
dataReadRouter; - - Multimap
> listeners = HashMultimap.create(); - Multimap
> commitHandlers = HashMultimap.create(); - + var AbstractDataReadRouter
dataReadRouter; + Multimap
> listeners = HashMultimap.create(); + Multimap
> commitHandlers = HashMultimap.create();
+
+ val ListenerRegistry paths) {
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+ .transformAndConcat[value] //
+ .transform[instance].toList()
}
override final readConfigurationData(P path) {
@@ -56,11 +74,18 @@ DataProvisionService {
return dataReadRouter.readOperationalData(path);
}
- override final registerCommitHandler(P path,
- DataCommitHandler commitHandler) {
- val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
- commitHandlers.put(path,registration)
- return registration;
+ override final registerCommitHandler(P path, DataCommitHandler commitHandler) {
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
+ commitHandlers.put(path, registration)
+ LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onRegister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+ }
+ }
+ return registration;
}
override final def registerDataChangeListener(P path, DCL listener) {
@@ -69,27 +94,65 @@ DataProvisionService {
return reg;
}
- final def registerDataReader(P path,DataReader reader) {
-
- val confReg = dataReadRouter.registerConfigurationReader(path,reader);
- val dataReg = dataReadRouter.registerOperationalReader(path,reader);
+ final def registerDataReader(P path, DataReader reader) {
+
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
+ }
+
+ override registerCommitHandlerListener(RegistrationListener registration) {
+ protected final def removeListener(DataChangeListenerRegistration registration) {
listeners.remove(registration.path, registration);
}
- protected final def removeCommitHandler(DataCommitHandlerRegistration registration) {
+ protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
commitHandlers.remove(registration.path, registration);
+
+ LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onUnregister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+ }
+ }
}
-
- protected final def getActiveCommitHandlers() {
- return commitHandlers.entries.map[ value.instance].toSet
+
+ protected final def getActiveCommitHandlers() {
+ return commitHandlers.entries;
+ }
+
+ protected def /*Iterator paths) {
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
+ val operationalState = readOperationalData(key)
+ val configurationState = readConfigurationData(key)
+ return new ListenerStateCapture(key, value, operationalState, configurationState)
+ ].toList()
+ }
+
+ protected def boolean isAffectedBy(P key, Set paths) {
+ if (paths.contains(key)) {
+ return true;
+ }
+ for (path : paths) {
+ if (key.contains(path)) {
+ return true;
+ }
+ }
+
+ return false;
}
- package final def Future transaction) {
+ package final def Future transaction) {
checkNotNull(transaction);
transaction.changeStatus(TransactionStatus.SUBMITED);
val task = new TwoPhaseCommit(transaction, this);
@@ -98,14 +161,30 @@ DataProvisionService {
}
-package class DataChangeListenerRegistration ,D,DCL extends DataChangeListener > extends AbstractObjectRegistration , D, DCL extends DataChangeListener > {
+
+ @Property
+ P path;
+
+ @Property
+ Collection , D, DCL extends DataChangeListener > extends AbstractObjectRegistration dataBroker;
+ AbstractDataBroker dataBroker;
@Property
val P path;
- new(P path, DCL instance, AbstractDataBroker broker) {
+ new(P path, DCL instance, AbstractDataBroker broker) {
super(instance)
dataBroker = broker;
_path = path;
@@ -118,16 +197,16 @@ package class DataChangeListenerRegistration ,D,DCL extends Dat
}
-package class DataCommitHandlerRegistration ,D>
-extends AbstractObjectRegistration , D> //
+extends AbstractObjectRegistration {
- AbstractDataBroker dataBroker;
+ AbstractDataBroker dataBroker;
@Property
val P path;
- new(P path, DataCommitHandler instance,
- AbstractDataBroker broker) {
+ new(P path, DataCommitHandler instance, AbstractDataBroker broker) {
super(instance)
dataBroker = broker;
_path = path;
@@ -137,52 +216,87 @@ extends AbstractObjectRegistration ,D> implements Callable , D, DCL extends DataChangeListener > implements Callable transaction;
- val AbstractDataBroker dataBroker;
+ val AbstractDataTransaction transaction;
+ val AbstractDataBroker dataBroker;
- new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
+ new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
this.transaction = transaction;
this.dataBroker = broker;
}
override call() throws Exception {
- val Iterable ();
+
+ affectedPaths.addAll(transaction.createdConfigurationData.keySet);
+ affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
+ affectedPaths.addAll(transaction.removedConfigurationData);
+
+ affectedPaths.addAll(transaction.createdOperationalData.keySet);
+ affectedPaths.addAll(transaction.updatedOperationalData.keySet);
+ affectedPaths.addAll(transaction.removedOperationalData);
+
+ val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
+ val transactionId = transaction.identifier;
+
+ log.info("Transaction: {} Started.",transactionId);
// requesting commits
+ val Iterable , D> extends Abst
@Property
private val Object identifier;
-
var TransactionStatus status;
-
-
+
var AbstractDataBroker broker;
- protected new (AbstractDataBroker dataBroker) {
+ protected new(Object identifier,AbstractDataBroker dataBroker) {
super(dataBroker);
- _identifier = new Object();
+ _identifier = identifier;
broker = dataBroker;
status = TransactionStatus.NEW;
- //listeners = new ListenerRegistry<>();
+
+ //listeners = new ListenerRegistry<>();
}
- override commit() {
+ override commit() {
return broker.commit(this);
}
@@ -230,7 +343,7 @@ public abstract class AbstractDataTransaction , D> extends Abst
return false;
if (getClass() != obj.getClass())
return false;
- val other = (obj as AbstractDataTransaction ) ;
+ val other = (obj as AbstractDataTransaction );
if (broker == null) {
if (other.broker != null)
return false;
@@ -248,12 +361,11 @@ public abstract class AbstractDataTransaction , D> extends Abst
return status;
}
-
protected abstract def void onStatusChange(TransactionStatus status);
-
+
public def changeStatus(TransactionStatus status) {
this.status = status;
onStatusChange(status);
}
-
+
}