Port ELAN service to datastore-constrained txns 26/73726/5
authoreceghkl <manu.b@ericsson.com>
Wed, 4 Jul 2018 06:14:32 +0000 (11:44 +0530)
committerSam Hague <shague@redhat.com>
Sat, 7 Jul 2018 00:56:42 +0000 (00:56 +0000)
* Most of the txRunners are converted to use DS constrained txns.
* Excluded methods are below
- Same tx used for both config and operational
- Methods where blocking calls are used.

JIRA: NETVIRT-1339

Change-Id: I78c40439c486a95d74b063886d3be49b0a999a28
Signed-off-by: eceghkl <manu.b@ericsson.com>
15 files changed:
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/evpn/utils/EvpnUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanExtnTepConfigListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanInterfaceManager.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanLearntVpnVipToPortListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanNodeListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanPacketInHandler.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanSmacFlowEventListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/ElanInstanceListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/HwvtepPhysicalSwitchListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/HwvtepTerminationPointListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/utils/ElanL2GatewayMulticastUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/utils/ElanL2GatewayUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/recovery/impl/ElanInterfaceRecoveryHandler.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/utils/ElanForwardingEntriesHandler.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/utils/ElanUtils.java

index 7e41ff6b26306d5d4a24bc6cdc7d8c609e5a24a9..5eada5cd57cc83c45eb2a31262f4ea8f110ecff9 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.elan.evpn.utils;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -364,7 +366,7 @@ public class EvpnUtils {
     }
 
     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
-        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
             int instructionKey = 0;
             LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
             List<Instruction> instructions = new ArrayList<>();
@@ -376,21 +378,20 @@ public class EvpnUtils {
                     ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
                     NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
-            if (!tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
-                tx.put(LogicalDatastoreType.CONFIGURATION, bindServiceId, serviceInfo,
-                        WriteTransaction.CREATE_MISSING_PARENTS);
+            if (!tx.read(bindServiceId).get().isPresent()) {
+                tx.put(bindServiceId, serviceInfo, WriteTransaction.CREATE_MISSING_PARENTS);
             }
         }), LOG, "Error binding an ELAN service to an external tunnel");
     }
 
     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
-        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
             LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
             short elanServiceIndex =
                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
-            if (tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
-                tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
+            if (tx.read(bindServiceId).get().isPresent()) {
+                tx.delete(bindServiceId);
             }
         }), LOG, "Error binding an ELAN service to an external tunnel");
     }
index c9547810035b482cb709105444672af5fcb98da6..69890bb3a2becc0e9419291e78dd340b04c39625 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
+
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -56,8 +58,8 @@ public class ElanExtnTepConfigListener
 
     @Override
     protected void add(InstanceIdentifier<ExternalTeps> iid, ExternalTeps tep) {
-        JdkFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-            tx.put(LogicalDatastoreType.OPERATIONAL, iid, tep, true);
+        JdkFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
+            tx.put(iid, tep, true);
         }), LOG, "Failed to update operational external teps {}", iid);
     }
 
@@ -67,8 +69,8 @@ public class ElanExtnTepConfigListener
 
     @Override
     protected void remove(InstanceIdentifier<ExternalTeps> iid, ExternalTeps tep) {
-        JdkFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-            tx.delete(LogicalDatastoreType.OPERATIONAL, iid);
+        JdkFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
+            tx.delete(iid);
         }), LOG, "Failed to update operational external teps {}", iid);
     }
 
index e48ca2efbae99639edaa1ecc1fa8b36e931bf5c0..cd513bebf32a4f05f59fced896aa585e822e5312 100644 (file)
@@ -7,12 +7,14 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
 import static org.opendaylight.netvirt.elan.utils.ElanUtils.isVxlanNetworkOrVxlanSegment;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -24,9 +26,11 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+
 import org.apache.commons.lang3.StringUtils;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -34,8 +38,10 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.Datastore.Configuration;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.itm.globals.ITMConstants;
@@ -393,7 +399,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             interfaceInfo, String interfaceName, boolean isLastElanInterface) {
         String elanName = elanInfo.getElanInstanceName();
         List<ListenableFuture<Void>> futures = new ArrayList<>();
-        futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(flowTx -> {
+        futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, flowTx -> {
             futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(interfaceTx -> {
                 InstanceIdentifier<ElanInterfaceMac> elanInterfaceId = ElanUtils
                         .getElanInterfaceMacEntriesOperationalDataPath(interfaceName);
@@ -945,19 +951,19 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
     }
 
     public void removeFilterEqualsTable(ElanInstance elanInfo, InterfaceInfo interfaceInfo,
-            WriteTransaction deleteFlowGroupTx) {
+            TypedReadWriteTransaction<Configuration> flowTx) {
         int ifTag = interfaceInfo.getInterfaceTag();
         Flow flow = MDSALUtil.buildFlow(NwConstants.ELAN_FILTER_EQUALS_TABLE,
                 getFlowRef(NwConstants.ELAN_FILTER_EQUALS_TABLE, ifTag, "group"));
 
-        mdsalManager.removeFlowToTx(interfaceInfo.getDpId(), flow, deleteFlowGroupTx);
+        mdsalManager.removeFlow(flowTx, interfaceInfo.getDpId(), flow);
 
         Flow flowEntity = MDSALUtil.buildFlowNew(NwConstants.ELAN_FILTER_EQUALS_TABLE,
                 getFlowRef(NwConstants.ELAN_FILTER_EQUALS_TABLE, ifTag, "drop"), 10, elanInfo.getElanInstanceName(), 0,
                 0, ElanConstants.COOKIE_ELAN_FILTER_EQUALS.add(BigInteger.valueOf(ifTag)),
                 getMatchesForFilterEqualsLPortTag(ifTag), MDSALUtil.buildInstructionsDrop());
 
-        mdsalManager.removeFlowToTx(interfaceInfo.getDpId(), flowEntity, deleteFlowGroupTx);
+        mdsalManager.removeFlow(flowTx, interfaceInfo.getDpId(), flowEntity);
     }
 
     private List<Bucket> getRemoteBCGroupBucketInfos(ElanInstance elanInfo, int bucketKeyStart,
index 8ce0936a925db2fed3545b3c723b6d973a5620cb..46e2aacc4ca9dc1713270b2ee0670046c7ba9119 100644 (file)
@@ -7,22 +7,32 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.Datastore.Configuration;
+import org.opendaylight.genius.infra.Datastore.Operational;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TransactionAdapter;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
+import org.opendaylight.genius.infra.TypedWriteTransaction;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
@@ -124,15 +134,15 @@ public class ElanLearntVpnVipToPortListener extends
                 return Collections.emptyList();
             }
             List<ListenableFuture<Void>> futures = new ArrayList<>();
-            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(interfaceTx -> futures.add(
-                    txRunner.callWithNewWriteOnlyTransactionAndSubmit(
-                        flowTx -> addMacEntryToDsAndSetupFlows(elanInterface.get().getElanInstanceName(),
-                                interfaceTx, flowTx, ElanConstants.STATIC_MAC_TIMEOUT)))));
+            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, interfaceTx ->
+                futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, flowTx ->
+                    addMacEntryToDsAndSetupFlows(elanInterface.get().getElanInstanceName(), interfaceTx,
+                            flowTx, ElanConstants.STATIC_MAC_TIMEOUT)))));
             return futures;
         }
 
-        private void addMacEntryToDsAndSetupFlows(String elanName, WriteTransaction interfaceTx,
-                WriteTransaction flowTx, int macTimeOut) throws ElanException {
+        private void addMacEntryToDsAndSetupFlows(String elanName, TypedWriteTransaction<Operational> interfaceTx,
+                TypedWriteTransaction<Configuration> flowTx, int macTimeOut) throws ElanException {
             LOG.trace("Adding mac address {} and interface name {} to ElanInterfaceForwardingEntries and "
                 + "ElanForwardingTables DS", macAddress, interfaceName);
             BigInteger timeStamp = new BigInteger(String.valueOf(System.currentTimeMillis()));
@@ -142,13 +152,13 @@ public class ElanLearntVpnVipToPortListener extends
                     .setIsStaticAddress(false).build();
             InstanceIdentifier<MacEntry> macEntryId = ElanUtils
                     .getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName, physAddress);
-            interfaceTx.put(LogicalDatastoreType.OPERATIONAL, macEntryId, macEntry);
+            interfaceTx.put(macEntryId, macEntry);
             InstanceIdentifier<MacEntry> elanMacEntryId =
                     ElanUtils.getMacEntryOperationalDataPath(elanName, physAddress);
-            interfaceTx.put(LogicalDatastoreType.OPERATIONAL, elanMacEntryId, macEntry);
+            interfaceTx.put(elanMacEntryId, macEntry);
             ElanInstance elanInstance = elanInstanceCache.get(elanName).orNull();
             elanUtils.setupMacFlows(elanInstance, interfaceManager.getInterfaceInfo(interfaceName), macTimeOut,
-                    macAddress, true, flowTx);
+                    macAddress, true, TransactionAdapter.toWriteTransaction(flowTx));
         }
     }
 
@@ -169,15 +179,15 @@ public class ElanLearntVpnVipToPortListener extends
                 return Collections.emptyList();
             }
             List<ListenableFuture<Void>> futures = new ArrayList<>();
-            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(interfaceTx -> futures.add(
-                    txRunner.callWithNewWriteOnlyTransactionAndSubmit(
-                        flowTx -> deleteMacEntryFromDsAndRemoveFlows(elanInterface.get().getElanInstanceName(),
-                                interfaceTx, flowTx)))));
+            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, interfaceTx ->
+                futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, flowTx ->
+                    deleteMacEntryFromDsAndRemoveFlows(elanInterface.get().getElanInstanceName(),
+                            interfaceTx, flowTx)))));
             return futures;
         }
 
-        private void deleteMacEntryFromDsAndRemoveFlows(String elanName, WriteTransaction interfaceTx,
-                WriteTransaction flowTx) {
+        private void deleteMacEntryFromDsAndRemoveFlows(String elanName,
+                TypedWriteTransaction<Operational> interfaceTx, TypedReadWriteTransaction<Configuration> flowTx) {
             LOG.trace("Deleting mac address {} and interface name {} from ElanInterfaceForwardingEntries "
                     + "and ElanForwardingTables DS", macAddress, interfaceName);
             PhysAddress physAddress = new PhysAddress(macAddress);
@@ -185,9 +195,9 @@ public class ElanLearntVpnVipToPortListener extends
             InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(interfaceName);
             if (macEntry != null && interfaceInfo != null) {
                 elanUtils.deleteMacFlows(elanInstanceCache.get(elanName).orNull(), interfaceInfo, macEntry, flowTx);
-                interfaceTx.delete(LogicalDatastoreType.OPERATIONAL,
+                interfaceTx.delete(
                         ElanUtils.getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName, physAddress));
-                interfaceTx.delete(LogicalDatastoreType.OPERATIONAL,
+                interfaceTx.delete(
                         ElanUtils.getMacEntryOperationalDataPath(elanName, physAddress));
             }
         }
index 2e068a23976865f51c4b966c37adc68890788f5d..581550646d71f9e96dc5448c13d30ac165428cf4 100644 (file)
@@ -7,20 +7,25 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.Datastore.Configuration;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedWriteTransaction;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
 import org.opendaylight.genius.mdsalutil.BucketInfo;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
@@ -129,7 +134,7 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
 
     private void createArpDefaultFlowsForArpCheckTable(BigInteger dpId) {
         jobCoordinator.enqueueJob("ARP_CHECK_TABLE-" + dpId.toString(),
-            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                 LOG.debug("Received notification to install Arp Check Default entries for dpn {} ", dpId);
                 createArpRequestMatchFlows(dpId, tx);
                 createArpResponseMatchFlows(dpId, tx);
@@ -310,7 +315,7 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                         Collections.singletonList(new InstructionGotoTable(NwConstants.ELAN_BASE_TABLE))));
     }
 
-    private void createArpRequestMatchFlows(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void createArpRequestMatchFlows(BigInteger dpId, TypedWriteTransaction<Configuration> tx) {
 
         long arpRequestGroupId = ArpResponderUtil.retrieveStandardArpResponderGroupId(idManagerService);
         List<BucketInfo> buckets = ArpResponderUtil.getDefaultBucketInfos(NwConstants.ARP_RESPONDER_TABLE);
@@ -324,20 +329,20 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                                 new ActionNxResubmit(NwConstants.ARP_LEARN_TABLE_2),
                                 new ActionNxResubmit(NwConstants.ELAN_BASE_TABLE)));
         LOG.trace("Invoking MDSAL to install Arp Rquest Match Flow for table {}", NwConstants.ARP_CHECK_TABLE);
-        mdsalManager.addFlowToTx(arpReqArpCheckTbl, writeFlowTx);
+        mdsalManager.addFlow(tx, arpReqArpCheckTbl);
     }
 
-    private void createArpResponseMatchFlows(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void createArpResponseMatchFlows(BigInteger dpId, TypedWriteTransaction<Configuration> tx) {
         FlowEntity arpRepArpCheckTbl = ArpResponderUtil.createArpDefaultFlow(dpId, NwConstants.ARP_CHECK_TABLE,
                 NwConstants.ARP_REPLY, () -> Arrays.asList(MatchEthernetType.ARP, MatchArpOp.REPLY), () ->
                         Arrays.asList(new ActionNxResubmit(NwConstants.ARP_LEARN_TABLE_1),
                                 new ActionNxResubmit(NwConstants.ARP_LEARN_TABLE_2),
                                 new ActionNxResubmit(NwConstants.ELAN_BASE_TABLE)));
         LOG.trace("Invoking MDSAL to install  Arp Reply Match Flow for Table {} ", NwConstants.ARP_CHECK_TABLE);
-        mdsalManager.addFlowToTx(arpRepArpCheckTbl, writeFlowTx);
+        mdsalManager.addFlow(tx, arpRepArpCheckTbl);
     }
 
-    private void createArpPuntAndLearnFlow(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void createArpPuntAndLearnFlow(BigInteger dpId, TypedWriteTransaction<Configuration> tx) {
         LOG.debug("adding arp punt and learn entry in table {}", NwConstants.ARP_LEARN_TABLE_1);
 
         List<MatchInfo> matches = new ArrayList<>();
@@ -387,10 +392,10 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity flow = MDSALUtil.buildFlowEntity(dpId, NwConstants.ARP_LEARN_TABLE_1, flowid,
                 NwConstants.TABLE_MISS_PRIORITY, "arp punt/learn flow", 0,
                 0, cookie, matches, instructions);
-        mdsalManager.addFlowToTx(flow, writeFlowTx);
+        mdsalManager.addFlow(tx, flow);
     }
 
-    private void addGarpLearnMatchFlow(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void addGarpLearnMatchFlow(BigInteger dpId, TypedWriteTransaction<Configuration> tx) {
         List<ActionInfo> actions = new ArrayList<>();
         List<MatchInfoBase> matches = new ArrayList<>();
 
@@ -408,10 +413,10 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity garpFlow = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_BASE_TABLE, flowid,
                 NwConstants.DEFAULT_ARP_FLOW_PRIORITY, "GARP learn match flow", 0, 0,
                 ElanConstants.COOKIE_ELAN_BASE_SMAC, matches, instructions);
-        mdsalManager.addFlowToTx(garpFlow, writeFlowTx);
+        mdsalManager.addFlow(tx, garpFlow);
     }
 
-    private void addArpLearnMatchFlow(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void addArpLearnMatchFlow(BigInteger dpId, TypedWriteTransaction<Configuration> tx) {
         List<ActionInfo> actions = new ArrayList<>();
         List<MatchInfoBase> matches = new ArrayList<>();
 
@@ -428,7 +433,7 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity arpFlow = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_BASE_TABLE, flowid,
                 NwConstants.DEFAULT_ARP_FLOW_PRIORITY, "ARP learn match flow", 0, 0,
                 ElanConstants.COOKIE_ELAN_BASE_SMAC, matches, instructions);
-        mdsalManager.addFlowToTx(arpFlow, writeFlowTx);
+        mdsalManager.addFlow(tx, arpFlow);
     }
 
 }
index 047618095eaa962004d02b61d041006589d70a0b..3ab0a41775335d0fbfe1f7f645f78c65f3347b19 100755 (executable)
@@ -7,16 +7,25 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
+
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+
 import javax.inject.Inject;
 import javax.inject.Singleton;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TransactionAdapter;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
@@ -162,7 +171,7 @@ public class ElanPacketInHandler implements PacketProcessingListener {
                                                MacEntry oldMacEntry, MacEntry newMacEntry,
                                                final boolean isVlanOrFlatProviderIface) {
         jobCoordinator.enqueueJob(ElanUtils.getElanMacKey(elanTag, macAddress),
-            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
                 if (oldMacEntry != null && oldMacEntry.getInterface().equals(interfaceName)) {
                     // This should never occur because of ovs temporary mac learning
                     elanManagerCounters.unknownSmacPktinForwardingEntriesRemoved();
@@ -172,7 +181,7 @@ public class ElanPacketInHandler implements PacketProcessingListener {
                         InstanceIdentifier<MacEntry> macEntryId = ElanUtils
                                 .getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName,
                                         physAddress);
-                        tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryId);
+                        tx.delete(macEntryId);
                     } else {
                         // New FEs flood their packets on all interfaces. This can lead
                         // to many contradicting packet_ins. Ignore all packets received
@@ -187,8 +196,7 @@ public class ElanPacketInHandler implements PacketProcessingListener {
                 if (!isVlanOrFlatProviderIface && oldMacEntry == null) {
                     InstanceIdentifier<MacEntry> elanMacEntryId =
                             ElanUtils.getMacEntryOperationalDataPath(elanName, physAddress);
-                    tx.put(LogicalDatastoreType.OPERATIONAL, elanMacEntryId, newMacEntry,
-                            WriteTransaction.CREATE_MISSING_PARENTS);
+                    tx.put(elanMacEntryId, newMacEntry, WriteTransaction.CREATE_MISSING_PARENTS);
                 }
             })));
     }
@@ -203,14 +211,17 @@ public class ElanPacketInHandler implements PacketProcessingListener {
             elanL2GatewayUtils.scheduleAddDpnMacInExtDevices(elanInstance.getElanInstanceName(), dpId,
                     Collections.singletonList(physAddress));
             elanManagerCounters.unknownSmacPktinLearned();
-            return Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-                elanUtils.setupMacFlows(elanInstance, interfaceInfo, elanInstance.getMacTimeout(),
-                        macAddress, !isVlanOrFlatProviderIface, tx);
-                InstanceIdentifier<MacEntry> macEntryId =
+            List<ListenableFuture<Void>> futures = new ArrayList<>();
+            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
+                futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, operTx -> {
+                    elanUtils.setupMacFlows(elanInstance, interfaceInfo, elanInstance.getMacTimeout(),
+                        macAddress, !isVlanOrFlatProviderIface, TransactionAdapter.toWriteTransaction(tx));
+                    InstanceIdentifier<MacEntry> macEntryId =
                         ElanUtils.getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName, physAddress);
-                tx.put(LogicalDatastoreType.OPERATIONAL, macEntryId, newMacEntry,
-                        WriteTransaction.CREATE_MISSING_PARENTS);
+                    operTx.put(macEntryId, newMacEntry, WriteTransaction.CREATE_MISSING_PARENTS);
+                }));
             }));
+            return futures;
         });
     }
 
@@ -243,7 +254,7 @@ public class ElanPacketInHandler implements PacketProcessingListener {
                     macEntry.getMacAddress(), macEntry.getInterface());
             return;
         }
-        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
             tx -> elanUtils.deleteMacFlows(elanInfo, oldInterfaceLport, macEntry, tx)), LOG,
             "Error deleting invalid MAC entry");
         elanL2GatewayUtils.removeMacsFromElanExternalDevices(elanInfo,
index 53e1281ace494e0b4362aca6fc50d1624440b6f3..d390023cdf64eace05ba49011b2d747d3f1b4e7f 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -108,7 +111,7 @@ public class ElanSmacFlowEventListener implements SalFlowListener {
                 String elanInstanceName = elanTagInfo.getName();
                 LOG.info("Deleting the Mac-Entry:{} present on ElanInstance:{}", macEntry, elanInstanceName);
                 if (macEntry != null && interfaceInfo != null) {
-                    ListenableFuture<Void> result = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+                    ListenableFuture<Void> result = txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
                         tx -> elanUtils.deleteMacFlows(elanInstanceCache.get(elanInstanceName).orNull(),
                                 interfaceInfo, macEntry, tx));
                     elanFutures.add(result);
@@ -119,17 +122,18 @@ public class ElanSmacFlowEventListener implements SalFlowListener {
                 Optional<MacEntry> existingInterfaceMacEntry = ElanUtils.read(broker,
                         LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInterface);
                 if (existingInterfaceMacEntry.isPresent()) {
-                    ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-                        tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInterface);
-                        MacEntry macEntryInElanInstance = elanUtils.getMacEntryForElanInstance(elanInstanceName,
+                    ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL,
+                        tx -> {
+                            tx.delete(macEntryIdForElanInterface);
+                            MacEntry macEntryInElanInstance = elanUtils.getMacEntryForElanInstance(elanInstanceName,
                                 physAddress).orNull();
-                        if (macEntryInElanInstance != null
+                            if (macEntryInElanInstance != null
                                 && macEntryInElanInstance.getInterface().equals(interfaceName)) {
-                            InstanceIdentifier<MacEntry> macEntryIdForElanInstance = ElanUtils
+                                InstanceIdentifier<MacEntry> macEntryIdForElanInstance = ElanUtils
                                     .getMacEntryOperationalDataPath(elanInstanceName, physAddress);
-                            tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInstance);
-                        }
-                    });
+                                tx.delete(macEntryIdForElanInstance);
+                            }
+                        });
                     elanFutures.add(future);
                     addCallBack(future, srcMacAddress);
                 }
index 048a2eec9f50c87373449da9ec0976085a5ca629..14ce85bcd0492f5afffc10ca90434889746086ba 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.List;
@@ -64,15 +66,16 @@ public class ElanInstanceListener extends AsyncClusteredDataTreeChangeListenerBa
                 if (connections.isEmpty()) {
                     return Collections.emptyList();
                 }
-                ListenableFuture<Void> future = txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
-                    for (L2gatewayConnection connection : connections) {
-                        InstanceIdentifier<L2gatewayConnection> iid =
-                                InstanceIdentifier.create(Neutron.class).child(
-                                        L2gatewayConnections.class).child(
-                                        L2gatewayConnection.class, connection.key());
-                        tx.delete(LogicalDatastoreType.CONFIGURATION, iid);
-                    }
-                });
+                ListenableFuture<Void> future = txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
+                    tx -> {
+                        for (L2gatewayConnection connection : connections) {
+                            InstanceIdentifier<L2gatewayConnection> iid =
+                                    InstanceIdentifier.create(Neutron.class).child(
+                                            L2gatewayConnections.class).child(
+                                            L2gatewayConnection.class, connection.key());
+                            tx.delete(iid);
+                        }
+                    });
                 ListenableFutures.addErrorLogging(future, LOG,
                         "Failed to delete associate L2 gateway connection while deleting network");
                 return Collections.singletonList(future);
index d22e74b3ee802504df9667f8ba7a4cea96afe407..2a16e8ebee10ca3fda88d313ba94f70244ee6196 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.base.Optional;
 import java.util.Collections;
 import java.util.HashSet;
@@ -380,15 +382,14 @@ public class HwvtepPhysicalSwitchListener
                                       PhysicalSwitchAugmentation phySwitchAdded) {
         if (phySwitchAdded.getTunnelIps() != null) {
             ListenableFutures.addErrorLogging(
-                txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
-                    Optional<PhysicalSwitchAugmentation> existingSwitch = tx.read(
-                            LogicalDatastoreType.CONFIGURATION, identifier).checkedGet();
+                txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
+                    Optional<PhysicalSwitchAugmentation> existingSwitch = tx.read(identifier).get();
                     PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
                     if (existingSwitch.isPresent()) {
                         psBuilder = new PhysicalSwitchAugmentationBuilder(existingSwitch.get());
                     }
                     psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
-                    tx.put(LogicalDatastoreType.CONFIGURATION, identifier, psBuilder.build(), true);
+                    tx.put(identifier, psBuilder.build(), true);
                     LOG.trace("Updating config tunnel ips {}", identifier);
                 }), LOG, "Failed to update the config tunnel ips {}", identifier);
         }
index 0c0929b1e6b5b48516e0eebbc55b4e7f7faf414c..1717446ea4499c0906e4623cea0b443cc8815f2c 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -147,9 +149,9 @@ public class HwvtepTerminationPointListener
 
     private List<ListenableFuture<Void>> handlePortDeleted(InstanceIdentifier<TerminationPoint> identifier) {
         InstanceIdentifier<Node> psNodeIid = identifier.firstIdentifierOf(Node.class);
-        return Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(
-            tx -> tx.read(LogicalDatastoreType.CONFIGURATION, psNodeIid).checkedGet().toJavaUtil().ifPresent(
-                node -> tx.delete(LogicalDatastoreType.CONFIGURATION, identifier))));
+        return Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
+            tx -> tx.read(psNodeIid).get().toJavaUtil().ifPresent(
+                node -> tx.delete(identifier))));
     }
 
     private List<VlanBindings> getVlanBindings(List<L2gatewayConnection> l2GwConns, NodeId hwvtepNodeId, String psName,
index be13492035d6cb9ae7199d2c5d1039e96f0209d3..91822101af87712d33eff8fce4cd18d7041e0e86 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.netvirt.elan.l2gw.utils;
 
 import static java.util.Collections.emptyList;
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
 import static org.opendaylight.netvirt.elan.utils.ElanUtils.isVxlanNetworkOrVxlanSegment;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -116,8 +117,8 @@ public class ElanL2GatewayMulticastUtils {
      */
     public ListenableFuture<Void> handleMcastForElanL2GwDeviceAdd(String elanName, L2GatewayDevice device) {
         InstanceIdentifier<ExternalTeps> tepPath = buildExternalTepPath(elanName, device.getTunnelIp());
-        JdkFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-            tx.put(LogicalDatastoreType.CONFIGURATION, tepPath, buildExternalTeps(device));
+        JdkFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
+            tx.put(tepPath, buildExternalTeps(device));
         }), LOG, "Failed to write to config external tep {}", tepPath);
         return updateMcastMacsForAllElanDevices(elanName, device, true/* updateThisDevice */);
     }
index 9c44bab10e83566e170a862f4429f486b46f832d..3aab5e93e8a60995edad6a6d2d0888222fed9b73 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.elan.l2gw.utils;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
@@ -1048,16 +1050,17 @@ public class ElanL2GatewayUtils {
                 return;
             }
             JdkFutures.addErrorLogging(
-                new ManagedNewTransactionRunnerImpl(dataBroker).callWithNewReadWriteTransactionAndSubmit(tx -> {
-                    optionalElan.get().getElanInstance().stream()
+                new ManagedNewTransactionRunnerImpl(dataBroker).callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
+                    tx -> {
+                        optionalElan.get().getElanInstance().stream()
                         .flatMap(elan -> elan.getExternalTeps().stream()
                                 .map(externalTep -> ElanL2GatewayMulticastUtils.buildExternalTepPath(
                                         elan.getElanInstanceName(), externalTep.getTepIp())))
                         .filter(externalTepIid -> Objects.equals(
                                 deviceVteps.getIpAddress(), externalTepIid.firstKeyOf(ExternalTeps.class).getTepIp()))
                         .peek(externalTepIid -> LOG.info("Deleting stale external tep {}", externalTepIid))
-                        .forEach(externalTepIid -> tx.delete(LogicalDatastoreType.CONFIGURATION, externalTepIid));
-                }), LOG, "Failed to delete stale external teps {}", deviceVteps);
+                        .forEach(externalTepIid -> tx.delete(externalTepIid));
+                    }), LOG, "Failed to delete stale external teps {}", deviceVteps);
             Thread.sleep(10000);//TODO remove the sleep currently it waits for interfacemgr to finish the cleanup
         } catch (ReadFailedException | InterruptedException e) {
             LOG.error("Failed to delete stale l2gw tep {}", deviceVteps, e);
index 9c21b62e61226c52c28c8425e9e3f802398a0053..660deb9c07365f14390c31cb2b94e4f409726ac2 100644 (file)
@@ -7,11 +7,12 @@
  */
 package org.opendaylight.netvirt.elan.recovery.impl;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import java.util.concurrent.ExecutionException;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
@@ -61,11 +62,11 @@ public class ElanInterfaceRecoveryHandler implements ServiceRecoveryInterface {
                     .getElanInterfaceConfigurationDataPathId(entityId);
             try {
                 LOG.trace("deleting elan interface {}", entityId);
-                txRunner.callWithNewWriteOnlyTransactionAndSubmit(
-                    tx -> tx.delete(LogicalDatastoreType.CONFIGURATION, elanInterfaceId)).get();
+                txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                    tx -> tx.delete(elanInterfaceId)).get();
                 LOG.trace("recreating elan interface {}, {}", entityId, elanInterface);
-                txRunner.callWithNewWriteOnlyTransactionAndSubmit(
-                    tx -> tx.put(LogicalDatastoreType.CONFIGURATION, elanInterfaceId, elanInterface)).get();
+                txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                    tx -> tx.put(elanInterfaceId, elanInterface)).get();
             } catch (InterruptedException | ExecutionException e) {
                 LOG.error("Service recovery failed for elan interface {}", entityId, e);
             }
index 26f7e719777cada38c409c032e7202f1f92700ec..f3a90f497618a58298436bde2f33e0fd4480344b 100644 (file)
@@ -7,12 +7,17 @@
  */
 package org.opendaylight.netvirt.elan.utils;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.ArrayList;
 import java.util.List;
+
 import javax.inject.Inject;
 import javax.inject.Singleton;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -78,13 +83,14 @@ public class ElanForwardingEntriesHandler {
         createElanInterfaceForwardingTablesList(interfaceName, macEntry, tx);
     }
 
-    public void deleteElanInterfaceForwardingTablesList(String interfaceName, MacEntry mac, WriteTransaction tx) {
+    public void deleteElanInterfaceForwardingTablesList(String interfaceName, MacEntry mac,
+                                                        WriteTransaction interfaceTx) {
         InstanceIdentifier<MacEntry> existingMacEntryId = ElanUtils
                 .getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName, mac.getMacAddress());
         MacEntry existingInterfaceMacEntry = elanUtils
                 .getInterfaceMacEntriesOperationalDataPathFromId(existingMacEntryId);
         if (existingInterfaceMacEntry != null) {
-            tx.delete(LogicalDatastoreType.OPERATIONAL, existingMacEntryId);
+            interfaceTx.delete(LogicalDatastoreType.OPERATIONAL, existingMacEntryId);
         }
     }
 
@@ -132,7 +138,7 @@ public class ElanForwardingEntriesHandler {
                     .getMacEntryOperationalDataPath(elanInfo.getElanInstanceName(), macEntry.getMacAddress());
             interfaceTx.delete(LogicalDatastoreType.OPERATIONAL, macEntryId);
             deleteElanInterfaceForwardingTablesList(interfaceInfo.getInterfaceName(), macEntry, interfaceTx);
-            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(flowTx -> {
+            futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, flowTx -> {
                 elanUtils.deleteMacFlows(elanInfo, interfaceInfo, macEntry, flowTx);
             }));
         }));
index c76535a65328409526b2a2de301b75fb7ef29c0f..f6145e3a24498d988932feda62b049f1760d8053 100755 (executable)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.elan.utils;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -17,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -24,11 +27,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+
 import javax.annotation.CheckReturnValue;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+
 import org.apache.commons.lang3.StringUtils;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
@@ -37,8 +42,10 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.genius.infra.Datastore.Configuration;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceServiceUtil;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
@@ -1041,18 +1048,18 @@ public class ElanUtils {
     }
 
     public void deleteMacFlows(@Nullable ElanInstance elanInfo, @Nullable InterfaceInfo interfaceInfo,
-            MacEntry macEntry, WriteTransaction deleteFlowGroupTx) {
+            MacEntry macEntry, TypedReadWriteTransaction<Configuration> flowTx) {
         if (elanInfo == null || interfaceInfo == null) {
             return;
         }
         String macAddress = macEntry.getMacAddress().getValue();
         synchronized (getElanMacDPNKey(elanInfo.getElanTag(), macAddress, interfaceInfo.getDpId())) {
-            deleteMacFlows(elanInfo, interfaceInfo, macAddress, /* alsoDeleteSMAC */ true, deleteFlowGroupTx);
+            deleteMacFlows(elanInfo, interfaceInfo, macAddress, /* alsoDeleteSMAC */ true, flowTx);
         }
     }
 
     public void deleteMacFlows(ElanInstance elanInfo, InterfaceInfo interfaceInfo, String macAddress,
-            boolean deleteSmac, WriteTransaction deleteFlowGroupTx) {
+            boolean deleteSmac, TypedReadWriteTransaction<Configuration> flowTx) {
         String elanInstanceName = elanInfo.getElanInstanceName();
         List<DpnInterfaces> remoteFEs = getInvolvedDpnsInElan(elanInstanceName);
         BigInteger srcdpId = interfaceInfo.getDpId();
@@ -1061,40 +1068,39 @@ public class ElanUtils {
             Long elanTag = elanInfo.getElanTag();
             BigInteger dstDpId = dpnInterface.getDpId();
             if (executeDeleteMacFlows(elanInfo, interfaceInfo, macAddress, deleteSmac, elanInstanceName, srcdpId,
-                    elanTag, dstDpId, deleteFlowGroupTx)) {
+                    elanTag, dstDpId, flowTx)) {
                 isFlowsRemovedInSrcDpn = true;
             }
             executeEtreeDeleteMacFlows(elanInfo, interfaceInfo, macAddress, deleteSmac, elanInstanceName, srcdpId,
-                    elanTag, dstDpId, deleteFlowGroupTx);
+                    elanTag, dstDpId, flowTx);
         }
         if (!isFlowsRemovedInSrcDpn) {
-            deleteSmacAndDmacFlows(elanInfo, interfaceInfo, macAddress, deleteSmac, deleteFlowGroupTx);
+            deleteSmacAndDmacFlows(elanInfo, interfaceInfo, macAddress, deleteSmac, flowTx);
         }
     }
 
     private void executeEtreeDeleteMacFlows(ElanInstance elanInfo, InterfaceInfo interfaceInfo, String macAddress,
             boolean deleteSmac, String elanInstanceName, BigInteger srcdpId, Long elanTag, BigInteger dstDpId,
-            WriteTransaction deleteFlowGroupTx) {
+            TypedReadWriteTransaction<Configuration> flowTx) {
         EtreeLeafTagName etreeLeafTag = elanEtreeUtils.getEtreeLeafTagByElanTag(elanTag);
         if (etreeLeafTag != null) {
             executeDeleteMacFlows(elanInfo, interfaceInfo, macAddress, deleteSmac, elanInstanceName, srcdpId,
-                    etreeLeafTag.getEtreeLeafTag().getValue(), dstDpId, deleteFlowGroupTx);
+                    etreeLeafTag.getEtreeLeafTag().getValue(), dstDpId, flowTx);
         }
     }
 
     private boolean executeDeleteMacFlows(ElanInstance elanInfo, InterfaceInfo interfaceInfo, String macAddress,
             boolean deleteSmac, String elanInstanceName, BigInteger srcdpId, Long elanTag, BigInteger dstDpId,
-            WriteTransaction deleteFlowGroupTx) {
+            TypedReadWriteTransaction<Configuration> flowTx) {
         boolean isFlowsRemovedInSrcDpn = false;
         if (dstDpId.equals(srcdpId)) {
             isFlowsRemovedInSrcDpn = true;
-            deleteSmacAndDmacFlows(elanInfo, interfaceInfo, macAddress, deleteSmac, deleteFlowGroupTx);
+            deleteSmacAndDmacFlows(elanInfo, interfaceInfo, macAddress, deleteSmac, flowTx);
         } else if (isDpnPresent(dstDpId)) {
             mdsalManager
-                    .removeFlowToTx(dstDpId,
+                    .removeFlow(flowTx, dstDpId,
                             MDSALUtil.buildFlow(NwConstants.ELAN_DMAC_TABLE, getKnownDynamicmacFlowRef(
-                                    NwConstants.ELAN_DMAC_TABLE, dstDpId, srcdpId, macAddress, elanTag)),
-                            deleteFlowGroupTx);
+                                    NwConstants.ELAN_DMAC_TABLE, dstDpId, srcdpId, macAddress, elanTag)));
             LOG.debug("Dmac flow entry deleted for elan:{}, logical interface port:{} and mac address:{} on dpn:{}",
                     elanInstanceName, interfaceInfo.getPortName(), macAddress, dstDpId);
         }
@@ -1102,22 +1108,20 @@ public class ElanUtils {
     }
 
     private void deleteSmacAndDmacFlows(ElanInstance elanInfo, InterfaceInfo interfaceInfo, String macAddress,
-            boolean deleteSmac, WriteTransaction deleteFlowGroupTx) {
+            boolean deleteSmac, TypedReadWriteTransaction<Configuration> flowTx) {
         String elanInstanceName = elanInfo.getElanInstanceName();
         long ifTag = interfaceInfo.getInterfaceTag();
         BigInteger srcdpId = interfaceInfo.getDpId();
         Long elanTag = elanInfo.getElanTag();
         if (deleteSmac) {
             mdsalManager
-                    .removeFlowToTx(srcdpId,
+                    .removeFlow(flowTx, srcdpId,
                             MDSALUtil.buildFlow(NwConstants.ELAN_SMAC_TABLE, getKnownDynamicmacFlowRef(
-                                    NwConstants.ELAN_SMAC_TABLE, srcdpId, ifTag, macAddress, elanTag)),
-                            deleteFlowGroupTx);
+                                    NwConstants.ELAN_SMAC_TABLE, srcdpId, ifTag, macAddress, elanTag)));
         }
-        mdsalManager.removeFlowToTx(srcdpId,
+        mdsalManager.removeFlow(flowTx, srcdpId,
                 MDSALUtil.buildFlow(NwConstants.ELAN_DMAC_TABLE,
-                        getKnownDynamicmacFlowRef(NwConstants.ELAN_DMAC_TABLE, srcdpId, ifTag, macAddress, elanTag)),
-                deleteFlowGroupTx);
+                        getKnownDynamicmacFlowRef(NwConstants.ELAN_DMAC_TABLE, srcdpId, ifTag, macAddress, elanTag)));
         LOG.debug("All the required flows deleted for elan:{}, logical Interface port:{} and MAC address:{} on dpn:{}",
                 elanInstanceName, interfaceInfo.getPortName(), macAddress, srcdpId);
     }
@@ -1459,9 +1463,9 @@ public class ElanUtils {
     public void addDmacRedirectToDispatcherFlows(Long elanTag, String displayName,
             String macAddress, List<BigInteger> dpnIds) {
         for (BigInteger dpId : dpnIds) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
-                tx -> mdsalManager.addFlowToTx(
-                        buildDmacRedirectToDispatcherFlow(dpId, macAddress, displayName, elanTag), tx)), LOG,
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                tx -> mdsalManager.addFlow(tx, buildDmacRedirectToDispatcherFlow(dpId, macAddress, displayName,
+                        elanTag))), LOG,
                 "Error adding DMAC redirect to dispatcher flows");
         }
     }