Datastore txes : fibmanager 87/73887/9
authorStephen Kitt <skitt@redhat.com>
Tue, 10 Jul 2018 11:59:52 +0000 (13:59 +0200)
committerMichael Vorburger <vorburger@redhat.com>
Tue, 31 Jul 2018 11:57:53 +0000 (13:57 +0200)
This switches fibmanager to datastore-constrained transactions, except
where they conflict with the resource batching manager.

This requires Ic63fd992b33e9e1cd4049bc129545a9830b7a8c0.

JIRA: NETVIRT-1340
Change-Id: Ia766e6089ed3fd8b70f0a3f03e636ac1000c2cbb
Signed-off-by: Stephen Kitt <skitt@redhat.com>
fibmanager/impl/src/main/java/org/opendaylight/netvirt/fibmanager/EvpnVrfEntryHandler.java
fibmanager/impl/src/main/java/org/opendaylight/netvirt/fibmanager/FibUtil.java
fibmanager/impl/src/main/java/org/opendaylight/netvirt/fibmanager/NexthopManager.java
fibmanager/impl/src/main/java/org/opendaylight/netvirt/fibmanager/VrfEntryListener.java
vpnmanager/api/src/main/java/org/opendaylight/netvirt/vpnmanager/api/VpnExtraRouteHelper.java

index 667ac5e7dee7ca249bce4c6ebafdae576dd8b482..fcd979ae80fbc99c07cad8dc4233eb93c842ac0f 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.fibmanager;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import java.math.BigInteger;
@@ -86,14 +88,15 @@ public class EvpnVrfEntryHandler extends BaseVrfEntryHandler implements IVrfEntr
                     rd, vrfEntry.getDestPrefix(), elanTag);
             if (vpnToDpnList != null) {
                 jobCoordinator.enqueueJob("FIB-" + rd + "-" + vrfEntry.getDestPrefix(),
-                    () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-                        for (final VpnToDpnList curDpn : vpnToDpnList) {
-                            if (curDpn.getDpnState() == VpnToDpnList.DpnState.Active) {
-                                vrfEntryListener.installSubnetRouteInFib(curDpn.getDpnId(), elanTag, rd,
+                    () -> Collections.singletonList(
+                        txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
+                            for (final VpnToDpnList curDpn : vpnToDpnList) {
+                                if (curDpn.getDpnState() == VpnToDpnList.DpnState.Active) {
+                                    vrfEntryListener.installSubnetRouteInFib(curDpn.getDpnId(), elanTag, rd,
                                         vpnId, vrfEntry, tx);
+                                }
                             }
-                        }
-                    })));
+                        })));
             }
             return;
         }
index e73c228f2ff7d0b852d4337fd6b4503769d76d67..44506e0a88ff40ed3f2816e45413fe7818218436 100644 (file)
@@ -28,7 +28,10 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
-import org.opendaylight.genius.infra.Datastore;
+import org.opendaylight.genius.infra.Datastore.Configuration;
+import org.opendaylight.genius.infra.Datastore.Operational;
+import org.opendaylight.genius.infra.TypedReadTransaction;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
 import org.opendaylight.genius.infra.TypedWriteTransaction;
 import org.opendaylight.genius.mdsalutil.BucketInfo;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
@@ -201,6 +204,11 @@ public class FibUtil {
         return MDSALUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL, id);
     }
 
+    static Optional<VpnInstanceOpDataEntry> getVpnInstanceOpData(TypedReadTransaction<Operational> operTx, String rd)
+        throws ExecutionException, InterruptedException {
+        return operTx.read(getVpnInstanceOpDataIdentifier(rd)).get();
+    }
+
     VpnInstanceOpDataEntry getVpnInstance(String rd) {
         InstanceIdentifier<VpnInstanceOpDataEntry> id =
                 InstanceIdentifier.create(VpnInstanceOpData.class)
@@ -220,6 +228,11 @@ public class FibUtil {
         return localNextHopInfoData.isPresent() ? localNextHopInfoData.get() : null;
     }
 
+    static Prefixes getPrefixToInterface(TypedReadTransaction<Operational> operTx, Long vpnId, String ipPrefix)
+            throws ExecutionException, InterruptedException {
+        return operTx.read(getPrefixToInterfaceIdentifier(vpnId, ipPrefix)).get().orNull();
+    }
+
     String getMacAddressFromPrefix(String ifName, String vpnName, String ipPrefix) {
         Optional<Adjacency> adjacencyData = MDSALUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL,
                        getAdjacencyIdentifierOp(ifName, vpnName, ipPrefix));
@@ -502,7 +515,7 @@ public class FibUtil {
         }
     }
 
-    public void removeVrfTable(String rd, TypedWriteTransaction<Datastore.Configuration> writeConfigTxn) {
+    public void removeVrfTable(String rd, TypedWriteTransaction<Configuration> writeConfigTxn) {
         LOG.debug("Removing vrf table for rd {}", rd);
         InstanceIdentifier.InstanceIdentifierBuilder<VrfTables> idBuilder =
             InstanceIdentifier.builder(FibEntries.class).child(VrfTables.class, new VrfTablesKey(rd));
@@ -582,37 +595,34 @@ public class FibUtil {
         return "FIB-" + vpnId.toString() + "-" + dpnId.toString() ;
     }
 
-    public void updateUsedRdAndVpnToExtraRoute(WriteTransaction writeConfigTxn, WriteTransaction writeOperTxn,
-                                               String tunnelIpRemoved, String primaryRd, String prefix) {
-        Optional<VpnInstanceOpDataEntry> optVpnInstance = getVpnInstanceOpData(primaryRd);
+    public void updateUsedRdAndVpnToExtraRoute(TypedReadWriteTransaction<Configuration> confTx,
+            TypedReadWriteTransaction<Operational> operTx, String tunnelIpRemoved, String primaryRd, String prefix)
+            throws ExecutionException, InterruptedException {
+        Optional<VpnInstanceOpDataEntry> optVpnInstance = getVpnInstanceOpData(operTx, primaryRd);
         if (!optVpnInstance.isPresent()) {
             return;
         }
         VpnInstanceOpDataEntry vpnInstance = optVpnInstance.get();
         String vpnName = vpnInstance.getVpnInstanceName();
         long vpnId = vpnInstance.getVpnId();
-        List<String> usedRds = VpnExtraRouteHelper.getUsedRds(dataBroker, vpnId, prefix);
+        List<String> usedRds = VpnExtraRouteHelper.getUsedRds(confTx, vpnId, prefix);
         // To identify the rd to be removed, iterate through the allocated rds for the prefix and check
         // which rd is allocated for the particular OVS.
         for (String usedRd : usedRds) {
-            Optional<Routes> vpnExtraRoutes = VpnExtraRouteHelper
-                    .getVpnExtraroutes(dataBroker, vpnName, usedRd, prefix);
+            Optional<Routes> vpnExtraRoutes = VpnExtraRouteHelper.getVpnExtraroutes(operTx, vpnName, usedRd, prefix);
             if (vpnExtraRoutes.isPresent()) {
                 // Since all the nexthops under one OVS will be present under one rd, only 1 nexthop is read
                 // to identify the OVS
                 String nextHopRemoved = vpnExtraRoutes.get().getNexthopIpList().get(0);
-                Prefixes prefixToInterface = getPrefixToInterface(vpnId, getIpPrefix(nextHopRemoved));
+                Prefixes prefixToInterface = getPrefixToInterface(operTx, vpnId, getIpPrefix(nextHopRemoved));
                 if (prefixToInterface != null && tunnelIpRemoved
                         .equals(getEndpointIpAddressForDPN(prefixToInterface.getDpnId()))) {
                     LOG.info("updating data-stores for prefix {} with primaryRd {} for interface {} on vpn {} ",
                             prefix, primaryRd, prefixToInterface.getVpnInterfaceName(), vpnName);
-                    writeOperTxn.delete(LogicalDatastoreType.OPERATIONAL,
-                            FibUtil.getAdjacencyIdentifierOp(prefixToInterface.getVpnInterfaceName(),
+                    operTx.delete(FibUtil.getAdjacencyIdentifierOp(prefixToInterface.getVpnInterfaceName(),
                                     vpnName, prefix));
-                    writeOperTxn.delete(LogicalDatastoreType.OPERATIONAL,
-                            VpnExtraRouteHelper.getVpnToExtrarouteVrfIdIdentifier(vpnName, usedRd, prefix));
-                    writeConfigTxn.delete(LogicalDatastoreType.CONFIGURATION,
-                            VpnExtraRouteHelper.getUsedRdsIdentifier(vpnId, prefix, nextHopRemoved));
+                    operTx.delete(VpnExtraRouteHelper.getVpnToExtrarouteVrfIdIdentifier(vpnName, usedRd, prefix));
+                    confTx.delete(VpnExtraRouteHelper.getUsedRdsIdentifier(vpnId, prefix, nextHopRemoved));
                     break;
                 }
             }
@@ -667,33 +677,33 @@ public class FibUtil {
     }
 
     public static void updateLbGroupInfo(BigInteger dpnId, String destinationIp, String groupIdKey,
-            String groupId, WriteTransaction tx) {
+            String groupId, TypedWriteTransaction<Operational> tx) {
         InstanceIdentifier<DpnLbNexthops> id = getDpnLbNexthopsIdentifier(dpnId, destinationIp);
         DpnLbNexthops dpnToLbNextHop = buildDpnLbNextHops(dpnId, destinationIp, groupIdKey);
-        tx.merge(LogicalDatastoreType.OPERATIONAL, id, dpnToLbNextHop);
+        tx.merge(id, dpnToLbNextHop);
         InstanceIdentifier<Nexthops> nextHopsId = getNextHopsIdentifier(groupIdKey);
         Nexthops nextHopsToGroupId = buildNextHops(dpnId, groupIdKey, groupId);
-        tx.merge(LogicalDatastoreType.OPERATIONAL, nextHopsId, nextHopsToGroupId);
+        tx.merge(nextHopsId, nextHopsToGroupId);
     }
 
-    public static void removeDpnIdToNextHopInfo(String destinationIp, BigInteger dpnId, WriteTransaction tx) {
-        InstanceIdentifier<DpnLbNexthops> id = getDpnLbNexthopsIdentifier(dpnId, destinationIp);
-        tx.delete(LogicalDatastoreType.OPERATIONAL, id);
+    public static void removeDpnIdToNextHopInfo(String destinationIp, BigInteger dpnId,
+            TypedWriteTransaction<Operational> tx) {
+        tx.delete(getDpnLbNexthopsIdentifier(dpnId, destinationIp));
     }
 
     public static void removeOrUpdateNextHopInfo(BigInteger dpnId, String nextHopKey, String groupId,
-            Nexthops nexthops, WriteTransaction tx) {
+            Nexthops nexthops, TypedWriteTransaction<Operational> tx) {
         InstanceIdentifier<Nexthops> nextHopsId = getNextHopsIdentifier(nextHopKey);
         List<String> targetDeviceIds = nexthops.getTargetDeviceId();
         targetDeviceIds.remove(dpnId.toString());
         if (targetDeviceIds.isEmpty()) {
-            tx.delete(LogicalDatastoreType.OPERATIONAL, nextHopsId);
+            tx.delete(nextHopsId);
         } else {
             Nexthops nextHopsToGroupId = new NexthopsBuilder().withKey(new NexthopsKey(nextHopKey))
                 .setNexthopKey(nextHopKey)
                 .setGroupId(groupId)
                 .setTargetDeviceId(targetDeviceIds).build();
-            tx.put(LogicalDatastoreType.OPERATIONAL, nextHopsId, nextHopsToGroupId);
+            tx.put(nextHopsId, nextHopsToGroupId);
         }
     }
 
index 07d77ca12e75679103aafc663c3bcaee06caef8c..39a9bcd9312465d1f3ec3d0890a4cb5fe10ad187 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netvirt.fibmanager;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
 import static org.opendaylight.genius.mdsalutil.NWUtil.isIpv4Address;
 
 import com.google.common.base.Optional;
@@ -993,12 +995,11 @@ public class NexthopManager implements AutoCloseable {
         }
         Group group = MDSALUtil.buildGroup(groupId, groupIdKey, GroupTypes.GroupSelect,
                         MDSALUtil.buildBucketLists(listBucket));
-        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(confTx -> {
-            mdsalApiManager.addGroupToTx(dpnId, group, confTx);
-        }), LOG, "Error adding load-balancing group");
-        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(operTx -> {
-            FibUtil.updateLbGroupInfo(dpnId, destinationIp, groupIdKey, groupId.toString(), operTx);
-        }), LOG, "Error updating load-balancing group info");
+        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+            confTx -> mdsalApiManager.addGroup(confTx, dpnId, group)), LOG, "Error adding load-balancing group");
+        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL,
+            operTx -> FibUtil.updateLbGroupInfo(dpnId, destinationIp, groupIdKey, groupId.toString(), operTx)), LOG,
+            "Error updating load-balancing group info");
         LOG.trace("LB group {} towards DC-GW installed on dpn {}. Group - {}", groupIdKey, dpnId, group);
     }
 
@@ -1043,8 +1044,8 @@ public class NexthopManager implements AutoCloseable {
     public void removeOrUpdateDcGwLoadBalancingGroup(List<String> availableDcGws, BigInteger dpnId,
             String destinationIp) {
         Preconditions.checkNotNull(availableDcGws, "There are no dc-gws present");
-        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(confTx -> {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(operTx -> {
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, operTx -> {
                 int noOfDcGws = availableDcGws.size();
                 // If availableDcGws does not contain the destination Ip it means this is a configuration delete.
                 if (!availableDcGws.contains(destinationIp)) {
@@ -1058,7 +1059,7 @@ public class NexthopManager implements AutoCloseable {
                     return;
                 }
                 List<String> nextHopKeys = dpnLbNextHops.get().getNexthopKey();
-                nextHopKeys.forEach(nextHopKey -> {
+                for (String nextHopKey : nextHopKeys) {
                     Optional<Nexthops> optionalNextHops = fibUtil.getNexthops(nextHopKey);
                     if (!optionalNextHops.isPresent()) {
                         return;
@@ -1067,19 +1068,17 @@ public class NexthopManager implements AutoCloseable {
                     final String groupId = nexthops.getGroupId();
                     final long groupIdValue = Long.parseLong(groupId);
                     if (noOfDcGws > 1) {
-                        mdsalApiManager.removeBucketToTx(dpnId, groupIdValue, bucketId, confTx);
+                        mdsalApiManager.removeBucket(confTx, dpnId, groupIdValue, bucketId);
                     } else {
-                        Group group = MDSALUtil.buildGroup(groupIdValue, nextHopKey, GroupTypes.GroupSelect,
-                                MDSALUtil.buildBucketLists(Collections.emptyList()));
-                        LOG.trace("Removed LB group {} on dpn {}", group, dpnId);
-                        mdsalApiManager.removeGroupToTx(dpnId, group, confTx);
+                        LOG.trace("Removed LB group {} on dpn {}", groupIdValue, dpnId);
+                        mdsalApiManager.removeGroup(confTx, dpnId, groupIdValue);
                         removeNextHopPointer(nextHopKey);
                     }
                     // When the DC-GW is removed from configuration.
                     if (noOfDcGws != availableDcGws.size()) {
                         FibUtil.removeOrUpdateNextHopInfo(dpnId, nextHopKey, groupId, nexthops, operTx);
                     }
-                });
+                }
                 FibUtil.removeDpnIdToNextHopInfo(destinationIp, dpnId, operTx);
             }), LOG, "Error removing or updating load-balancing group");
         }), LOG, "Error removing or updating load-balancing group");
@@ -1092,7 +1091,7 @@ public class NexthopManager implements AutoCloseable {
     public void updateDcGwLoadBalancingGroup(List<String> availableDcGws,
             BigInteger dpnId, String destinationIp, boolean isTunnelUp, Class<? extends TunnelTypeBase> tunnelType) {
         Preconditions.checkNotNull(availableDcGws, "There are no dc-gws present");
-        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(confTx -> {
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
             // TODO : Place the logic to construct all possible DC-GW combination here.
             int bucketId = availableDcGws.indexOf(destinationIp);
             Optional<DpnLbNexthops> dpnLbNextHops = fibUtil.getDpnLbNexthops(dpnId, destinationIp);
@@ -1100,7 +1099,7 @@ public class NexthopManager implements AutoCloseable {
                 return;
             }
             List<String> nextHopKeys = dpnLbNextHops.get().getNexthopKey();
-            nextHopKeys.forEach(nextHopKey -> {
+            for (String nextHopKey : nextHopKeys) {
                 Optional<Nexthops> optionalNextHops = fibUtil.getNexthops(nextHopKey);
                 if (!optionalNextHops.isPresent()) {
                     return;
@@ -1111,12 +1110,12 @@ public class NexthopManager implements AutoCloseable {
                 if (isTunnelUp) {
                     Bucket bucket = buildBucketForDcGwLbGroup(destinationIp, dpnId, bucketId, tunnelType);
                     LOG.trace("Added bucket {} to group {} on dpn {}.", bucket, groupId, dpnId);
-                    mdsalApiManager.addBucketToTx(dpnId, groupIdValue, bucket , confTx);
+                    mdsalApiManager.addBucket(confTx, dpnId, groupIdValue, bucket);
                 } else {
                     LOG.trace("Removed bucketId {} from group {} on dpn {}.", bucketId, groupId, dpnId);
-                    mdsalApiManager.removeBucketToTx(dpnId, groupIdValue, bucketId, confTx);
+                    mdsalApiManager.removeBucket(confTx, dpnId, groupIdValue, bucketId);
                 }
-            });
+            }
         }), LOG, "Error updating load-balancing group");
     }
 
index a85972866d678d4ec3fd9657a81fe89ee67e4883..6003a11c26f1530128f54eda0fc9fd3cfb0c7e37 100755 (executable)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.netvirt.fibmanager;
 
+import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
 import static org.opendaylight.genius.mdsalutil.NWUtil.isIpv4Address;
 
 import com.google.common.base.Optional;
@@ -26,19 +29,22 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
+import org.opendaylight.genius.infra.Datastore.Configuration;
+import org.opendaylight.genius.infra.Datastore.Operational;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.TransactionAdapter;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
+import org.opendaylight.genius.infra.TypedWriteTransaction;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
 import org.opendaylight.genius.mdsalutil.InstructionInfo;
@@ -328,11 +334,17 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             nextHopsRemoved.removeAll(FibHelper.getNextHopListFromRoutePaths(update));
             List<ListenableFuture<Void>> futures = new ArrayList<>();
             ListenableFuture<Void> configFuture =
-                    txRunner.callWithNewWriteOnlyTransactionAndSubmit(configTx ->
-                            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(operTx ->
-                                    nextHopsRemoved.parallelStream()
-                                            .forEach(nextHopRemoved -> fibUtil.updateUsedRdAndVpnToExtraRoute(
-                                                    configTx, operTx, nextHopRemoved, rd, update.getDestPrefix())))));
+                txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, configTx ->
+                    futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(OPERATIONAL, operTx ->
+                        nextHopsRemoved.parallelStream()
+                            .forEach(nextHopRemoved -> {
+                                try {
+                                    fibUtil.updateUsedRdAndVpnToExtraRoute(
+                                        configTx, operTx, nextHopRemoved, rd, update.getDestPrefix());
+                                } catch (ExecutionException | InterruptedException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }))));
             futures.add(configFuture);
             Futures.addCallback(configFuture, new FutureCallback<Void>() {
                 @Override
@@ -393,15 +405,16 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     + " with elantag {}", rd, vrfEntry.getDestPrefix(), elanTag);
             if (vpnToDpnList != null) {
                 jobCoordinator.enqueueJob(FibUtil.getJobKeyForRdPrefix(rd, vrfEntry.getDestPrefix()),
-                    () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-                        for (final VpnToDpnList curDpn : vpnToDpnList) {
-                            if (curDpn.getDpnState() == VpnToDpnList.DpnState.Active) {
-                                installSubnetRouteInFib(curDpn.getDpnId(), elanTag, rd, vpnId, vrfEntry, tx);
-                                installSubnetBroadcastAddrDropRule(curDpn.getDpnId(), rd, vpnId.longValue(),
+                    () -> Collections.singletonList(
+                        txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
+                            for (final VpnToDpnList curDpn : vpnToDpnList) {
+                                if (curDpn.getDpnState() == VpnToDpnList.DpnState.Active) {
+                                    installSubnetRouteInFib(curDpn.getDpnId(), elanTag, rd, vpnId, vrfEntry, tx);
+                                    installSubnetBroadcastAddrDropRule(curDpn.getDpnId(), rd, vpnId.longValue(),
                                         vrfEntry, NwConstants.ADD_FLOW, tx);
+                                }
                             }
-                        }
-                    })));
+                        })));
             }
             return;
         }
@@ -417,7 +430,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         final List<BigInteger> localDpnIdList = createLocalFibEntry(vpnInstance.getVpnId(), rd, vrfEntry, etherType);
         if (!localDpnIdList.isEmpty() && vpnToDpnList != null) {
             jobCoordinator.enqueueJob(FibUtil.getJobKeyForRdPrefix(rd, vrfEntry.getDestPrefix()),
-                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     synchronized (vpnInstance.getVpnInstanceName().intern()) {
                         for (VpnToDpnList vpnDpn : vpnToDpnList) {
                             if (!localDpnIdList.contains(vpnDpn.getDpnId())) {
@@ -425,7 +438,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                                     try {
                                         if (RouteOrigin.BGP.getValue().equals(vrfEntry.getOrigin())) {
                                             bgpRouteVrfEntryHandler.createRemoteFibEntry(vpnDpn.getDpnId(),
-                                                    vpnId, vrfTableKey.getRouteDistinguisher(), vrfEntry, tx,
+                                                    vpnId, vrfTableKey.getRouteDistinguisher(), vrfEntry,
+                                                    TransactionAdapter.toWriteTransaction(tx),
                                                     txnObjects);
                                         } else {
                                             createRemoteFibEntry(vpnDpn.getDpnId(), vpnInstance.getVpnId(),
@@ -493,9 +507,9 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     }
 
     void installSubnetRouteInFib(final BigInteger dpnId, final long elanTag, final String rd,
-                                         final long vpnId, final VrfEntry vrfEntry, WriteTransaction tx) {
+            final long vpnId, final VrfEntry vrfEntry, TypedWriteTransaction<Configuration> tx) {
         if (tx == null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
                 newTx -> installSubnetRouteInFib(dpnId, elanTag, rd, vpnId, vrfEntry, newTx)), LOG,
                 "Error installing subnet route in FIB");
             return;
@@ -534,7 +548,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         instructions.add(new InstructionWriteMetadata(subnetRouteMeta, MetaDataUtil.METADATA_MASK_SUBNET_ROUTE));
         instructions.add(new InstructionGotoTable(NwConstants.L3_SUBNET_ROUTE_TABLE));
         baseVrfEntryHandler.makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, instructions,
-                NwConstants.ADD_FLOW, tx, null);
+                NwConstants.ADD_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
         if (vrfEntry.getRoutePaths() != null) {
             for (RoutePaths routePath : vrfEntry.getRoutePaths()) {
                 if (RouteOrigin.value(vrfEntry.getOrigin()) != RouteOrigin.SELF_IMPORTED) {
@@ -555,7 +569,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     }
 
     private void installSubnetBroadcastAddrDropRule(final BigInteger dpnId, final String rd, final long vpnId,
-                                                    final VrfEntry vrfEntry, int addOrRemove, WriteTransaction tx) {
+            final VrfEntry vrfEntry, int addOrRemove, TypedWriteTransaction<Configuration> tx) {
         List<MatchInfo> matches = new ArrayList<>();
 
         LOG.debug("SUBNETROUTE: installSubnetBroadcastAddrDropRule: destPrefix {} rd {} vpnId {} dpnId {}",
@@ -602,9 +616,9 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class, flowKey).build();
 
         if (addOrRemove == NwConstants.ADD_FLOW) {
-            tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId,flow, true);
+            tx.put(flowInstanceId,flow, true);
         } else {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
+            tx.delete(flowInstanceId);
         }
     }
 
@@ -881,10 +895,10 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             java.util.Optional<Long> optLabel = FibUtil.getLabelFromRoutePaths(vrfEntry);
             List<String> nextHopAddressList = FibHelper.getNextHopListFromRoutePaths(vrfEntry);
             String jobKey = FibUtil.getCreateLocalNextHopJobKey(vpnId, dpnId, vrfEntry.getDestPrefix());
-            jobCoordinator.enqueueJob(jobKey, () -> {
-                return Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            jobCoordinator.enqueueJob(jobKey,
+                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     baseVrfEntryHandler.makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, instructions,
-                            NwConstants.ADD_FLOW, tx, null);
+                            NwConstants.ADD_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
                     if (FibUtil.isBgpVpn(vpnName, rd)) {
                         optLabel.ifPresent(label -> {
                             if (RouteOrigin.value(vrfEntry.getOrigin()) != RouteOrigin.SELF_IMPORTED) {
@@ -911,8 +925,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                             }
                         });
                     }
-                }));
-            });
+                })));
             return dpnId;
         }
         LOG.error("localNextHopInfo received is null for prefix {} on rd {} on vpn {}", vrfEntry.getDestPrefix(), rd,
@@ -939,7 +952,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         return null;
     }
 
-    private boolean deleteLabelRouteInfo(LabelRouteInfo lri, String vpnInstanceName, WriteTransaction tx) {
+    private boolean deleteLabelRouteInfo(LabelRouteInfo lri, String vpnInstanceName,
+            TypedWriteTransaction<Operational> tx) {
         if (lri == null) {
             return true;
         }
@@ -957,7 +971,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         if (vpnInstancesList.isEmpty()) {
             LOG.debug("deleting LRI instance object for label {}", lri.getLabel());
             if (tx != null) {
-                tx.delete(LogicalDatastoreType.OPERATIONAL, lriId);
+                tx.delete(lriId);
             } else {
                 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.OPERATIONAL, lriId);
             }
@@ -971,7 +985,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     }
 
     void makeTunnelTableEntry(BigInteger dpId, long label, long groupId/*String egressInterfaceName*/,
-                                      WriteTransaction tx) {
+                                      TypedWriteTransaction<Configuration> tx) {
         List<ActionInfo> actionsInfos = Collections.singletonList(new ActionGroup(groupId));
 
         createTerminatingServiceActions(dpId, (int) label, actionsInfos, tx);
@@ -981,7 +995,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     }
 
     public void createTerminatingServiceActions(BigInteger destDpId, int label, List<ActionInfo> actionsInfos,
-                                                WriteTransaction tx) {
+                                                TypedWriteTransaction<Configuration> tx) {
         List<MatchInfo> mkMatches = new ArrayList<>();
 
         LOG.debug("create terminatingServiceAction on DpnId = {} and serviceId = {} and actions = {}",
@@ -1009,11 +1023,10 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
             .child(Table.class, new TableKey(terminatingServiceTableFlowEntity.getTableId()))
             .child(Flow.class, flowKey).build();
-        tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flowbld.build(),
-                WriteTransaction.CREATE_MISSING_PARENTS);
+        tx.put(flowInstanceId, flowbld.build(), CREATE_MISSING_PARENTS);
     }
 
-    private void removeTunnelTableEntry(BigInteger dpId, long label, WriteTransaction tx) {
+    private void removeTunnelTableEntry(BigInteger dpId, long label, TypedWriteTransaction<Configuration> tx) {
         FlowEntity flowEntity;
         LOG.debug("remove terminatingServiceActions called with DpnId = {} and label = {}", dpId, label);
         List<MatchInfo> mkMatches = new ArrayList<>();
@@ -1030,7 +1043,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
             .child(Table.class, new TableKey(flowEntity.getTableId())).child(Flow.class, flowKey).build();
 
-        tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
+        tx.delete(flowInstanceId);
         LOG.debug("Terminating service Entry for dpID {} : label : {} removed successfully", dpId, label);
     }
 
@@ -1046,7 +1059,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                         vrfEntry.getDestPrefix(), vpnName, rd);
                 return returnLocalDpnId;
             }
-            String vpnRd = (!usedRds.isEmpty()) ? usedRds.get(0) : rd;
+            String vpnRd = !usedRds.isEmpty() ? usedRds.get(0) : rd;
             //Is this fib route an extra route? If yes, get the nexthop which would be an adjacency
             //in the vpn
             Optional<Routes> extraRouteOptional = VpnExtraRouteHelper.getVpnExtraroutes(dataBroker,
@@ -1129,9 +1142,9 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             }
 
             jobCoordinator.enqueueJob(FibUtil.getCreateLocalNextHopJobKey(vpnId, dpnId, vrfEntry.getDestPrefix()),
-                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     baseVrfEntryHandler.makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, null,
-                            NwConstants.DEL_FLOW, tx, null);
+                            NwConstants.DEL_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
                     if (FibUtil.isBgpVpn(vpnName, rd)) {
                         if (RouteOrigin.value(vrfEntry.getOrigin()) != RouteOrigin.SELF_IMPORTED) {
                             FibUtil.getLabelFromRoutePaths(vrfEntry).ifPresent(label -> {
@@ -1153,11 +1166,11 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     }
 
     private void createRemoteFibEntry(final BigInteger remoteDpnId, final long vpnId, String rd,
-            final VrfEntry vrfEntry, WriteTransaction tx) {
+            final VrfEntry vrfEntry, TypedWriteTransaction<Configuration> tx) {
         if (tx == null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(newTx -> {
-                createRemoteFibEntry(remoteDpnId, vpnId, rd, vrfEntry, newTx);
-            }), LOG, "Error creating remote FIB entry");
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                newTx -> createRemoteFibEntry(remoteDpnId, vpnId, rd, vrfEntry, newTx)), LOG,
+                "Error creating remote FIB entry");
             return;
         }
 
@@ -1203,9 +1216,10 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     Collections.singletonList(new ActionGroup(groupId));
             instructions.add(new InstructionApplyActions(actionInfos));
             baseVrfEntryHandler.makeConnectedRoute(remoteDpnId, vpnId, vrfEntry, rd, instructions,
-                    NwConstants.ADD_FLOW, tx, null);
+                    NwConstants.ADD_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
         } else {
-            baseVrfEntryHandler.programRemoteFib(remoteDpnId, vpnId, vrfEntry, tx, rd, adjacencyResults, null);
+            baseVrfEntryHandler.programRemoteFib(remoteDpnId, vpnId, vrfEntry,
+                TransactionAdapter.toWriteTransaction(tx), rd, adjacencyResults, null);
         }
 
         LOG.debug("Successfully added FIB entry for prefix {} in vpnId {}", vrfEntry.getDestPrefix(), vpnId);
@@ -1304,7 +1318,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         public List<ListenableFuture<Void>> call() {
             // If another renderer(for eg : CSS) needs to be supported, check can be performed here
             // to call the respective helpers.
-            return Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            return Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
                 //First Cleanup LabelRouteInfo
                 //TODO(KIRAN) : Move the below block when addressing iRT/eRT for L3VPN Over VxLan
                 LOG.debug("cleanupVpnInterfaceWorker: rd {} prefix {}", rd, prefixInfo.getIpAddress());
@@ -1339,8 +1353,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 String vpnName = null;
 
                 if (Prefixes.PrefixCue.PhysNetFunc.equals(prefixInfo.getPrefixCue())) {
-                    /*Get vpnId for rd = networkId since op vpnInterface will be pointing to rd = networkId
-                    * */
+                    // Get vpnId for rd = networkId since op vpnInterface will be pointing to rd = networkId
                     Optional<String> vpnNameOpt = fibUtil.getVpnNameFromRd(vrfEntry.getParentVpnRd());
                     if (vpnNameOpt.isPresent()) {
                         vpnId = fibUtil.getVpnId(vpnNameOpt.get());
@@ -1348,9 +1361,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 }
                 if (optVpnName.isPresent()) {
                     vpnName = optVpnName.get();
-                    Optional<VpnInterfaceOpDataEntry> opVpnInterface = MDSALUtil
-                            .read(dataBroker, LogicalDatastoreType.OPERATIONAL,
-                                    fibUtil.getVpnInterfaceOpDataEntryIdentifier(ifName, vpnName));
+                    Optional<VpnInterfaceOpDataEntry> opVpnInterface = tx
+                            .read(FibUtil.getVpnInterfaceOpDataEntryIdentifier(ifName, vpnName)).get();
                     if (opVpnInterface.isPresent()) {
                         long associatedVpnId = fibUtil.getVpnId(vpnName);
                         if (vpnId != associatedVpnId) {
@@ -1369,11 +1381,10 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     //Only one used Rd present in case of removal event
                     String usedRd = usedRds.get(0);
                     if (optVpnName.isPresent()) {
-                        tx.delete(LogicalDatastoreType.OPERATIONAL,
-                                baseVrfEntryHandler.getVpnToExtrarouteIdentifier(vpnName, usedRd,
+                        tx.delete(BaseVrfEntryHandler.getVpnToExtrarouteIdentifier(vpnName, usedRd,
                                         vrfEntry.getDestPrefix()));
-                        tx.delete(LogicalDatastoreType.CONFIGURATION,
-                                VpnExtraRouteHelper.getUsedRdsIdentifier(vpnId, vrfEntry.getDestPrefix()));
+                        txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, configTx ->
+                            configTx.delete(VpnExtraRouteHelper.getUsedRdsIdentifier(vpnId, vrfEntry.getDestPrefix())));
                     }
                 }
                 handleAdjacencyAndVpnOpInterfaceDeletion(vrfEntry, ifName, vpnName, tx);
@@ -1392,26 +1403,23 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
      * @param ifName - Interface name from VRFentry
      * @param vpnName - VPN name of corresponding VRF
      * @param tx - ReadWrite Tx
-     * @throws ReadFailedException - Exception thrown in case of read failed
      */
     private void handleAdjacencyAndVpnOpInterfaceDeletion(VrfEntry vrfEntry, String ifName, String vpnName,
-                                                          ReadWriteTransaction tx) throws ReadFailedException {
+                                                          TypedReadWriteTransaction<Operational> tx)
+            throws ExecutionException, InterruptedException {
         InstanceIdentifier<Adjacency> adjacencyIid =
                 FibUtil.getAdjacencyIdentifierOp(ifName, vpnName, vrfEntry.getDestPrefix());
-        Optional<Adjacency> adjacencyOptional = tx.read(LogicalDatastoreType.OPERATIONAL, adjacencyIid).checkedGet();
+        Optional<Adjacency> adjacencyOptional = tx.read(adjacencyIid).get();
         if (adjacencyOptional.isPresent()) {
             if (adjacencyOptional.get().getAdjacencyType() != Adjacency.AdjacencyType.PrimaryAdjacency) {
-                tx.delete(LogicalDatastoreType.OPERATIONAL,
-                        FibUtil.getAdjacencyIdentifierOp(ifName, vpnName, vrfEntry.getDestPrefix()));
+                tx.delete(FibUtil.getAdjacencyIdentifierOp(ifName, vpnName, vrfEntry.getDestPrefix()));
             } else {
-                tx.merge(LogicalDatastoreType.OPERATIONAL, adjacencyIid,
+                tx.merge(adjacencyIid,
                         new AdjacencyBuilder(adjacencyOptional.get()).setMarkedForDeletion(true).build());
             }
         }
 
-        Optional<AdjacenciesOp> optAdjacencies =
-                tx.read(LogicalDatastoreType.OPERATIONAL,
-                        FibUtil.getAdjListPathOp(ifName, vpnName)).checkedGet();
+        Optional<AdjacenciesOp> optAdjacencies = tx.read(FibUtil.getAdjListPathOp(ifName, vpnName)).get();
 
         if (!optAdjacencies.isPresent() || optAdjacencies.get().getAdjacency() == null) {
             return;
@@ -1424,8 +1432,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                         && adjacency.isMarkedForDeletion()
         )) {
             LOG.info("Clean up vpn interface {} to vpn {} list.", ifName, vpnName);
-            tx.delete(LogicalDatastoreType.OPERATIONAL,
-                    FibUtil.getVpnInterfaceOpDataEntryIdentifier(ifName, vpnName));
+            tx.delete(FibUtil.getVpnInterfaceOpDataEntryIdentifier(ifName, vpnName));
         }
     }
 
@@ -1460,20 +1467,22 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     + " with elantag {}", rd, vrfEntry.getDestPrefix(), elanTag);
             if (vpnToDpnList != null) {
                 jobCoordinator.enqueueJob(FibUtil.getJobKeyForRdPrefix(rd, vrfEntry.getDestPrefix()),
-                    () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-                        for (final VpnToDpnList curDpn : vpnToDpnList) {
-
-                            baseVrfEntryHandler.makeConnectedRoute(curDpn.getDpnId(), vpnInstance.getVpnId(), vrfEntry,
-                                vrfTableKey.getRouteDistinguisher(), null, NwConstants.DEL_FLOW, tx, null);
-                            if (RouteOrigin.value(vrfEntry.getOrigin()) != RouteOrigin.SELF_IMPORTED) {
-                                optionalLabel.ifPresent(label -> makeLFibTableEntry(curDpn.getDpnId(), label, null,
+                    () -> Collections.singletonList(
+                        txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
+                            for (final VpnToDpnList curDpn : vpnToDpnList) {
+
+                                baseVrfEntryHandler.makeConnectedRoute(curDpn.getDpnId(), vpnInstance.getVpnId(),
+                                    vrfEntry, vrfTableKey.getRouteDistinguisher(), null, NwConstants.DEL_FLOW,
+                                    TransactionAdapter.toWriteTransaction(tx), null);
+                                if (RouteOrigin.value(vrfEntry.getOrigin()) != RouteOrigin.SELF_IMPORTED) {
+                                    optionalLabel.ifPresent(label -> makeLFibTableEntry(curDpn.getDpnId(), label, null,
                                         DEFAULT_FIB_FLOW_PRIORITY, NwConstants.DEL_FLOW, tx));
-                            }
+                                }
 
-                            installSubnetBroadcastAddrDropRule(curDpn.getDpnId(), rd, vpnInstance.getVpnId(),
+                                installSubnetBroadcastAddrDropRule(curDpn.getDpnId(), rd, vpnInstance.getVpnId(),
                                     vrfEntry, NwConstants.DEL_FLOW, tx);
-                        }
-                    })));
+                            }
+                        })));
             }
             optionalLabel.ifPresent(label -> {
                 synchronized (label.toString().intern()) {
@@ -1529,18 +1538,20 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             }
 
             jobCoordinator.enqueueJob(jobKey,
-                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     if (localDpnIdList.size() <= 0) {
                         for (VpnToDpnList curDpn : vpnToDpnList) {
                             baseVrfEntryHandler.deleteRemoteRoute(BigInteger.ZERO, curDpn.getDpnId(),
-                                    vpnInstance.getVpnId(), vrfTableKey, vrfEntry, extraRouteOptional, tx);
+                                vpnInstance.getVpnId(), vrfTableKey, vrfEntry, extraRouteOptional,
+                                TransactionAdapter.toWriteTransaction(tx));
                         }
                     } else {
                         for (BigInteger localDpnId : localDpnIdList) {
                             for (VpnToDpnList curDpn : vpnToDpnList) {
                                 if (!curDpn.getDpnId().equals(localDpnId)) {
                                     baseVrfEntryHandler.deleteRemoteRoute(localDpnId, curDpn.getDpnId(),
-                                            vpnInstance.getVpnId(), vrfTableKey, vrfEntry, extraRouteOptional, tx);
+                                        vpnInstance.getVpnId(), vrfTableKey, vrfEntry, extraRouteOptional,
+                                        TransactionAdapter.toWriteTransaction(tx));
                                 }
                             }
                         }
@@ -1574,9 +1585,9 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     }
 
     private void makeLFibTableEntry(BigInteger dpId, long label, List<InstructionInfo> instructions, int priority,
-                                    int addOrRemove, WriteTransaction tx) {
+                                    int addOrRemove, TypedWriteTransaction<Configuration> tx) {
         if (tx == null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
                 newTx -> makeLFibTableEntry(dpId, label, instructions, priority, addOrRemove, newTx)), LOG,
                 "Error making LFIB table entry");
             return;
@@ -1601,9 +1612,9 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class, flowKey).build();
 
         if (addOrRemove == NwConstants.ADD_FLOW) {
-            tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow, WriteTransaction.CREATE_MISSING_PARENTS);
+            tx.put(flowInstanceId, flow, CREATE_MISSING_PARENTS);
         } else {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
+            tx.delete(flowInstanceId);
         }
 
         LOG.debug("LFIB Entry for dpID {} : label : {} instructions {} : key {} {} successfully",
@@ -1629,7 +1640,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     return futures;
                 }
                 synchronized (vpnInstance.getVpnInstanceName().intern()) {
-                    futures.add(retryingTxRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+                    futures.add(retryingTxRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
                         for (final VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
                             SubnetRoute subnetRoute = vrfEntry.augmentation(SubnetRoute.class);
                             if (subnetRoute != null) {
@@ -1676,7 +1687,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                                 if (RouteOrigin.BGP.getValue().equals(vrfEntry.getOrigin())) {
                                     List<SubTransaction> txnObjects =  new ArrayList<>();
                                     bgpRouteVrfEntryHandler.createRemoteFibEntry(dpnId, vpnId,
-                                            vrfTable.get().getRouteDistinguisher(), vrfEntry, tx, txnObjects);
+                                            vrfTable.get().getRouteDistinguisher(), vrfEntry,
+                                            TransactionAdapter.toWriteTransaction(tx), txnObjects);
                                 } else {
                                     createRemoteFibEntry(dpnId, vpnId, vrfTable.get().getRouteDistinguisher(),
                                             vrfEntry, tx);
@@ -1703,12 +1715,12 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         final Optional<VrfTables> vrfTable = MDSALUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
         if (vrfTable.isPresent()) {
             jobCoordinator.enqueueJob(FibUtil.getJobKeyForVpnIdDpnId(vpnId, dpnId),
-                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     synchronized (vpnInstance.getVpnInstanceName().intern()) {
                         vrfTable.get().getVrfEntry().stream()
-                                .filter(vrfEntry -> RouteOrigin.BGP == RouteOrigin.value(vrfEntry.getOrigin()))
-                                .forEach(bgpRouteVrfEntryHandler.getConsumerForCreatingRemoteFib(dpnId, vpnId,
-                                        rd, remoteNextHopIp, vrfTable, tx, txnObjects));
+                            .filter(vrfEntry -> RouteOrigin.BGP == RouteOrigin.value(vrfEntry.getOrigin()))
+                            .forEach(bgpRouteVrfEntryHandler.getConsumerForCreatingRemoteFib(dpnId, vpnId,
+                                rd, remoteNextHopIp, vrfTable, TransactionAdapter.toWriteTransaction(tx), txnObjects));
                     }
                 })));
         }
@@ -1729,7 +1741,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         }
 
         jobCoordinator.enqueueJob(FibUtil.getJobKeyForVpnIdDpnId(vpnId, localDpnId),
-            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                 synchronized (vpnInstance.getVpnInstanceName().intern()) {
                     VrfTablesKey vrfTablesKey = new VrfTablesKey(rd);
                     VrfEntry vrfEntry = getVrfEntry(dataBroker, rd, destPrefix);
@@ -1769,7 +1781,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                                     usedRds.get(0), vrfEntry.getDestPrefix());
                         }
                         baseVrfEntryHandler.deleteRemoteRoute(null, localDpnId, vpnId, vrfTablesKey, modVrfEntry,
-                                extraRouteOptional, tx);
+                                extraRouteOptional, TransactionAdapter.toWriteTransaction(tx));
                     }
                 }
             })));
@@ -1788,7 +1800,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 List<ListenableFuture<Void>> futures = new ArrayList<>();
                 if (vrfTable.isPresent()) {
                     synchronized (vpnInstance.getVpnInstanceName().intern()) {
-                        futures.add(retryingTxRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                        futures.add(retryingTxRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                             String vpnName = fibUtil.getVpnNameFromId(vpnInstance.getVpnId());
                             for (final VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
                                 /* Handle subnet routes here */
@@ -1798,7 +1810,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                                     LOG.trace("SUBNETROUTE: cleanUpDpnForVpn: Cleaning subnetroute {} on dpn {}"
                                             + " for vpn {}", vrfEntry.getDestPrefix(), dpnId, rd);
                                     baseVrfEntryHandler.makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, null,
-                                            NwConstants.DEL_FLOW, tx, null);
+                                            NwConstants.DEL_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
                                     List<RoutePaths> routePaths = vrfEntry.getRoutePaths();
                                     if (routePaths != null) {
                                         for (RoutePaths routePath : routePaths) {
@@ -1860,12 +1872,14 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                                 }
                                 if (RouteOrigin.BGP.getValue().equals(vrfEntry.getOrigin())) {
                                     bgpRouteVrfEntryHandler.deleteRemoteRoute(null, dpnId, vpnId,
-                                            vrfTable.get().key(), vrfEntry, extraRouteOptional, tx, txnObjects);
+                                        vrfTable.get().key(), vrfEntry, extraRouteOptional,
+                                        TransactionAdapter.toWriteTransaction(tx), txnObjects);
                                 } else {
                                     if (subnetRoute == null || !fibUtil
                                             .isInterfacePresentInDpn(vrfEntry.getParentVpnRd(), dpnId)) {
                                         baseVrfEntryHandler.deleteRemoteRoute(null, dpnId, vpnId,
-                                                vrfTable.get().key(), vrfEntry, extraRouteOptional, tx);
+                                            vrfTable.get().key(), vrfEntry, extraRouteOptional,
+                                            TransactionAdapter.toWriteTransaction(tx));
                                     }
                                 }
                             }
@@ -1896,11 +1910,13 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             jobCoordinator.enqueueJob(FibUtil.getJobKeyForVpnIdDpnId(vpnId, dpnId),
                 () -> {
                     synchronized (vpnInstance.getVpnInstanceName().intern()) {
-                        return Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
-                            tx -> vrfTable.get().getVrfEntry().stream()
+                        return Collections.singletonList(
+                            txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                                tx -> vrfTable.get().getVrfEntry().stream()
                                     .filter(vrfEntry -> RouteOrigin.value(vrfEntry.getOrigin()) == RouteOrigin.BGP)
                                     .forEach(bgpRouteVrfEntryHandler.getConsumerForDeletingRemoteFib(dpnId, vpnId,
-                                            remoteNextHopIp, vrfTable, tx, txnObjects))));
+                                        remoteNextHopIp, vrfTable, TransactionAdapter.toWriteTransaction(tx),
+                                        txnObjects))));
                     }
                 });
         }
@@ -1971,7 +1987,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         optLabel.ifPresent(label -> {
             LOG.trace("Removing flow in FIB table for interVpnLink {}", interVpnLinkName);
 
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                 for (BigInteger dpId : targetDpns) {
                     LOG.debug("Removing flow: VrfEntry=[prefix={} label={}] dpn {} for InterVpnLink {} in LFIB",
                             vrfEntry.getDestPrefix(), label, dpId, interVpnLinkName);
index d5b2dd6fa3f07770957c4ff4180e255638438ca6..110f5eddfa70ce4b881e34f6614c48035870b1e7 100644 (file)
@@ -19,6 +19,9 @@ import java.util.concurrent.Future;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.infra.Datastore.Configuration;
+import org.opendaylight.genius.infra.Datastore.Operational;
+import org.opendaylight.genius.infra.TypedReadTransaction;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rev160406.TunnelTypeBase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetTunnelTypeInputBuilder;
@@ -60,6 +63,11 @@ public final class VpnExtraRouteHelper {
         return MDSALUtil.read(broker, LogicalDatastoreType.OPERATIONAL, vpnExtraRoutesId);
     }
 
+    public static Optional<Routes> getVpnExtraroutes(TypedReadTransaction<Operational> operTx, String vpnName,
+            String vpnRd, String destPrefix) throws ExecutionException, InterruptedException {
+        return operTx.read(getVpnToExtrarouteVrfIdIdentifier(vpnName, vpnRd, destPrefix)).get();
+    }
+
     public static  InstanceIdentifier<Routes> getVpnToExtrarouteVrfIdIdentifier(String vpnName, String vrfId,
             String ipPrefix) {
         return InstanceIdentifier.builder(VpnToExtraroutes.class)
@@ -99,6 +107,13 @@ public final class VpnExtraRouteHelper {
                 .map(AllocatedRds::getRd).distinct().collect(toList()) : new ArrayList<>();
     }
 
+    public static  List<String> getUsedRds(TypedReadTransaction<Configuration> confTx, long vpnId, String destPrefix)
+            throws ExecutionException, InterruptedException {
+        Optional<DestPrefixes> usedRds = confTx.read(getUsedRdsIdentifier(vpnId, destPrefix)).get();
+        return usedRds.isPresent() ? usedRds.get().getAllocatedRds().stream()
+            .map(AllocatedRds::getRd).distinct().collect(toList()) : new ArrayList<>();
+    }
+
     public static  InstanceIdentifier<ExtrarouteRds> getUsedRdsIdentifier(long vpnId) {
         return InstanceIdentifier.builder(ExtrarouteRdsMap.class)
                 .child(ExtrarouteRds.class, new ExtrarouteRdsKey(vpnId)).build();