Migrate ListenableFutures.addErrorLogging() users
[netvirt.git] / fibmanager / impl / src / main / java / org / opendaylight / netvirt / fibmanager / VrfEntryListener.java
index 053ce59c33510563aacde4731331943370ea055c..10d646d8fbfc9c8de81780c8ba2d5d63ae9ed698 100644 (file)
@@ -7,12 +7,10 @@
  */
 package org.opendaylight.netvirt.fibmanager;
 
-import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
 import static org.opendaylight.genius.mdsalutil.NWUtil.isIpv4Address;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
@@ -25,22 +23,21 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
 import org.opendaylight.genius.infra.Datastore.Configuration;
 import org.opendaylight.genius.infra.Datastore.Operational;
@@ -74,7 +71,10 @@ import org.opendaylight.genius.utils.JvmGlobalLocks;
 import org.opendaylight.genius.utils.ServiceIndex;
 import org.opendaylight.genius.utils.batching.SubTransaction;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
-import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.netvirt.elanmanager.api.IElanService;
 import org.opendaylight.netvirt.fibmanager.NexthopManager.AdjacencyResult;
 import org.opendaylight.netvirt.fibmanager.api.FibHelper;
@@ -83,6 +83,7 @@ import org.opendaylight.netvirt.vpnmanager.api.VpnExtraRouteHelper;
 import org.opendaylight.netvirt.vpnmanager.api.VpnHelper;
 import org.opendaylight.netvirt.vpnmanager.api.intervpnlink.InterVpnLinkCache;
 import org.opendaylight.netvirt.vpnmanager.api.intervpnlink.InterVpnLinkDataComposite;
+import org.opendaylight.serviceutils.tools.listener.AbstractAsyncDataTreeChangeListener;
 import org.opendaylight.serviceutils.upgrade.UpgradeState;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -93,6 +94,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.FibEntries;
@@ -107,15 +109,17 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev15033
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentries.VrfEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentries.VrfEntryKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentrybase.RoutePaths;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentrybase.RoutePathsKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.AdjacenciesOp;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.adjacency.list.Adjacency;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.adjacency.list.AdjacencyBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.prefix.to._interface.vpn.ids.Prefixes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.prefix.to._interface.vpn.ids.PrefixesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn._interface.op.data.VpnInterfaceOpDataEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data.VpnInstanceOpDataEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data.vpn.instance.op.data.entry.VpnToDpnList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data.vpn.instance.op.data.entry.VpnToDpnListKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.to.extraroutes.vpn.extra.routes.Routes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.l3vpn.rev200204.adjacency.list.Adjacency;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.l3vpn.rev200204.adjacency.list.AdjacencyBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netvirt.inter.vpn.link.rev160311.inter.vpn.link.states.InterVpnLinkState.State;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
@@ -124,9 +128,8 @@ import org.opendaylight.yangtools.yang.common.Uint64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 @Singleton
-public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry, VrfEntryListener> {
+public class VrfEntryListener extends AbstractAsyncDataTreeChangeListener<VrfEntry> {
 
     private static final Logger LOG = LoggerFactory.getLogger(VrfEntryListener.class);
     private static final String FLOWID_PREFIX = "L3.";
@@ -166,7 +169,9 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                             final InterVpnLinkCache interVpnLinkCache,
                             final UpgradeState upgradeState,
                             final DataTreeEventCallbackRegistrar eventCallbacks) {
-        super(VrfEntry.class, VrfEntryListener.class);
+        super(dataBroker, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(FibEntries.class)
+                .child(VrfTables.class).child(VrfEntry.class),
+                Executors.newListeningSingleThreadExecutor("VrfEntryListener", LOG));
         this.dataBroker = dataBroker;
         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.retryingTxRunner = new RetryingManagedNewTransactionRunner(dataBroker, MAX_RETRIES);
@@ -183,11 +188,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         this.eventCallbacks = eventCallbacks;
     }
 
-    @Override
-    @PostConstruct
     public void init() {
         LOG.info("{} init", getClass().getSimpleName());
-        registerListener(LogicalDatastoreType.CONFIGURATION, dataBroker);
     }
 
     @Override
@@ -200,20 +202,11 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 LOG.warn("Error closing {}", c, e);
             }
         });
+        Executors.shutdownAndAwaitTermination(getExecutorService());
     }
 
     @Override
-    protected VrfEntryListener getDataTreeChangeListener() {
-        return VrfEntryListener.this;
-    }
-
-    @Override
-    protected InstanceIdentifier<VrfEntry> getWildCardPath() {
-        return InstanceIdentifier.create(FibEntries.class).child(VrfTables.class).child(VrfEntry.class);
-    }
-
-    @Override
-    protected void add(final InstanceIdentifier<VrfEntry> identifier, final VrfEntry vrfEntry) {
+    public void add(final InstanceIdentifier<VrfEntry> identifier, final VrfEntry vrfEntry) {
         Preconditions.checkNotNull(vrfEntry, "VrfEntry should not be null or empty.");
         String rd = identifier.firstKeyOf(VrfTables.class).getRouteDistinguisher();
         LOG.debug("ADD: Adding Fib Entry rd {} prefix {} route-paths {}",
@@ -251,7 +244,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     }
 
     @Override
-    protected void remove(InstanceIdentifier<VrfEntry> identifier, VrfEntry vrfEntry) {
+    public void remove(InstanceIdentifier<VrfEntry> identifier, VrfEntry vrfEntry) {
         Preconditions.checkNotNull(vrfEntry, "VrfEntry should not be null or empty.");
         String rd = identifier.firstKeyOf(VrfTables.class).getRouteDistinguisher();
         LOG.debug("REMOVE: Removing Fib Entry rd {} prefix {} route-paths {}",
@@ -292,7 +285,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     // "Redundant nullcheck of originalRoutePath, which is known to be non-null" - the null checking for
     // originalRoutePath is a little dicey - safest to keep the checking even if not needed.
     @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
-    protected void update(InstanceIdentifier<VrfEntry> identifier, VrfEntry original, VrfEntry update) {
+    public void update(InstanceIdentifier<VrfEntry> identifier, VrfEntry original, VrfEntry update) {
         Preconditions.checkNotNull(update, "VrfEntry should not be null or empty.");
         final String rd = identifier.firstKeyOf(VrfTables.class).getRouteDistinguisher();
         LOG.debug("UPDATE: Updating Fib Entries to rd {} prefix {} route-paths {} origin {} old-origin {}", rd,
@@ -306,8 +299,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         }
 
         if (RouteOrigin.value(update.getOrigin()) == RouteOrigin.STATIC) {
-            List<RoutePaths> originalRoutePath = original.getRoutePaths();
-            List<RoutePaths> updateRoutePath = update.getRoutePaths();
+            List<RoutePaths> originalRoutePath = new ArrayList<RoutePaths>(original.nonnullRoutePaths().values());
+            List<RoutePaths> updateRoutePath = new ArrayList<RoutePaths>(update.nonnullRoutePaths().values());
             LOG.info("UPDATE: Original route-path {} update route-path {} ", originalRoutePath, updateRoutePath);
 
             //Updates need to be handled for extraroute even if original vrf entry route path is null or
@@ -392,17 +385,17 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         Preconditions.checkNotNull(vpnInstance, "Vpn Instance not available " + vrfTableKey.getRouteDistinguisher());
         Preconditions.checkNotNull(vpnInstance.getVpnId(), "Vpn Instance with rd " + vpnInstance.getVrfId()
                 + " has null vpnId!");
-        final Collection<VpnToDpnList> vpnToDpnList;
+        final Map<VpnToDpnListKey, VpnToDpnList> keyVpnToDpnListMap;
         if (vrfEntry.getParentVpnRd() != null
                 && FibHelper.isControllerManagedNonSelfImportedRoute(RouteOrigin.value(vrfEntry.getOrigin()))) {
             // This block MUST BE HIT only for PNF (Physical Network Function) FIB Entries.
             VpnInstanceOpDataEntry parentVpnInstance = fibUtil.getVpnInstance(vrfEntry.getParentVpnRd());
-            vpnToDpnList = parentVpnInstance != null ? parentVpnInstance.getVpnToDpnList() :
+            keyVpnToDpnListMap = parentVpnInstance != null ? parentVpnInstance.nonnullVpnToDpnList() :
                 vpnInstance.getVpnToDpnList();
             LOG.info("createFibEntries: Processing creation of PNF FIB entry with rd {} prefix {}",
                     vrfEntry.getParentVpnRd(), vrfEntry.getDestPrefix());
         } else {
-            vpnToDpnList = vpnInstance.getVpnToDpnList();
+            keyVpnToDpnListMap = vpnInstance.nonnullVpnToDpnList();
         }
         final Uint32 vpnId = vpnInstance.getVpnId();
         final String rd = vrfTableKey.getRouteDistinguisher();
@@ -411,11 +404,11 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             final long elanTag = subnetRoute.getElantag().toJava();
             LOG.trace("SUBNETROUTE: createFibEntries: SubnetRoute augmented vrfentry found for rd {} prefix {}"
                     + " with elantag {}", rd, vrfEntry.getDestPrefix(), elanTag);
-            if (vpnToDpnList != null) {
+            if (keyVpnToDpnListMap != null) {
                 jobCoordinator.enqueueJob(FibUtil.getJobKeyForRdPrefix(rd, vrfEntry.getDestPrefix()),
                     () -> Collections.singletonList(
                         txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
-                            for (final VpnToDpnList curDpn : vpnToDpnList) {
+                            for (final VpnToDpnList curDpn : keyVpnToDpnListMap.values()) {
                                 if (curDpn.getDpnState() == VpnToDpnList.DpnState.Active) {
                                     installSubnetRouteInFib(curDpn.getDpnId(),
                                                                 elanTag, rd, vpnId, vrfEntry, tx);
@@ -438,21 +431,20 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
 
         final List<Uint64> localDpnIdList = createLocalFibEntry(vpnInstance.getVpnId(),
                                                                         rd, vrfEntry, etherType);
-        if (!localDpnIdList.isEmpty() && vpnToDpnList != null) {
+        if (!localDpnIdList.isEmpty() && keyVpnToDpnListMap != null) {
             jobCoordinator.enqueueJob(FibUtil.getJobKeyForRdPrefix(rd, vrfEntry.getDestPrefix()),
                 () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     final ReentrantLock lock = lockFor(vpnInstance);
                     lock.lock();
                     try {
-                        for (VpnToDpnList vpnDpn : vpnToDpnList) {
+                        for (VpnToDpnList vpnDpn : keyVpnToDpnListMap.values()) {
                             if (!localDpnIdList.contains(vpnDpn.getDpnId())) {
                                 if (vpnDpn.getDpnState() == VpnToDpnList.DpnState.Active) {
                                     try {
                                         if (RouteOrigin.BGP.getValue().equals(vrfEntry.getOrigin())) {
                                             bgpRouteVrfEntryHandler.createRemoteFibEntry(vpnDpn.getDpnId(),
                                                     vpnId, vrfTableKey.getRouteDistinguisher(), vrfEntry,
-                                                    TransactionAdapter.toWriteTransaction(tx),
-                                                    txnObjects);
+                                                    TransactionAdapter.toWriteTransaction(tx), txnObjects);
                                         } else {
                                             createRemoteFibEntry(vpnDpn.getDpnId(),
                                                     vpnInstance.getVpnId(),
@@ -474,7 +466,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         Optional<String> optVpnUuid = fibUtil.getVpnNameFromRd(rd);
         if (optVpnUuid.isPresent()) {
             String vpnUuid = optVpnUuid.get();
-            InterVpnLinkDataComposite interVpnLink = interVpnLinkCache.getInterVpnLinkByVpnId(vpnUuid).orNull();
+            InterVpnLinkDataComposite interVpnLink = interVpnLinkCache.getInterVpnLinkByVpnId(vpnUuid).orElse(null);
             if (interVpnLink != null) {
                 LOG.debug("InterVpnLink {} found in Cache linking Vpn {}", interVpnLink.getInterVpnLinkName(), vpnUuid);
                 FibUtil.getFirstNextHopAddress(vrfEntry).ifPresent(routeNexthop -> {
@@ -493,7 +485,14 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         InstanceIdentifier<VrfEntry> vrfEntryId =
                 InstanceIdentifier.builder(FibEntries.class).child(VrfTables.class, new VrfTablesKey(rd))
                         .child(VrfEntry.class, new VrfEntryKey(prefix)).build();
-        Optional<VrfEntry> vrfEntry = MDSALUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, vrfEntryId);
+        Optional<VrfEntry> vrfEntry;
+        try {
+            vrfEntry = SingleTransactionDataBroker.syncReadOptional(dataBroker, LogicalDatastoreType.CONFIGURATION,
+                    vrfEntryId);
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("refreshFibTables: Exception while reading VrfEntry Ds for the prefix {} rd {}", prefix, rd, e);
+            return;
+        }
         if (vrfEntry.isPresent()) {
             createFibEntries(vrfEntryId, vrfEntry.get());
         }
@@ -525,7 +524,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     void installSubnetRouteInFib(final Uint64 dpnId, final long elanTag, final String rd,
             final Uint32 vpnId, final VrfEntry vrfEntry, TypedWriteTransaction<Configuration> tx) {
         if (tx == null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+            LoggingFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
                 newTx -> installSubnetRouteInFib(dpnId, elanTag, rd, vpnId, vrfEntry, newTx)), LOG,
                 "Error installing subnet route in FIB");
             return;
@@ -571,7 +570,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         baseVrfEntryHandler.makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, instructions,
                 NwConstants.ADD_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
         if (vrfEntry.getRoutePaths() != null) {
-            for (RoutePaths routePath : vrfEntry.getRoutePaths()) {
+            for (RoutePaths routePath : vrfEntry.getRoutePaths().values()) {
                 if (RouteOrigin.value(vrfEntry.getOrigin()) != RouteOrigin.SELF_IMPORTED) {
                     List<ActionInfo> actionsInfos = new ArrayList<>();
                     // reinitialize instructions list for LFIB Table
@@ -638,7 +637,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class, flowKey).build();
 
         if (addOrRemove == NwConstants.ADD_FLOW) {
-            tx.put(flowInstanceId,flow, true);
+            tx.mergeParentStructurePut(flowInstanceId,flow);
         } else {
             tx.delete(flowInstanceId);
         }
@@ -703,13 +702,13 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         Preconditions.checkArgument(vrfEntry.getRoutePaths() != null
             && vrfEntry.getRoutePaths().size() == 1);
         String destination = vrfEntry.getDestPrefix();
-        String nextHop = vrfEntry.getRoutePaths().get(0).getNexthopAddress();
+        String nextHop = new ArrayList<RoutePaths>(vrfEntry.getRoutePaths().values()).get(0).getNexthopAddress();
         String interVpnLinkName = interVpnLink.getInterVpnLinkName();
 
         // After having received a static route, we should check if the vpn is part of an inter-vpn-link.
         // In that case, we should populate the FIB table of the VPN pointing to LPortDisptacher table
         // using as metadata the LPortTag associated to that vpn in the inter-vpn-link.
-        if (interVpnLink.getState().or(State.Error) != State.Active) {
+        if (interVpnLink.getState().orElse(State.Error) != State.Active) {
             LOG.warn("Route to {} with nexthop={} cannot be installed because the interVpnLink {} is not active",
                 destination, nextHop, interVpnLinkName);
             return;
@@ -749,11 +748,16 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                             .L3VPN_SERVICE_INDEX)),
                     MetaDataUtil.getMetaDataMaskForLPortDispatcher()).buildInstruction(0),
                 new InstructionGotoTable(NwConstants.L3_INTERFACE_TABLE).buildInstruction(1));
+        Map<InstructionKey, Instruction> instructionsMap = new HashMap<InstructionKey, Instruction>();
+        int instructionKey = 0;
+        for (Instruction instructionObj : instructions) {
+            instructionsMap.put(new InstructionKey(++instructionKey), instructionObj);
+        }
 
         int priority = DEFAULT_FIB_FLOW_PRIORITY + prefixLength;
         String flowRef = getInterVpnFibFlowRef(interVpnLinkName, destination, nextHop);
         Flow flowEntity = MDSALUtil.buildFlowNew(NwConstants.L3_FIB_TABLE, flowRef, priority, flowRef, 0, 0,
-            COOKIE_VM_FIB_TABLE, matches, instructions);
+            COOKIE_VM_FIB_TABLE, matches, instructionsMap);
 
         LOG.trace("Installing flow in FIB table for vpn {} interVpnLink {} nextHop {} key {}",
             vpnUuid, interVpnLink.getInterVpnLinkName(), nextHop, flowRef);
@@ -804,7 +808,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                         localNextHopSeen = true;
                         Uint64 dpnId =
                                 checkCreateLocalFibEntry(localNextHopInfoLocal, localNextHopInfoLocal.getIpAddress(),
-                                        vpnId, rd, vrfEntry, vpnExtraRoute, vpnExtraRoutes, etherType);
+                                        vpnId, rd, vrfEntry, vpnExtraRoute, vpnExtraRoutes, etherType,
+                                        /*parentVpnId*/ null);
                         returnLocalDpnId.add(dpnId);
                     }
                 }
@@ -821,6 +826,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     labelLock.lock();
                     try {
                         LabelRouteInfo lri = getLabelRouteInfo(lriKey);
+                        Uint32 parentVpnId = lri.getParentVpnid();
                         if (isPrefixAndNextHopPresentInLri(localNextHopIP, nextHopAddressList, lri)) {
                             Optional<VpnInstanceOpDataEntry> vpnInstanceOpDataEntryOptional =
                                     fibUtil.getVpnInstanceOpData(rd);
@@ -840,12 +846,13 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                                         label, localNextHopInfo.getVpnInterfaceName(), lri.getDpnId());
                                 if (vpnExtraRoutes.isEmpty()) {
                                     Uint64 dpnId = checkCreateLocalFibEntry(localNextHopInfo, localNextHopIP,
-                                            vpnId, rd, vrfEntry, null, vpnExtraRoutes, etherType);
+                                            vpnId, rd, vrfEntry, null, vpnExtraRoutes, etherType, parentVpnId);
                                     returnLocalDpnId.add(dpnId);
                                 } else {
                                     for (Routes extraRoutes : vpnExtraRoutes) {
                                         Uint64 dpnId = checkCreateLocalFibEntry(localNextHopInfo, localNextHopIP,
-                                                vpnId, rd, vrfEntry, extraRoutes, vpnExtraRoutes, etherType);
+                                                vpnId, rd, vrfEntry, extraRoutes, vpnExtraRoutes, etherType,
+                                                parentVpnId);
                                         returnLocalDpnId.add(dpnId);
                                     }
                                 }
@@ -861,7 +868,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             }
         } else {
             Uint64 dpnId = checkCreateLocalFibEntry(localNextHopInfo, localNextHopIP, vpnId,
-                    rd, vrfEntry, /*routes*/ null, /*vpnExtraRoutes*/ null, etherType);
+                    rd, vrfEntry, /*routes*/ null, /*vpnExtraRoutes*/ null, etherType,
+                    /*parentVpnId*/ null);
             if (dpnId != null) {
                 returnLocalDpnId.add(dpnId);
             }
@@ -873,7 +881,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                                                 final Uint32 vpnId, final String rd,
                                                 final VrfEntry vrfEntry,
                                                 @Nullable Routes routes, @Nullable List<Routes> vpnExtraRoutes,
-                                                int etherType) {
+                                                int etherType, Uint32 parentVpnId) {
         String vpnName = fibUtil.getVpnNameFromId(vpnId);
         if (localNextHopInfo != null) {
             long groupId;
@@ -910,7 +918,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 }
             } else {
                 groupId = nextHopManager.createLocalNextHop(vpnId, dpnId, interfaceName, localNextHopIP, prefix,
-                        gwMacAddress);
+                        gwMacAddress, parentVpnId);
                 localGroupId = groupId;
             }
             if (groupId == FibConstants.INVALID_GROUP_ID) {
@@ -957,9 +965,16 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         return Uint64.ZERO;
     }
 
-    private boolean isVpnPresentInDpn(String rd, Uint64 dpnId)  {
+    private boolean isVpnPresentInDpn(String rd, Uint64 dpnId) {
         InstanceIdentifier<VpnToDpnList> id = VpnHelper.getVpnToDpnListIdentifier(rd, dpnId);
-        Optional<VpnToDpnList> dpnInVpn = MDSALUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL, id);
+        Optional<VpnToDpnList> dpnInVpn;
+        try {
+            dpnInVpn = SingleTransactionDataBroker.syncReadOptional(dataBroker, LogicalDatastoreType.OPERATIONAL, id);
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("isVpnPresentInDpn: Exception while reading VpnToDpnList Ds for the rd {} dpnId {}", rd,
+                    dpnId, e);
+            return false;
+        }
         return dpnInVpn.isPresent();
     }
 
@@ -972,7 +987,14 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     private LabelRouteInfo getLabelRouteInfo(LabelRouteInfoKey label) {
         InstanceIdentifier<LabelRouteInfo> lriIid = InstanceIdentifier.builder(LabelRouteMap.class)
             .child(LabelRouteInfo.class, label).build();
-        Optional<LabelRouteInfo> opResult = MDSALUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL, lriIid);
+        Optional<LabelRouteInfo> opResult = null;
+        try {
+            opResult = SingleTransactionDataBroker.syncReadOptional(dataBroker, LogicalDatastoreType.OPERATIONAL,
+                    lriIid);
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("refreshFibTables: Exception while reading LabelRouteInfo Ds for the label {}", label, e);
+            return null;
+        }
         if (opResult.isPresent()) {
             return opResult.get();
         }
@@ -990,7 +1012,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             .child(LabelRouteInfo.class, new LabelRouteInfoKey(lri.getLabel())).build();
 
         List<String> vpnInstancesList = lri.getVpnInstanceList() != null
-            ? lri.getVpnInstanceList() : new ArrayList<>();
+            ? new ArrayList<>(lri.getVpnInstanceList()) : new ArrayList<>();
         if (vpnInstancesList.contains(vpnInstanceName)) {
             LOG.debug("vpninstance {} name is present", vpnInstanceName);
             vpnInstancesList.remove(vpnInstanceName);
@@ -1039,7 +1061,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             MDSALUtil.buildFlowEntity(destDpId, NwConstants.INTERNAL_TUNNEL_TABLE,
             getTableMissFlowRef(destDpId, NwConstants.INTERNAL_TUNNEL_TABLE, label),
                     FibConstants.DEFAULT_VPN_INTERNAL_TUNNEL_TABLE_PRIORITY,
-                    String.format("%s:%d", "TST Flow Entry ", label), 0, 0,
+                    String.format("%s:%s", "TST Flow Entry ", label), 0, 0,
                     Uint64.valueOf(COOKIE_TUNNEL.longValue() + label.longValue()),
                     mkMatches, mkInstructions);
 
@@ -1052,7 +1074,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
             .child(Table.class, new TableKey(terminatingServiceTableFlowEntity.getTableId()))
             .child(Flow.class, flowKey).build();
-        tx.put(flowInstanceId, flowbld.build(), CREATE_MISSING_PARENTS);
+        tx.mergeParentStructurePut(flowInstanceId, flowbld.build());
     }
 
     private void removeTunnelTableEntry(Uint64 dpId, Uint32 label, TypedWriteTransaction<Configuration> tx) {
@@ -1065,7 +1087,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             NwConstants.INTERNAL_TUNNEL_TABLE,
             getTableMissFlowRef(dpId, NwConstants.INTERNAL_TUNNEL_TABLE, label),
                 FibConstants.DEFAULT_VPN_INTERNAL_TUNNEL_TABLE_PRIORITY,
-                String.format("%s:%d", "TST Flow Entry ", label), 0, 0,
+                String.format("%s:%s", "TST Flow Entry ", label), 0, 0,
                 Uint64.valueOf(COOKIE_TUNNEL.longValue() + label.longValue()), mkMatches, null);
         Node nodeDpn = FibUtil.buildDpnNode(flowEntity.getDpnId());
         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
@@ -1196,7 +1218,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     private void createRemoteFibEntry(final Uint64 remoteDpnId, final Uint32 vpnId, String rd,
             final VrfEntry vrfEntry, TypedWriteTransaction<Configuration> tx) {
         if (tx == null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+            LoggingFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
                 newTx -> createRemoteFibEntry(remoteDpnId, vpnId, rd, vrfEntry, newTx)), LOG,
                 "Error creating remote FIB entry");
             return;
@@ -1222,6 +1244,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         }
     }
 
+    // Allow deprecated TransactionRunner calls for now
+    @SuppressWarnings("ForbidCertainMethod")
     private void programRemoteFibWithLoadBalancingGroups(final Uint64 remoteDpnId, final Uint32 vpnId, String rd,
             final VrfEntry vrfEntry, List<Routes> vpnExtraRoutes) {
         // create loadbalancing groups for extra routes only when the extra route is
@@ -1341,7 +1365,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             new CleanupVpnInterfaceWorker(prefixInfo, vpnId, rd, vrfEntry, extraRoute));
     }
 
-    private class CleanupVpnInterfaceWorker implements Callable<List<ListenableFuture<Void>>> {
+    private class CleanupVpnInterfaceWorker implements Callable<List<? extends ListenableFuture<?>>> {
         Prefixes prefixInfo;
         Uint32 vpnId;
         String rd;
@@ -1413,7 +1437,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                             .read(FibUtil.getVpnInterfaceOpDataEntryIdentifier(ifName, vpnName)).get();
                     if (opVpnInterface.isPresent()) {
                         Uint32 associatedVpnId = fibUtil.getVpnId(vpnName);
-                        if (Objects.equals(vpnId, associatedVpnId)) {
+                        if (!Objects.equals(vpnId, associatedVpnId)) {
                             LOG.warn("Prefixes {} are associated with different vpn instance with id {} rather than {}",
                                     vrfEntry.getDestPrefix(), associatedVpnId, vpnId);
                             LOG.warn("Not proceeding with Cleanup op data for prefix {}", vrfEntry.getDestPrefix());
@@ -1475,7 +1499,8 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             return;
         }
 
-        @NonNull List<Adjacency> adjacencies = optAdjacencies.get().nonnullAdjacency();
+        @NonNull List<Adjacency> adjacencies
+                = new ArrayList<Adjacency>(optAdjacencies.get().nonnullAdjacency().values());
         if (adjacencies.size() <= 2
                 && adjacencies.stream().allMatch(adjacency ->
                 adjacency.getAdjacencyType() == Adjacency.AdjacencyType.PrimaryAdjacency
@@ -1495,17 +1520,17 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             LOG.error("VPN Instance for rd {} is not available from VPN Op Instance Datastore", rd);
             return;
         }
-        final Collection<VpnToDpnList> vpnToDpnList;
+        final Map<VpnToDpnListKey, VpnToDpnList> keyVpnToDpnListMap;
         if (vrfEntry.getParentVpnRd() != null
                 && FibHelper.isControllerManagedNonSelfImportedRoute(RouteOrigin.value(vrfEntry.getOrigin()))) {
             // This block MUST BE HIT only for PNF (Physical Network Function) FIB Entries.
             VpnInstanceOpDataEntry parentVpnInstance = fibUtil.getVpnInstance(vrfEntry.getParentVpnRd());
-            vpnToDpnList = parentVpnInstance != null ? parentVpnInstance.getVpnToDpnList() :
+            keyVpnToDpnListMap = parentVpnInstance != null ? parentVpnInstance.getVpnToDpnList() :
                     vpnInstance.getVpnToDpnList();
             LOG.info("deleteFibEntries: Processing deletion of PNF FIB entry with rd {} prefix {}",
                     vrfEntry.getParentVpnRd(), vrfEntry.getDestPrefix());
         } else {
-            vpnToDpnList = vpnInstance.getVpnToDpnList();
+            keyVpnToDpnListMap = vpnInstance.getVpnToDpnList();
         }
 
         SubnetRoute subnetRoute = vrfEntry.augmentation(SubnetRoute.class);
@@ -1516,16 +1541,16 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             long elanTag = subnetRoute.getElantag().toJava();
             LOG.trace("SUBNETROUTE: deleteFibEntries: SubnetRoute augmented vrfentry found for rd {} prefix {}"
                     + " with elantag {}", rd, vrfEntry.getDestPrefix(), elanTag);
-            if (vpnToDpnList != null) {
+            if (keyVpnToDpnListMap != null) {
                 jobCoordinator.enqueueJob(FibUtil.getJobKeyForRdPrefix(rd, vrfEntry.getDestPrefix()),
                     () -> Collections.singletonList(
                         txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
-                            for (final VpnToDpnList curDpn : vpnToDpnList) {
+                            for (final VpnToDpnList curDpn : keyVpnToDpnListMap.values()) {
 
                                 baseVrfEntryHandler.makeConnectedRoute(curDpn.getDpnId(),
                                     vpnInstance.getVpnId(),
-                                    vrfEntry, vrfTableKey.getRouteDistinguisher(), null, NwConstants.DEL_FLOW,
-                                    TransactionAdapter.toWriteTransaction(tx), null);
+                                    vrfEntry, vrfTableKey.getRouteDistinguisher(), null,
+                                        NwConstants.DEL_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
                                 if (RouteOrigin.value(vrfEntry.getOrigin()) != RouteOrigin.SELF_IMPORTED) {
                                     optionalLabel.ifPresent(label -> makeLFibTableEntry(curDpn.getDpnId(),
                                         label, null, DEFAULT_FIB_FLOW_PRIORITY, NwConstants.DEL_FLOW, tx));
@@ -1573,7 +1598,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
 
         final List<Uint64> localDpnIdList = deleteLocalFibEntry(vpnInstance.getVpnId(),
             vrfTableKey.getRouteDistinguisher(), vrfEntry);
-        if (vpnToDpnList != null) {
+        if (keyVpnToDpnListMap != null) {
             List<String> usedRds = VpnExtraRouteHelper.getUsedRds(dataBroker,
                     vpnInstance.getVpnId(), vrfEntry.getDestPrefix());
             String jobKey;
@@ -1590,24 +1615,24 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                             .getVpnExtraroutes(dataBroker, vpnName, usedRds.get(0), vrfEntry.getDestPrefix());
                 }
             } else {
-                extraRouteOptional = Optional.absent();
+                extraRouteOptional = Optional.empty();
             }
 
             jobCoordinator.enqueueJob(FibUtil.getJobKeyForRdPrefix(rd, vrfEntry.getDestPrefix()),
                 () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     if (localDpnIdList.size() <= 0) {
-                        for (VpnToDpnList curDpn : vpnToDpnList) {
+                        for (VpnToDpnList curDpn : keyVpnToDpnListMap.values()) {
                             baseVrfEntryHandler.deleteRemoteRoute(Uint64.ZERO, curDpn.getDpnId(),
                                 vpnInstance.getVpnId(), vrfTableKey, vrfEntry, extraRouteOptional,
-                                TransactionAdapter.toWriteTransaction(tx));
+                                    TransactionAdapter.toWriteTransaction(tx));
                         }
                     } else {
                         for (Uint64 localDpnId : localDpnIdList) {
-                            for (VpnToDpnList curDpn : vpnToDpnList) {
+                            for (VpnToDpnList curDpn : keyVpnToDpnListMap.values()) {
                                 if (!Objects.equals(curDpn.getDpnId(), localDpnId)) {
                                     baseVrfEntryHandler.deleteRemoteRoute(localDpnId, curDpn.getDpnId(),
                                         vpnInstance.getVpnId(), vrfTableKey, vrfEntry, extraRouteOptional,
-                                        TransactionAdapter.toWriteTransaction(tx));
+                                            TransactionAdapter.toWriteTransaction(tx));
                                 }
                             }
                         }
@@ -1650,7 +1675,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
     private void makeLFibTableEntry(Uint64 dpId, Uint32 label, @Nullable List<InstructionInfo> instructions,
                                     int priority, int addOrRemove, TypedWriteTransaction<Configuration> tx) {
         if (tx == null) {
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+            LoggingFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
                 newTx -> makeLFibTableEntry(dpId, label, instructions, priority, addOrRemove, newTx)), LOG,
                 "Error making LFIB table entry");
             return;
@@ -1675,7 +1700,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class, flowKey).build();
 
         if (addOrRemove == NwConstants.ADD_FLOW) {
-            tx.put(flowInstanceId, flow, CREATE_MISSING_PARENTS);
+            tx.mergeParentStructurePut(flowInstanceId, flow);
         } else {
             tx.delete(flowInstanceId);
         }
@@ -1707,7 +1732,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                 lock.lock();
                 try {
                     futures.add(retryingTxRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
-                        for (final VrfEntry vrfEntry : vrfTable.get().nonnullVrfEntry()) {
+                        for (final VrfEntry vrfEntry : vrfTable.get().nonnullVrfEntry().values()) {
                             SubnetRoute subnetRoute = vrfEntry.augmentation(SubnetRoute.class);
                             if (subnetRoute != null) {
                                 long elanTag = subnetRoute.getElantag().toJava();
@@ -1780,14 +1805,20 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         InstanceIdentifier<VrfTables> id = buildVrfId(rd);
         final VpnInstanceOpDataEntry vpnInstance = fibUtil.getVpnInstance(rd);
         List<SubTransaction> txnObjects =  new ArrayList<>();
-        final Optional<VrfTables> vrfTable = MDSALUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+        final Optional<VrfTables> vrfTable;
+        try {
+            vrfTable = SingleTransactionDataBroker.syncReadOptional(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("populateExternalRoutesOnDpn: Exception while reading the VrfTable for the rd {}", rd, e);
+            return;
+        }
         if (vrfTable.isPresent()) {
             jobCoordinator.enqueueJob(FibUtil.getJobKeyForVpnIdDpnId(vpnId, dpnId),
                 () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                     final ReentrantLock lock = lockFor(vpnInstance);
                     lock.lock();
                     try {
-                        vrfTable.get().nonnullVrfEntry().stream()
+                        vrfTable.get().nonnullVrfEntry().values().stream()
                             .filter(vrfEntry -> RouteOrigin.BGP == RouteOrigin.value(vrfEntry.getOrigin()))
                             .forEach(bgpRouteVrfEntryHandler.getConsumerForCreatingRemoteFib(dpnId, vpnId,
                                 rd, remoteNextHopIp, vrfTable, TransactionAdapter.toWriteTransaction(tx), txnObjects));
@@ -1824,9 +1855,9 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     }
                     LOG.trace("manageRemoteRouteOnDPN :: action {}, DpnId {}, vpnId {}, rd {}, destPfx {}",
                             action, localDpnId, vpnId, rd, destPrefix);
-                    List<RoutePaths> routePathList = vrfEntry.getRoutePaths();
+                    Map<RoutePathsKey, RoutePaths> keyRoutePathsMap = vrfEntry.getRoutePaths();
                     VrfEntry modVrfEntry;
-                    if (routePathList == null || routePathList.isEmpty()) {
+                    if (keyRoutePathsMap == null || keyRoutePathsMap.isEmpty()) {
                         modVrfEntry = FibHelper.getVrfEntryBuilder(vrfEntry, label,
                                 Collections.singletonList(destTepIp),
                                 RouteOrigin.value(vrfEntry.getOrigin()), null /* parentVpnRd */).build();
@@ -1848,7 +1879,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                         }
                         //Is this fib route an extra route? If yes, get the nexthop which would be
                         //an adjacency in the vpn
-                        Optional<Routes> extraRouteOptional = Optional.absent();
+                        Optional<Routes> extraRouteOptional = Optional.empty();
                         if (RouteOrigin.value(vrfEntry.getOrigin()) == RouteOrigin.STATIC && usedRds.size() != 0) {
                             extraRouteOptional = VpnExtraRouteHelper.getVpnExtraroutes(dataBroker,
                                     fibUtil.getVpnNameFromId(vpnInstance.getVpnId()),
@@ -1870,117 +1901,118 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
             () -> {
                 InstanceIdentifier<VrfTables> id = buildVrfId(rd);
                 final VpnInstanceOpDataEntry vpnInstance = fibUtil.getVpnInstance(rd);
-                List<SubTransaction> txnObjects =  new ArrayList<>();
+                List<SubTransaction> txnObjects = new ArrayList<>();
                 final Optional<VrfTables> vrfTable = MDSALUtil.read(dataBroker,
                         LogicalDatastoreType.CONFIGURATION, id);
                 List<ListenableFuture<Void>> futures = new ArrayList<>();
-                if (vrfTable.isPresent()) {
-                    final ReentrantLock lock = lockFor(vpnInstance);
-                    lock.lock();
-                    try {
-                        futures.add(retryingTxRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
-                            String vpnName = fibUtil.getVpnNameFromId(vpnInstance.getVpnId());
-                            for (final VrfEntry vrfEntry : vrfTable.get().nonnullVrfEntry()) {
-                                /* parentRd is only filled for external PNF cases where the interface on the external
-                                 * network VPN are used to cleanup the flows. For all other cases, use "rd" for
-                                 * #fibUtil.isInterfacePresentInDpn().
-                                * */
-                                String parentRd = vrfEntry.getParentVpnRd() != null ? vrfEntry.getParentVpnRd()
-                                        : rd;
-                                /* Handle subnet routes here */
-                                SubnetRoute subnetRoute = vrfEntry.augmentation(SubnetRoute.class);
-                                if (subnetRoute != null && !fibUtil
-                                        .isInterfacePresentInDpn(parentRd, dpnId)) {
-                                    LOG.trace("SUBNETROUTE: cleanUpDpnForVpn: Cleaning subnetroute {} on dpn {}"
-                                            + " for vpn {}", vrfEntry.getDestPrefix(), dpnId, rd);
-                                    baseVrfEntryHandler.makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, null,
-                                            NwConstants.DEL_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
-                                    List<RoutePaths> routePaths = vrfEntry.getRoutePaths();
-                                    if (routePaths != null) {
-                                        for (RoutePaths routePath : routePaths) {
-                                            makeLFibTableEntry(dpnId, routePath.getLabel(), null,
-                                                    DEFAULT_FIB_FLOW_PRIORITY,
-                                                    NwConstants.DEL_FLOW, tx);
-                                            LOG.trace("SUBNETROUTE: cleanUpDpnForVpn: Released subnetroute label {}"
-                                                    + " for rd {} prefix {}", routePath.getLabel(), rd,
-                                                    vrfEntry.getDestPrefix());
-                                        }
+                if (!vrfTable.isPresent()) {
+                    LOG.error("cleanUpDpnForVpn: VRF Table not available for RD {}", rd);
+                    if (callback != null) {
+                        ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
+                        Futures.addCallback(listenableFuture, callback, MoreExecutors.directExecutor());
+                    }
+                    return futures;
+                }
+                final ReentrantLock lock = lockFor(vpnInstance);
+                lock.lock();
+                try {
+                    futures.add(retryingTxRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
+                        String vpnName = fibUtil.getVpnNameFromId(vpnInstance.getVpnId());
+                        for (final VrfEntry vrfEntry : vrfTable.get().nonnullVrfEntry().values()) {
+                            /* parentRd is only filled for external PNF cases where the interface on the external
+                             * network VPN are used to cleanup the flows. For all other cases, use "rd" for
+                             * #fibUtil.isInterfacePresentInDpn().
+                             * */
+                            String parentRd = vrfEntry.getParentVpnRd() != null ? vrfEntry.getParentVpnRd()
+                                    : rd;
+                            /* Handle subnet routes here */
+                            SubnetRoute subnetRoute = vrfEntry.augmentation(SubnetRoute.class);
+                            if (subnetRoute != null && !fibUtil
+                                    .isInterfacePresentInDpn(parentRd, dpnId)) {
+                                LOG.trace("SUBNETROUTE: cleanUpDpnForVpn: Cleaning subnetroute {} on dpn {}"
+                                        + " for vpn {}", vrfEntry.getDestPrefix(), dpnId, rd);
+                                baseVrfEntryHandler.makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, null,
+                                        NwConstants.DEL_FLOW, TransactionAdapter.toWriteTransaction(tx), null);
+                                Map<RoutePathsKey, RoutePaths> keyRoutePathsMap = vrfEntry.getRoutePaths();
+                                if (keyRoutePathsMap != null) {
+                                    for (RoutePaths routePath : keyRoutePathsMap.values()) {
+                                        makeLFibTableEntry(dpnId, routePath.getLabel(), null,
+                                                DEFAULT_FIB_FLOW_PRIORITY,
+                                                NwConstants.DEL_FLOW, tx);
+                                        LOG.trace("SUBNETROUTE: cleanUpDpnForVpn: Released subnetroute label {}"
+                                                        + " for rd {} prefix {}", routePath.getLabel(), rd,
+                                                vrfEntry.getDestPrefix());
                                     }
-                                    installSubnetBroadcastAddrDropRule(dpnId, rd, vpnId, vrfEntry,
-                                            NwConstants.DEL_FLOW, tx);
-                                    continue;
-                                }
-                                // ping responder for router interfaces
-                                RouterInterface routerInt = vrfEntry.augmentation(RouterInterface.class);
-                                if (routerInt != null) {
-                                    LOG.trace("Router augmented vrfentry found for rd:{}, uuid:{}, ip:{}, mac:{}",
-                                            rd, routerInt.getUuid(), routerInt.getIpAddress(),
-                                            routerInt.getMacAddress());
-                                    routerInterfaceVrfEntryHandler.installRouterFibEntry(vrfEntry, dpnId, vpnId,
-                                            routerInt.getIpAddress(), new MacAddress(routerInt.getMacAddress()),
-                                            NwConstants.DEL_FLOW);
-                                    continue;
                                 }
-
-                                //Handle local flow deletion for imports
-                                if (RouteOrigin.value(vrfEntry.getOrigin()) == RouteOrigin.SELF_IMPORTED) {
-                                    java.util.Optional<Uint32> optionalLabel = FibUtil.getLabelFromRoutePaths(vrfEntry);
-                                    if (optionalLabel.isPresent()) {
-                                        List<String> nextHopList = FibHelper.getNextHopListFromRoutePaths(vrfEntry);
-                                        LabelRouteInfo lri = getLabelRouteInfo(optionalLabel.get());
-                                        if (isPrefixAndNextHopPresentInLri(vrfEntry.getDestPrefix(), nextHopList,
-                                                lri) && Objects.equals(lri.getDpnId(), dpnId)) {
-                                            deleteLocalFibEntry(vpnId, rd, vrfEntry);
-                                        }
+                                installSubnetBroadcastAddrDropRule(dpnId, rd, vpnId, vrfEntry,
+                                        NwConstants.DEL_FLOW, tx);
+                                continue;
+                            }
+                            // ping responder for router interfaces
+                            RouterInterface routerInt = vrfEntry.augmentation(RouterInterface.class);
+                            if (routerInt != null) {
+                                LOG.trace("Router augmented vrfentry found for rd:{}, uuid:{}, ip:{}, mac:{}",
+                                        rd, routerInt.getUuid(), routerInt.getIpAddress(),
+                                        routerInt.getMacAddress());
+                                routerInterfaceVrfEntryHandler.installRouterFibEntry(vrfEntry, dpnId, vpnId,
+                                        routerInt.getIpAddress(), new MacAddress(routerInt.getMacAddress()),
+                                        NwConstants.DEL_FLOW);
+                                continue;
+                            }
+                            //Handle local flow deletion for imports
+                            if (RouteOrigin.value(vrfEntry.getOrigin()) == RouteOrigin.SELF_IMPORTED) {
+                                java.util.Optional<Uint32> optionalLabel = FibUtil.getLabelFromRoutePaths(vrfEntry);
+                                if (optionalLabel.isPresent()) {
+                                    List<String> nextHopList = FibHelper.getNextHopListFromRoutePaths(vrfEntry);
+                                    LabelRouteInfo lri = getLabelRouteInfo(optionalLabel.get());
+                                    if (isPrefixAndNextHopPresentInLri(vrfEntry.getDestPrefix(), nextHopList,
+                                            lri) && Objects.equals(lri.getDpnId(), dpnId)) {
+                                        deleteLocalFibEntry(vpnId, rd, vrfEntry);
                                     }
                                 }
-
-                                // Passing null as we don't know the dpn
-                                // to which prefix is attached at this point
-                                List<String> usedRds = VpnExtraRouteHelper.getUsedRds(dataBroker,
-                                        vpnInstance.getVpnId(), vrfEntry.getDestPrefix());
-                                Optional<Routes> extraRouteOptional;
-                                //Is this fib route an extra route? If yes, get the nexthop which would be
-                                //an adjacency in the vpn
-                                if (usedRds != null && !usedRds.isEmpty()) {
-                                    if (usedRds.size() > 1) {
-                                        LOG.error("The extra route prefix is still present in some DPNs");
-                                        return;
-                                    } else {
-                                        extraRouteOptional = VpnExtraRouteHelper.getVpnExtraroutes(dataBroker, vpnName,
-                                                usedRds.get(0), vrfEntry.getDestPrefix());
-
-                                    }
+                            }
+                            // Passing null as we don't know the dpn
+                            // to which prefix is attached at this point
+                            List<String> usedRds = VpnExtraRouteHelper.getUsedRds(dataBroker,
+                                    vpnInstance.getVpnId(), vrfEntry.getDestPrefix());
+                            Optional<Routes> extraRouteOptional;
+                            //Is this fib route an extra route? If yes, get the nexthop which would be
+                            //an adjacency in the vpn
+                            if (usedRds != null && !usedRds.isEmpty()) {
+                                if (usedRds.size() > 1) {
+                                    LOG.error("The extra route prefix is still present in some DPNs");
+                                    return;
                                 } else {
-                                    extraRouteOptional = Optional.absent();
+                                    extraRouteOptional = VpnExtraRouteHelper.getVpnExtraroutes(dataBroker, vpnName,
+                                            usedRds.get(0), vrfEntry.getDestPrefix());
+
                                 }
-                                if (RouteOrigin.BGP.getValue().equals(vrfEntry.getOrigin())) {
-                                    bgpRouteVrfEntryHandler.deleteRemoteRoute(null, dpnId, vpnId,
+                            } else {
+                                extraRouteOptional = Optional.empty();
+                            }
+                            if (RouteOrigin.BGP.getValue().equals(vrfEntry.getOrigin())) {
+                                bgpRouteVrfEntryHandler.deleteRemoteRoute(null, dpnId, vpnId,
                                         vrfTable.get().key(), vrfEntry, extraRouteOptional,
                                         TransactionAdapter.toWriteTransaction(tx), txnObjects);
-                                } else {
-                                    if (subnetRoute == null || !fibUtil
-                                            .isInterfacePresentInDpn(parentRd, dpnId)) {
-                                        baseVrfEntryHandler.deleteRemoteRoute(null, dpnId, vpnId,
+                            } else {
+                                if (subnetRoute == null || !fibUtil
+                                        .isInterfacePresentInDpn(parentRd, dpnId)) {
+                                    baseVrfEntryHandler.deleteRemoteRoute(null, dpnId, vpnId,
                                             vrfTable.get().key(), vrfEntry, extraRouteOptional,
                                             TransactionAdapter.toWriteTransaction(tx));
-                                    }
                                 }
                             }
-                        }));
-                    } finally {
-                        lock.unlock();
-                    }
-                    if (callback != null) {
-                        ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
-                        Futures.addCallback(listenableFuture, callback, MoreExecutors.directExecutor());
-                    }
-                } else {
-                    LOG.error("cleanUpDpnForVpn: No vrf table found for rd {} vpnId {} dpn {}", rd, vpnId, dpnId);
+                        }
+                    }));
+                } finally {
+                    lock.unlock();
+                }
+                if (callback != null) {
+                    ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
+                    Futures.addCallback(listenableFuture, callback, MoreExecutors.directExecutor());
                 }
                 return futures;
             });
-
     }
 
     public void cleanUpExternalRoutesOnDpn(final Uint64 dpnId, final Uint32 vpnId, final String rd,
@@ -1991,7 +2023,13 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         InstanceIdentifier<VrfTables> id = buildVrfId(rd);
         final VpnInstanceOpDataEntry vpnInstance = fibUtil.getVpnInstance(rd);
         List<SubTransaction> txnObjects =  new ArrayList<>();
-        final Optional<VrfTables> vrfTable = MDSALUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+        final Optional<VrfTables> vrfTable;
+        try {
+            vrfTable = SingleTransactionDataBroker.syncReadOptional(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("getVrfEntry: Exception while reading VrfTable for the rd {} vpnId {}", rd, vpnId, e);
+            return;
+        }
         if (vrfTable.isPresent()) {
             jobCoordinator.enqueueJob(FibUtil.getJobKeyForVpnIdDpnId(vpnId, dpnId),
                 () -> {
@@ -2000,11 +2038,11 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
                     try {
                         return Collections.singletonList(
                             txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
-                                tx -> vrfTable.get().nonnullVrfEntry().stream()
+                                tx -> vrfTable.get().nonnullVrfEntry().values().stream()
                                     .filter(vrfEntry -> RouteOrigin.value(vrfEntry.getOrigin()) == RouteOrigin.BGP)
                                     .forEach(bgpRouteVrfEntryHandler.getConsumerForDeletingRemoteFib(dpnId, vpnId,
                                         remoteNextHopIp, vrfTable, TransactionAdapter.toWriteTransaction(tx),
-                                        txnObjects))));
+                                            txnObjects))));
                     } finally {
                         lock.unlock();
                     }
@@ -2033,7 +2071,14 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         InstanceIdentifier<VrfEntry> vrfEntryId = InstanceIdentifier.builder(FibEntries.class)
             .child(VrfTables.class, new VrfTablesKey(rd))
             .child(VrfEntry.class, new VrfEntryKey(ipPrefix)).build();
-        Optional<VrfEntry> vrfEntry = MDSALUtil.read(broker, LogicalDatastoreType.CONFIGURATION, vrfEntryId);
+        Optional<VrfEntry> vrfEntry;
+        try {
+            vrfEntry = SingleTransactionDataBroker.syncReadOptional(broker, LogicalDatastoreType.CONFIGURATION,
+                    vrfEntryId);
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("getVrfEntry: Exception while reading VrfEntry for the prefix {} rd {}", ipPrefix, rd, e);
+            return null;
+        }
         if (vrfEntry.isPresent()) {
             return vrfEntry.get();
         }
@@ -2078,7 +2123,7 @@ public class VrfEntryListener extends AsyncDataTreeChangeListenerBase<VrfEntry,
         optLabel.ifPresent(label -> {
             LOG.trace("Removing flow in FIB table for interVpnLink {}", interVpnLinkName);
 
-            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
+            LoggingFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
                 for (Uint64 dpId : targetDpns) {
                     LOG.debug("Removing flow: VrfEntry=[prefix={} label={}] dpn {} for InterVpnLink {} in LFIB",
                             vrfEntry.getDestPrefix(), label, dpId, interVpnLinkName);