Datastore-constrained txes: elanmanager
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HwvtepNodeDataListener.java
index 04cfefd7d435892d46d10b89d7dc20e404baa87c..c7ea093ea56040e77524dc359dbfd94454753ffc 100644 (file)
@@ -10,15 +10,18 @@ package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
 import com.google.common.base.Optional;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.infra.Datastore;
+import org.opendaylight.genius.infra.Datastore.Operational;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
@@ -34,14 +37,15 @@ import org.slf4j.LoggerFactory;
  * When an operational child node data is updated, it is copied to parent
  * When a config parent node data is updated , it is copied to all its children.
  */
-public abstract class HwvtepNodeDataListener<T extends DataObject>
-        extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<T>> {
+public abstract class HwvtepNodeDataListener<D extends Datastore, T extends DataObject>
+        extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<D, T>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeDataListener.class);
 
     private final ManagedNewTransactionRunner txRunner;
+    private final SingleTransactionDataBroker singleTxBroker;
     private final MergeCommand<T, ?, ?> mergeCommand;
-    private final LogicalDatastoreType datastoreType;
+    private final Class<D> datastoreType;
     private final BiConsumer<InstanceIdentifier<T>, T> addOperation;
     private final BiConsumer<InstanceIdentifier<T>, T> removeOperation;
     private final HwvtepNodeHACache hwvtepNodeHACache;
@@ -49,15 +53,16 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
     public HwvtepNodeDataListener(DataBroker broker,
                                   HwvtepNodeHACache hwvtepNodeHACache,
                                   Class<T> clazz,
-                                  Class<HwvtepNodeDataListener<T>> eventClazz,
+                                  Class<HwvtepNodeDataListener<D, T>> eventClazz,
                                   MergeCommand<T, ?, ?> mergeCommand,
-                                  LogicalDatastoreType datastoreType) {
+                                  Class<D> datastoreType) {
         super(clazz, eventClazz);
         this.hwvtepNodeHACache = hwvtepNodeHACache;
         this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
+        this.singleTxBroker = new SingleTransactionDataBroker(broker);
         this.mergeCommand = mergeCommand;
         this.datastoreType = datastoreType;
-        if (LogicalDatastoreType.OPERATIONAL == datastoreType) {
+        if (Operational.class.equals(datastoreType)) {
             this.addOperation = this::copyToParent;
             this.removeOperation = this::deleteFromParent;
         } else {
@@ -90,10 +95,10 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
         HAJobScheduler.getInstance().submitJob(() -> removeOperation.accept(identifier, dataRemoved));
     }
 
-    private boolean isNodeConnected(InstanceIdentifier<T> identifier, ReadTransaction tx)
+    private boolean isNodeConnected(InstanceIdentifier<T> identifier)
             throws ReadFailedException {
-        return tx.read(LogicalDatastoreType.OPERATIONAL, identifier.firstIdentifierOf(Node.class))
-                .checkedGet().isPresent();
+        return singleTxBroker.syncReadOptional(LogicalDatastoreType.OPERATIONAL,
+            identifier.firstIdentifierOf(Node.class)).isPresent();
     }
 
     private static <T extends DataObject> boolean isDataUpdated(Optional<T> existingDataOptional, T newData) {
@@ -110,7 +115,7 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
             LOG.trace("Copy child op data {} to parent {}", mergeCommand.getDescription(), getNodeId(parent));
             T parentData = mergeCommand.transform(parent, data);
             InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
-            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
+            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType,
                 tx -> writeToMdsal(tx, parentData, parentIdentifier)), LOG, "Error copying to parent");
         }
     }
@@ -122,8 +127,8 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
         }
         InstanceIdentifier<Node> parent = getHAParent(identifier);
         if (parent != null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
-                if (isNodeConnected(identifier, tx)) {
+            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
+                if (isNodeConnected(identifier)) {
                     LOG.trace("Copy child op data {} to parent {} create:{}", mergeCommand.getDescription(),
                             getNodeId(parent), false);
                     T parentData = mergeCommand.transform(parent, data);
@@ -137,7 +142,7 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
     private void copyToChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
         Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
         if (children != null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
                 for (InstanceIdentifier<Node> child : children) {
                     LOG.trace("Copy parent config data {} to child {}", mergeCommand.getDescription(),
                             getNodeId(child));
@@ -152,7 +157,7 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
     private void deleteFromChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
         Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
         if (children != null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
                 for (InstanceIdentifier<Node> child : children) {
                     LOG.trace("Delete parent config data {} to child {}", mergeCommand.getDescription(),
                             getNodeId(child));
@@ -164,17 +169,17 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
         }
     }
 
-    private void writeToMdsal(final ReadWriteTransaction tx, final T data, final InstanceIdentifier<T> identifier)
-            throws ReadFailedException {
-        if (isDataUpdated(tx.read(datastoreType, identifier).checkedGet(), data)) {
-            tx.put(datastoreType, identifier, data);
+    private void writeToMdsal(final TypedReadWriteTransaction<D> tx, final T data,
+            final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
+        if (isDataUpdated(tx.read(identifier).get(), data)) {
+            tx.put(identifier, data);
         }
     }
 
-    private void deleteFromMdsal(final ReadWriteTransaction tx,
-            final InstanceIdentifier<T> identifier) throws ReadFailedException {
-        if (tx.read(datastoreType, identifier).checkedGet().isPresent()) {
-            tx.delete(datastoreType, identifier);
+    private void deleteFromMdsal(final TypedReadWriteTransaction<D> tx,
+            final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
+        if (tx.read(identifier).get().isPresent()) {
+            tx.delete(identifier);
         }
     }
 
@@ -183,16 +188,16 @@ public abstract class HwvtepNodeDataListener<T extends DataObject>
     }
 
     @Override
-    protected HwvtepNodeDataListener<T> getDataTreeChangeListener() {
+    protected HwvtepNodeDataListener<D, T> getDataTreeChangeListener() {
         return HwvtepNodeDataListener.this;
     }
 
-    protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier identifier) {
+    protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier<T> identifier) {
         InstanceIdentifier<Node> parent = identifier.firstIdentifierOf(Node.class);
         return hwvtepNodeHACache.getChildrenForHANode(parent);
     }
 
-    protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier identifier) {
+    protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier<T> identifier) {
         InstanceIdentifier<Node> child = identifier.firstIdentifierOf(Node.class);
         return hwvtepNodeHACache.getParent(child);
     }