import com.google.common.base.Optional;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.infra.Datastore;
+import org.opendaylight.genius.infra.Datastore.Operational;
import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
 * When operational data of a child node is updated, it is copied to the parent.
 * When config data of a parent node is updated, it is copied to all its children.
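+ * For example, an operational-side subclass could be declared as
+ * {@code HwvtepNodeDataListener<Operational, RemoteUcastMacs>} (type parameters here are purely illustrative).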
*/
-public abstract class HwvtepNodeDataListener<T extends DataObject>
- extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<T>> {
+public abstract class HwvtepNodeDataListener<D extends Datastore, T extends DataObject>
+ extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<D, T>> {
private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeDataListener.class);
private final ManagedNewTransactionRunner txRunner;
+ private final SingleTransactionDataBroker singleTxBroker;
private final MergeCommand<T, ?, ?> mergeCommand;
- private final LogicalDatastoreType datastoreType;
+ private final Class<D> datastoreType;
private final BiConsumer<InstanceIdentifier<T>, T> addOperation;
private final BiConsumer<InstanceIdentifier<T>, T> removeOperation;
private final HwvtepNodeHACache hwvtepNodeHACache;
public HwvtepNodeDataListener(DataBroker broker,
HwvtepNodeHACache hwvtepNodeHACache,
Class<T> clazz,
- Class<HwvtepNodeDataListener<T>> eventClazz,
+ Class<HwvtepNodeDataListener<D, T>> eventClazz,
MergeCommand<T, ?, ?> mergeCommand,
- LogicalDatastoreType datastoreType) {
+ Class<D> datastoreType) {
super(clazz, eventClazz);
this.hwvtepNodeHACache = hwvtepNodeHACache;
this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
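+        // Broker for stand-alone synchronous reads that do not need a managed transaction (see isNodeConnected).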
+ this.singleTxBroker = new SingleTransactionDataBroker(broker);
this.mergeCommand = mergeCommand;
this.datastoreType = datastoreType;
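+        // Operational-side listeners push child updates up to the HA parent; config-side listeners push parent updates down to the children.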
- if (LogicalDatastoreType.OPERATIONAL == datastoreType) {
+ if (Operational.class.equals(datastoreType)) {
this.addOperation = this::copyToParent;
this.removeOperation = this::deleteFromParent;
} else {
HAJobScheduler.getInstance().submitJob(() -> removeOperation.accept(identifier, dataRemoved));
}
- private boolean isNodeConnected(InstanceIdentifier<T> identifier, ReadTransaction tx)
+ private boolean isNodeConnected(InstanceIdentifier<T> identifier)
throws ReadFailedException {
- return tx.read(LogicalDatastoreType.OPERATIONAL, identifier.firstIdentifierOf(Node.class))
- .checkedGet().isPresent();
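+        // A child is treated as connected when its Node is present in the operational datastore.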
+ return singleTxBroker.syncReadOptional(LogicalDatastoreType.OPERATIONAL,
+ identifier.firstIdentifierOf(Node.class)).isPresent();
}
private static <T extends DataObject> boolean isDataUpdated(Optional<T> existingDataOptional, T newData) {
LOG.trace("Copy child op data {} to parent {}", mergeCommand.getDescription(), getNodeId(parent));
T parentData = mergeCommand.transform(parent, data);
InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
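+        // The merge command transforms the child's data into the parent's representation and derives its path under the parent node.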
- ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
+ ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType,
tx -> writeToMdsal(tx, parentData, parentIdentifier)), LOG, "Error copying to parent");
}
}
}
InstanceIdentifier<Node> parent = getHAParent(identifier);
if (parent != null) {
- ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
- if (isNodeConnected(identifier, tx)) {
+ ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
+ if (isNodeConnected(identifier)) {
LOG.trace("Copy child op data {} to parent {} create:{}", mergeCommand.getDescription(),
getNodeId(parent), false);
T parentData = mergeCommand.transform(parent, data);
private void copyToChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
if (children != null) {
- ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+ ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
for (InstanceIdentifier<Node> child : children) {
LOG.trace("Copy parent config data {} to child {}", mergeCommand.getDescription(),
getNodeId(child));
private void deleteFromChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
if (children != null) {
- ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+ ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
for (InstanceIdentifier<Node> child : children) {
LOG.trace("Delete parent config data {} to child {}", mergeCommand.getDescription(),
getNodeId(child));
}
}
- private void writeToMdsal(final ReadWriteTransaction tx, final T data, final InstanceIdentifier<T> identifier)
- throws ReadFailedException {
- if (isDataUpdated(tx.read(datastoreType, identifier).checkedGet(), data)) {
- tx.put(datastoreType, identifier, data);
+ private void writeToMdsal(final TypedReadWriteTransaction<D> tx, final T data,
+ final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
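+        // Write only when the data actually differs from what is already stored, to avoid redundant updates.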
+ if (isDataUpdated(tx.read(identifier).get(), data)) {
+ tx.put(identifier, data);
}
}
- private void deleteFromMdsal(final ReadWriteTransaction tx,
- final InstanceIdentifier<T> identifier) throws ReadFailedException {
- if (tx.read(datastoreType, identifier).checkedGet().isPresent()) {
- tx.delete(datastoreType, identifier);
+ private void deleteFromMdsal(final TypedReadWriteTransaction<D> tx,
+ final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
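+        // Delete only if the entry is still present in the datastore.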
+ if (tx.read(identifier).get().isPresent()) {
+ tx.delete(identifier);
}
}
}
@Override
- protected HwvtepNodeDataListener<T> getDataTreeChangeListener() {
+ protected HwvtepNodeDataListener<D, T> getDataTreeChangeListener() {
return HwvtepNodeDataListener.this;
}
- protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier identifier) {
+ protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier<T> identifier) {
InstanceIdentifier<Node> parent = identifier.firstIdentifierOf(Node.class);
return hwvtepNodeHACache.getChildrenForHANode(parent);
}
- protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier identifier) {
+ protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier<T> identifier) {
InstanceIdentifier<Node> child = identifier.firstIdentifierOf(Node.class);
return hwvtepNodeHACache.getParent(child);
}