package org.opendaylight.controller.md.sal.common.impl.service import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler import org.opendaylight.controller.md.sal.common.api.TransactionStatus import org.opendaylight.controller.md.sal.common.api.data.DataReader import org.opendaylight.yangtools.concepts.AbstractObjectRegistration import org.opendaylight.yangtools.concepts.ListenerRegistration import com.google.common.collect.Multimap import static com.google.common.base.Preconditions.*; import java.util.List import com.google.common.collect.HashMultimap import java.util.concurrent.ExecutorService import java.util.concurrent.Callable import org.opendaylight.yangtools.yang.common.RpcResult import java.util.Collections import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction import java.util.ArrayList import org.opendaylight.yangtools.concepts.CompositeObjectRegistration import java.util.Arrays import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener import org.opendaylight.controller.sal.common.util.Rpcs import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification import java.util.concurrent.Future import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter import org.opendaylight.yangtools.concepts.Path import org.slf4j.LoggerFactory abstract class AbstractDataBroker
,D,DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // DataReader
, // DataChangePublisher
, // DataProvisionService
{ @Property var ExecutorService executor; @Property var AbstractDataReadRouter
dataReadRouter; Multimap
> listeners = HashMultimap.create(); Multimap
> commitHandlers = HashMultimap.create(); public new() { } override final readConfigurationData(P path) { return dataReadRouter.readConfigurationData(path); } override final readOperationalData(P path) { return dataReadRouter.readOperationalData(path); } override final registerCommitHandler(P path, DataCommitHandler
commitHandler) { val registration = new DataCommitHandlerRegistration(path,commitHandler,this); commitHandlers.put(path,registration) return registration; } override final def registerDataChangeListener(P path, DCL listener) { val reg = new DataChangeListenerRegistration(path, listener, this); listeners.put(path, reg); return reg; } final def registerDataReader(P path,DataReader
reader) { val confReg = dataReadRouter.registerConfigurationReader(path,reader); val dataReg = dataReadRouter.registerOperationalReader(path,reader); return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg)); } protected final def removeListener(DataChangeListenerRegistration
registration) { listeners.remove(registration.path, registration); } protected final def removeCommitHandler(DataCommitHandlerRegistration
registration) {
commitHandlers.remove(registration.path, registration);
}
protected final def getActiveCommitHandlers() {
return commitHandlers.entries.map[ value.instance].toSet
}
package final def Future transaction) {
checkNotNull(transaction);
transaction.changeStatus(TransactionStatus.SUBMITED);
val task = new TwoPhaseCommit(transaction, this);
return executor.submit(task);
}
}
package class DataChangeListenerRegistration ,D,DCL extends DataChangeListener > extends AbstractObjectRegistration dataBroker;
@Property
val P path;
new(P path, DCL instance, AbstractDataBroker broker) {
super(instance)
dataBroker = broker;
_path = path;
}
override protected removeRegistration() {
dataBroker.removeListener(this);
dataBroker = null;
}
}
package class DataCommitHandlerRegistration ,D>
extends AbstractObjectRegistration dataBroker;
@Property
val P path;
new(P path, DataCommitHandler instance,
AbstractDataBroker broker) {
super(instance)
dataBroker = broker;
_path = path;
}
override protected removeRegistration() {
dataBroker.removeCommitHandler(this);
dataBroker = null;
}
}
package class TwoPhaseCommit ,D> implements Callable transaction;
val AbstractDataBroker dataBroker;
new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
this.transaction = transaction;
this.dataBroker = broker;
}
override call() throws Exception {
val Iterable , D> extends AbstractDataModification {
@Property
private val Object identifier;
var TransactionStatus status;
var AbstractDataBroker broker;
protected new (AbstractDataBroker dataBroker) {
super(dataBroker);
_identifier = new Object();
broker = dataBroker;
status = TransactionStatus.NEW;
//listeners = new ListenerRegistry<>();
}
override commit() {
return broker.commit(this);
}
override readConfigurationData(P path) {
return broker.readConfigurationData(path);
}
override readOperationalData(P path) {
return broker.readOperationalData(path);
}
override hashCode() {
return identifier.hashCode;
}
override equals(Object obj) {
if (this === obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
val other = (obj as AbstractDataTransaction ) ;
if (broker == null) {
if (other.broker != null)
return false;
} else if (!broker.equals(other.broker))
return false;
if (identifier == null) {
if (other.identifier != null)
return false;
} else if (!identifier.equals(other.identifier))
return false;
return true;
}
override TransactionStatus getStatus() {
return status;
}
protected abstract def void onStatusChange(TransactionStatus status);
public def changeStatus(TransactionStatus status) {
this.status = status;
onStatusChange(status);
}
}