import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
import org.opendaylight.yangtools.concepts.Path
import org.slf4j.LoggerFactory
-
-abstract class AbstractDataBroker<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> implements
-DataModificationTransactionFactory<P, D>, //
+import java.util.HashSet
+import java.util.Map.Entry
+import java.util.Iterator
+import java.util.Collection
+import com.google.common.collect.FluentIterable;
+import java.util.Set
+import com.google.common.collect.ImmutableList
+
+abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
-DataProvisionService<P,D> {
+DataProvisionService<P, D> {
@Property
var ExecutorService executor;
@Property
- var AbstractDataReadRouter<P,D> dataReadRouter;
-
- Multimap<P, DataChangeListenerRegistration<P,D,DCL>> listeners = HashMultimap.create();
- Multimap<P, DataCommitHandlerRegistration<P,D>> commitHandlers = HashMultimap.create();
+ var AbstractDataReadRouter<P, D> dataReadRouter;
+ Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
+ Multimap<P, DataCommitHandlerRegistration<P, D>> commitHandlers = HashMultimap.create();
public new() {
-
+ }
+
+ protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
+ HashSet<P> paths) {
+ return FluentIterable.from(commitHandlers.asMap.entrySet)
+ .filter[key.isAffectedBy(paths)] //
+ .transformAndConcat [value] //
+ .transform[instance].toList()
}
override final readConfigurationData(P path) {
return dataReadRouter.readOperationalData(path);
}
- override final registerCommitHandler(P path,
- DataCommitHandler<P, D> commitHandler) {
- val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
- commitHandlers.put(path,registration)
- return registration;
+ override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
+ val registration = new DataCommitHandlerRegistration(path, commitHandler, this);
+ commitHandlers.put(path, registration)
+ return registration;
}
override final def registerDataChangeListener(P path, DCL listener) {
return reg;
}
- final def registerDataReader(P path,DataReader<P,D> reader) {
-
- val confReg = dataReadRouter.registerConfigurationReader(path,reader);
- val dataReg = dataReadRouter.registerOperationalReader(path,reader);
-
- return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
+ final def registerDataReader(P path, DataReader<P, D> reader) {
+
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);
+
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
}
- protected final def removeListener(DataChangeListenerRegistration<P,D,DCL> registration) {
+ protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
listeners.remove(registration.path, registration);
}
- protected final def removeCommitHandler(DataCommitHandlerRegistration<P,D> registration) {
+ protected final def removeCommitHandler(DataCommitHandlerRegistration<P, D> registration) {
commitHandlers.remove(registration.path, registration);
}
-
- protected final def getActiveCommitHandlers() {
- return commitHandlers.entries.map[ value.instance].toSet
+
+ protected final def getActiveCommitHandlers() {
+ return commitHandlers.entries;
}
- package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P,D> transaction) {
+ protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
+ HashSet<P> paths) {
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
+ val operationalState = readOperationalData(key)
+ val configurationState = readConfigurationData(key)
+ return new ListenerStateCapture(key, value, operationalState, configurationState)
+ ].toList()
+ }
+
+ protected def boolean isAffectedBy(P key, Set<P> paths) {
+ if (paths.contains(key)) {
+ return true;
+ }
+ for (path : paths) {
+ if (key.contains(path)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
checkNotNull(transaction);
transaction.changeStatus(TransactionStatus.SUBMITED);
val task = new TwoPhaseCommit(transaction, this);
}
-package class DataChangeListenerRegistration<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+@Data
+package class ListenerStateCapture<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> {
- AbstractDataBroker<P,D,DCL> dataBroker;
+ @Property
+ P path;
+
+ @Property
+ Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
+
+ @Property
+ D initialOperationalState;
+
+ @Property
+ D initialConfigurationState;
+}
+
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+
+ AbstractDataBroker<P, D, DCL> dataBroker;
@Property
val P path;
- new(P path, DCL instance, AbstractDataBroker<P,D,DCL> broker) {
+ new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
super(instance)
dataBroker = broker;
_path = path;
}
-package class DataCommitHandlerRegistration<P extends Path<P>,D>
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
+package class DataCommitHandlerRegistration<P extends Path<P>, D> extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
- AbstractDataBroker<P,D,?> dataBroker;
+ AbstractDataBroker<P, D, ?> dataBroker;
@Property
val P path;
- new(P path, DataCommitHandler<P, D> instance,
- AbstractDataBroker<P,D,?> broker) {
+ new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
super(instance)
dataBroker = broker;
_path = path;
}
-package class TwoPhaseCommit<P extends Path<P>,D> implements Callable<RpcResult<TransactionStatus>> {
-
+package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
+
private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
- val AbstractDataTransaction<P,D> transaction;
- val AbstractDataBroker<P,D,?> dataBroker;
+ val AbstractDataTransaction<P, D> transaction;
+ val AbstractDataBroker<P, D, DCL> dataBroker;
- new(AbstractDataTransaction<P,D> transaction, AbstractDataBroker<P,D,?> broker) {
+ new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
this.transaction = transaction;
this.dataBroker = broker;
}
override call() throws Exception {
- val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.activeCommitHandlers;
+ // get affected paths
+ val affectedPaths = new HashSet<P>();
+
+ affectedPaths.addAll(transaction.createdConfigurationData.keySet);
+ affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
+ affectedPaths.addAll(transaction.removedConfigurationData);
+
+ affectedPaths.addAll(transaction.createdOperationalData.keySet);
+ affectedPaths.addAll(transaction.updatedOperationalData.keySet);
+ affectedPaths.addAll(transaction.removedOperationalData);
+
+ val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
// requesting commits
+ val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
try {
for (handler : commitHandlers) {
handlerTransactions.add(handler.requestCommit(transaction));
}
} catch (Exception e) {
- log.error("Request Commit failded",e);
- return rollback(handlerTransactions,e);
+ log.error("Request Commit failded", e);
+ return rollback(handlerTransactions, e);
}
val List<RpcResult<Void>> results = new ArrayList();
try {
for (subtransaction : handlerTransactions) {
results.add(subtransaction.finish());
}
+ listeners.publishDataChangeEvent();
} catch (Exception e) {
- log.error("Finish Commit failed",e);
- return rollback(handlerTransactions,e);
+ log.error("Finish Commit failed", e);
+ return rollback(handlerTransactions, e);
}
+
return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
+
+ }
+
+ def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D,DCL>> listeners) {
+ for(listenerSet : listeners) {
+ val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
+ val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
+
+ val changeEvent = new DataChangeEventImpl(transaction,listenerSet.initialConfigurationState,listenerSet.initialOperationalState,updatedOperational,updatedConfiguration);
+ for(listener : listenerSet.listeners) {
+ try {
+ listener.instance.onDataChanged(changeEvent);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
}
- def rollback(List<DataCommitTransaction<P, D>> transactions,Exception e) {
+ def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
for (transaction : transactions) {
transaction.rollback()
}
+
// FIXME return encountered error.
return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
}
}
-
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
@Property
private val Object identifier;
-
var TransactionStatus status;
-
-
+
var AbstractDataBroker<P, D, ?> broker;
- protected new (AbstractDataBroker<P,D,?> dataBroker) {
+ protected new(AbstractDataBroker<P, D, ?> dataBroker) {
super(dataBroker);
_identifier = new Object();
broker = dataBroker;
status = TransactionStatus.NEW;
- //listeners = new ListenerRegistry<>();
+
+ //listeners = new ListenerRegistry<>();
}
- override commit() {
+ override commit() {
return broker.commit(this);
}
return false;
if (getClass() != obj.getClass())
return false;
- val other = (obj as AbstractDataTransaction<P,D>) ;
+ val other = (obj as AbstractDataTransaction<P,D>);
if (broker == null) {
if (other.broker != null)
return false;
return status;
}
-
protected abstract def void onStatusChange(TransactionStatus status);
-
+
public def changeStatus(TransactionStatus status) {
this.status = status;
onStatusChange(status);
}
-
+
}