BUG 6363: Further performance improvements for VPN Engine 06/43306/4
authorSuraj Ranjan <suraj.ranjan@ericsson.com>
Mon, 8 Aug 2016 11:21:12 +0000 (16:51 +0530)
committerSam Hague <shague@redhat.com>
Tue, 9 Aug 2016 18:43:42 +0000 (18:43 +0000)
This fix removes waits in some critical places for
MDSAL operation to complete:
a. InterfaceStateChangeListener
b. VpnInterfaceManager

Change-Id: Ife821982f458a16f02e6bcf28cab960ad0d94d51
Signed-off-by: Suraj Ranjan <suraj.ranjan@ericsson.com>
vpnservice/fibmanager/fibmanager-api/src/main/java/org/opendaylight/netvirt/fibmanager/api/IFibManager.java
vpnservice/fibmanager/fibmanager-impl/src/main/java/org/opendaylight/netvirt/fibmanager/FibManagerImpl.java
vpnservice/fibmanager/fibmanager-impl/src/main/java/org/opendaylight/netvirt/fibmanager/FibRpcServiceImpl.java
vpnservice/fibmanager/fibmanager-impl/src/main/java/org/opendaylight/netvirt/fibmanager/VrfEntryListener.java
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/InterfaceStateChangeListener.java
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/TunnelInterfaceStateListener.java
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/VpnInstanceListener.java
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/VpnInterfaceManager.java

index 6f462826d319bcea0fbd4c632559c4c8452f75ac..457b26ebae2a45ee39f98e79d2927d50591bbd72 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.netvirt.fibmanager.api;
 
+import com.google.common.util.concurrent.FutureCallback;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 
@@ -15,12 +16,15 @@ import java.math.BigInteger;
 import java.util.List;
 
 public interface IFibManager {
-    void populateFibOnNewDpn(BigInteger dpnId, long vpnId, String rd);
+    void populateFibOnNewDpn(BigInteger dpnId, long vpnId, String rd,
+                             final FutureCallback<List<Void>> callback);
     void cleanUpDpnForVpn(BigInteger dpnId, long vpnId, String rd,
-                          String localNextHopIp, String remoteNextHopIp);
+                          String localNextHopIp, String remoteNextHopIp,
+                          final FutureCallback<List<Void>> callback);
     void populateFibOnDpn(BigInteger localDpnId, long vpnId, String rd,
                           String localNextHopIp, String remoteNextHopIp);
-    void cleanUpDpnForVpn(BigInteger dpnId, long vpnId, String rd);
+    void cleanUpDpnForVpn(BigInteger dpnId, long vpnId, String rd,
+                          final FutureCallback<List<Void>> callback);
     List<String> printFibEntries();
 
     // TODO Feels like this method is not used anywhere
index edf06a9ed35e42ad33f2c80db8d941b2caaba868..65427bfb0084907eed3a928e7560f7e44832c5c5 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.netvirt.fibmanager;
 
+import com.google.common.util.concurrent.FutureCallback;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import java.math.BigInteger;
 import java.util.List;
@@ -44,14 +45,15 @@ public class FibManagerImpl implements IFibManager {
     }
 
     @Override
-    public void populateFibOnNewDpn(BigInteger dpnId, long vpnId, String rd) {
-        vrfEntryListener.populateFibOnNewDpn(dpnId, vpnId, rd);
+    public void populateFibOnNewDpn(BigInteger dpnId, long vpnId, String rd,
+                                    FutureCallback<List<Void>> callback) {
+        vrfEntryListener.populateFibOnNewDpn(dpnId, vpnId, rd, callback);
     }
 
     @Override
     public void cleanUpDpnForVpn(BigInteger dpnId, long vpnId, String rd,
-                                 String localNextHopIp, String remoteNextHopIp) {
-        vrfEntryListener.cleanUpDpnForVpn(dpnId, vpnId, rd, localNextHopIp, remoteNextHopIp);
+                                 String localNextHopIp, String remoteNextHopIp, FutureCallback<List<Void>> callback) {
+        vrfEntryListener.cleanUpDpnForVpn(dpnId, vpnId, rd, localNextHopIp, remoteNextHopIp, callback);
     }
 
     @Override
@@ -61,8 +63,9 @@ public class FibManagerImpl implements IFibManager {
     }
 
     @Override
-    public void cleanUpDpnForVpn(BigInteger dpnId, long vpnId, String rd) {
-        vrfEntryListener.cleanUpDpnForVpn(dpnId, vpnId, rd);
+    public void cleanUpDpnForVpn(BigInteger dpnId, long vpnId, String rd,
+                                 FutureCallback<List<Void>> callback) {
+        vrfEntryListener.cleanUpDpnForVpn(dpnId, vpnId, rd, callback);
     }
 
     @Override
index 8795046580cf4ca8a8db75702fdbd0e8ca7113be..fb37c468cdd182c52ed142870b985ded2a6d3248 100644 (file)
@@ -119,13 +119,13 @@ public class FibRpcServiceImpl implements FibRpcService {
 
     @Override
     public Future<RpcResult<Void>> populateFibOnDpn(PopulateFibOnDpnInput input) {
-        fibManager.populateFibOnNewDpn(input.getDpid(), input.getVpnId(), input.getRd());
+        fibManager.populateFibOnNewDpn(input.getDpid(), input.getVpnId(), input.getRd(), null);
         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
     }
 
     @Override
     public Future<RpcResult<Void>> cleanupDpnForVpn(CleanupDpnForVpnInput input) {
-        fibManager.cleanUpDpnForVpn(input.getDpid(), input.getVpnId(), input.getRd());
+        fibManager.cleanUpDpnForVpn(input.getDpid(), input.getVpnId(), input.getRd(), null);
         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
     }
 
@@ -314,7 +314,7 @@ public class FibRpcServiceImpl implements FibRpcService {
                 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.OPERATIONAL, id,
                         vpnToDpnList.setIpAddresses(ipAddresses).build());
                 LOG.debug("populate FIB on new dpn {} for VPN {}", dpnId, vpnName);
-                fibManager.populateFibOnNewDpn(dpnId, vpnId, rd);
+                fibManager.populateFibOnNewDpn(dpnId, vpnId, rd, null);
             }
         }
     }
@@ -340,7 +340,7 @@ public class FibRpcServiceImpl implements FibRpcService {
                             //Clean up the dpn
                             LOG.debug("Cleaning up dpn {} from VPN {}", dpnId, vpnName);
                             MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.OPERATIONAL, id);
-                            fibManager.cleanUpDpnForVpn(dpnId, vpnId, rd);
+                            fibManager.cleanUpDpnForVpn(dpnId, vpnId, rd, null);
                         }
                     } else {
                         delete(dataBroker, LogicalDatastoreType.OPERATIONAL, id.child(
index a1911e80b8fa23769a34ba444bf74cd5741ee883..71df57b484941631aa2303668cdf3ec8fe52d3f8 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.netvirt.fibmanager;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.math.BigInteger;
 import java.net.InetAddress;
@@ -1140,7 +1142,7 @@ public class VrfEntryListener extends AbstractDataChangeListener<VrfEntry> imple
                 if (vpnInterfaces.isEmpty()) {
                     LOG.trace("Last vpn interface {} on dpn {} for vpn {}. Clean up fib in dpn", intfName, dpnId, rd);
                     FibUtil.delete(dataBroker, LogicalDatastoreType.OPERATIONAL, id);
-                    cleanUpDpnForVpn(dpnId, vpnId, rd);
+                    cleanUpDpnForVpn(dpnId, vpnId, rd, null);
                 } else {
                     LOG.trace("Delete vpn interface {} from dpn {} to vpn {} list.", intfName, dpnId, rd);
                     FibUtil.delete(dataBroker, LogicalDatastoreType.OPERATIONAL, id.child(
@@ -1635,18 +1637,20 @@ public class VrfEntryListener extends AbstractDataChangeListener<VrfEntry> imple
         }
     }
 
-    public void populateFibOnNewDpn(final BigInteger dpnId, final long vpnId, final String rd) {
+    public void populateFibOnNewDpn(final BigInteger dpnId, final long vpnId, final String rd,
+                                    final FutureCallback<List<Void>> callback) {
         LOG.trace("New dpn {} for vpn {} : populateFibOnNewDpn", dpnId, rd);
         InstanceIdentifier<VrfTables> id = buildVrfId(rd);
         final VpnInstanceOpDataEntry vpnInstance = getVpnInstance(rd);
-        synchronized (vpnInstance.getVpnInstanceName().intern()) {
-            final Optional<VrfTables> vrfTable = FibUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
-            if (vrfTable.isPresent()) {
-                DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
-                dataStoreCoordinator.enqueueJob("FIB-" + vpnId + "-" + dpnId.toString(),
-                        new Callable<List<ListenableFuture<Void>>>() {
-                            @Override
-                            public List<ListenableFuture<Void>> call() throws Exception {
+        final Optional<VrfTables> vrfTable = FibUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+        if (vrfTable.isPresent()) {
+            DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
+            dataStoreCoordinator.enqueueJob("FIB-" + vpnId + "-" +  dpnId.toString(),
+                    new Callable<List<ListenableFuture<Void>>>() {
+                        @Override
+                        public List<ListenableFuture<Void>> call() throws Exception {
+                            List<ListenableFuture<Void>> futures = new ArrayList<>();
+                            synchronized (vpnInstance.getVpnInstanceName().intern()) {
                                 WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
                                 for (final VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
 
@@ -1673,12 +1677,15 @@ public class VrfEntryListener extends AbstractDataChangeListener<VrfEntry> imple
                                     createRemoteFibEntry(dpnId, vpnId, vrfTable.get().getKey(), vrfEntry, tx);
                                 }
                                 //TODO: if we have 100K entries in FIB, can it fit in one Tranasaction (?)
-                                List<ListenableFuture<Void>> futures = new ArrayList<>();
                                 futures.add(tx.submit());
-                                return futures;
+                                if (callback != null) {
+                                    ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
+                                    Futures.addCallback(listenableFuture, callback);
+                                }
                             }
-                        });
-            }
+                            return futures;
+                        }
+                    });
         }
     }
 
@@ -1688,7 +1695,6 @@ public class VrfEntryListener extends AbstractDataChangeListener<VrfEntry> imple
                 dpnId, vpnId, rd, localNextHopIp, remoteNextHopIp);
         InstanceIdentifier<VrfTables> id = buildVrfId(rd);
         final VpnInstanceOpDataEntry vpnInstance = getVpnInstance(rd);
-        synchronized (vpnInstance.getVpnInstanceName().intern()) {
             final Optional<VrfTables> vrfTable = FibUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
             if (vrfTable.isPresent()) {
                 DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
@@ -1696,73 +1702,84 @@ public class VrfEntryListener extends AbstractDataChangeListener<VrfEntry> imple
                         new Callable<List<ListenableFuture<Void>>>() {
                             @Override
                             public List<ListenableFuture<Void>> call() throws Exception {
-                                WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
                                 List<ListenableFuture<Void>> futures = new ArrayList<>();
-                                LOG.trace("populate FIB starts on Dpn " + dpnId
-                                        + "rd  " + rd.toString()
-                                        + "localNextHopIp " + localNextHopIp
-                                        + "remoteNextHopIp" + remoteNextHopIp
-                                        + "vpnId " + vpnId );
-
-                                for (VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
-                                    LOG.trace("old vrfEntry before populate:: {}", vrfEntry);
-
-                                    if (vrfEntry.getOrigin().equals(RouteOrigin.BGP.getValue())) {
-                                        if (remoteNextHopIp.trim().equals(vrfEntry.getNextHopAddressList()
-                                                .get(0).trim())) {
-                                            LOG.trace(" creating remote FIB entry for vfEntry {}", vrfEntry);
-                                            createRemoteFibEntry(dpnId, vpnId, vrfTable.get().getKey(),
-                                                    vrfEntry, writeTransaction);
-                                        }
-                                    }
-
-                                    if ((vrfEntry.getOrigin().equals(RouteOrigin.CONNECTED.getValue())) ||
-                                            (vrfEntry.getOrigin().equals(RouteOrigin.STATIC.getValue()))) {
-                                        String destPfx = vrfEntry.getDestPrefix();
-                                        BigInteger dpnIdForPrefix = nextHopManager.getDpnForPrefix(vpnId, destPfx);
-                                        if (dpnIdForPrefix == null) {
-                                            LOG.trace("Populate::the dpnIdForPrefix is null for prefix {}.",
-                                                    vrfEntry.getDestPrefix());
-                                            continue;
-                                        }
-                                        int sameDpnId = dpnIdForPrefix.compareTo(dpnId);
-                                        if (sameDpnId != 0) {
-                                            LOG.trace("Populate::Different srcDpnId {} and dpnIdForPrefix {}"
-                                                    + "for prefix {}",
-                                                    dpnId, dpnIdForPrefix, vrfEntry.getDestPrefix());
-                                            continue;
+                                synchronized (vpnInstance.getVpnInstanceName().intern()) {
+                                    WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+                                    LOG.trace("populate FIB starts on Dpn " + dpnId
+                                            + "rd  " + rd.toString()
+                                            + "localNextHopIp " + localNextHopIp
+                                            + "remoteNextHopIp" + remoteNextHopIp
+                                            + "vpnId " + vpnId );
+                                    for (VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
+                                        LOG.trace("old vrfEntry before populate:: {}", vrfEntry);
+
+                                        if (vrfEntry.getOrigin().equals(RouteOrigin.BGP.getValue())) {
+                                            if (remoteNextHopIp.trim().equals(vrfEntry.getNextHopAddressList().get(0).trim())) {
+                                                LOG.trace(" creating remote FIB entry for vfEntry {}", vrfEntry);
+                                                createRemoteFibEntry(dpnId, vpnId, vrfTable.get().getKey(), vrfEntry, writeTransaction);
+                                            }
+                                        } else if (vrfEntry.getOrigin().equals(RouteOrigin.STATIC.getValue())) {
+                                            BigInteger dpnIdForPrefix = null;
+                                            String destPfx = vrfEntry.getDestPrefix();
+                                            if (vrfEntry.getAugmentation(SubnetRoute.class) == null) {
+                                                Optional<Extraroute> extraRouteInfo =
+                                                        FibUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL,
+                                                                getVpnToExtrarouteIdentifier(rd, vrfEntry.getDestPrefix()));
+                                                if (extraRouteInfo.isPresent()) {
+                                                    continue;
+                                                }
+                                                dpnIdForPrefix = nextHopManager.getDpnForPrefix(vpnId, destPfx);
+                                            } else {
+                                                // Subnet Route handling
+                                                Optional<Prefixes> localNextHopInfoData =
+                                                        FibUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL,
+                                                                FibUtil.getPrefixToInterfaceIdentifier(vpnId, destPfx));
+                                                if (localNextHopInfoData.isPresent()) {
+                                                    Prefixes prefixes = localNextHopInfoData.get();
+                                                    dpnIdForPrefix = prefixes.getDpnId();
+                                                }
+                                            }
+                                            if (dpnIdForPrefix == null) {
+                                                LOG.trace("Populate::the dpnIdForPrefix is null for prefix {}.",
+                                                        vrfEntry.getDestPrefix());
+                                                continue;
+                                            }
+                                            int sameDpnId = dpnIdForPrefix.compareTo(dpnId);
+                                            if (sameDpnId != 0) {
+                                                LOG.trace("Populate::Different srcDpnId {} and dpnIdForPrefix {} for prefix {}",
+                                                        dpnId, dpnIdForPrefix, vrfEntry.getDestPrefix());
+                                                continue;
+                                            }
+                                            InstanceIdentifier<VrfEntry> vrfEntryId = getVrfEntryId(rd, vrfEntry.getDestPrefix());
+                                            List<String> newNextHopAddrList = vrfEntry.getNextHopAddressList();
+                                            newNextHopAddrList.add(localNextHopIp);
+                                            VrfEntry newVrfEntry =
+                                                    new VrfEntryBuilder(vrfEntry).setNextHopAddressList(newNextHopAddrList).build();
+                                            // Just update the VrfEntry
+                                            FibUtil.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION,
+                                                    vrfEntryId, newVrfEntry);
+                                            // writeTransaction.put(LogicalDatastoreType.CONFIGURATION,
+                                            //       vrfEntryId, newVrfEntry);
+                                            vrfEntry = getVrfEntry(dataBroker, rd, destPfx);
+                                            LOG.trace("updated vrfEntry after populate:: {}", vrfEntry);
                                         }
-
-                                        // Passing null as we don't know the dpn
-                                        // to which prefix is attached at this point
-                                        InstanceIdentifier<VrfEntry> vrfEntryId =
-                                                getVrfEntryId(rd, vrfEntry.getDestPrefix());
-
-                                        vrfEntry.getNextHopAddressList().add(localNextHopIp);
-                                        VrfEntry newVrfEntry = new VrfEntryBuilder(vrfEntry)
-                                                .setNextHopAddressList(vrfEntry.getNextHopAddressList()).build();
-                                        // Just update the VrfEntry
-                                        FibUtil.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION,
-                                                vrfEntryId, newVrfEntry);
-                                        vrfEntry = getVrfEntry(dataBroker, rd, destPfx);
-                                        LOG.trace("updated vrfEntry after populate:: {}", vrfEntry);
                                     }
+                                    futures.add(writeTransaction.submit());
+                                    LOG.trace("populate FIB ends on Dpn " + dpnId
+                                            + "rd  " + rd.toString()
+                                            + "localNextHopIp " + localNextHopIp
+                                            + "remoteNextHopIp" + remoteNextHopIp
+                                            + "vpnId " + vpnId);
                                 }
-                                futures.add(writeTransaction.submit());
-                                LOG.trace("populate FIB ends on Dpn " + dpnId
-                                        + "rd  " + rd.toString()
-                                        + "localNextHopIp " + localNextHopIp
-                                        + "remoteNextHopIp" + remoteNextHopIp
-                                        + "vpnId " + vpnId );
                                 return futures;
                             }
                         });
             }
-        }
     }
 
-    public void handleRemoteRoute(final boolean action, final BigInteger localDpnId, final BigInteger remoteDpnId, final long vpnId, final String  rd, final String destPrefix , final String localNextHopIP,
-                                  final String remoteNextHopIp) {
+    public void handleRemoteRoute(final boolean action, final BigInteger localDpnId, final BigInteger remoteDpnId,
+                                  final long vpnId, final String  rd, final String destPrefix ,
+                                  final String localNextHopIP, final String remoteNextHopIp) {
 
         DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
         dataStoreCoordinator.enqueueJob(  "FIB" + rd.toString()
@@ -1798,83 +1815,83 @@ public class VrfEntryListener extends AbstractDataChangeListener<VrfEntry> imple
                 });
     }
 
-    public void cleanUpDpnForVpn(final BigInteger dpnId, final long vpnId, final String rd) {
+    public void cleanUpDpnForVpn(final BigInteger dpnId, final long vpnId, final String rd,
+                                 final FutureCallback<List<Void>> callback) {
         LOG.trace("Remove dpn {} for vpn {} : cleanUpDpnForVpn", dpnId, rd);
         InstanceIdentifier<VrfTables> id = buildVrfId(rd);
         final VpnInstanceOpDataEntry vpnInstance = getVpnInstance(rd);
-        synchronized (vpnInstance.getVpnInstanceName().intern()) {
             final Optional<VrfTables> vrfTable = FibUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
             if (vrfTable.isPresent()) {
                 DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
                 dataStoreCoordinator.enqueueJob("FIB-" + vpnId + "-" + dpnId.toString(),
                         new Callable<List<ListenableFuture<Void>>>() {
-                            WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
                             @Override
                             public List<ListenableFuture<Void>> call() throws Exception {
-                                for (final VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
-                                    // Handle subnet routes here
-                                    SubnetRoute subnetRoute = vrfEntry.getAugmentation(SubnetRoute.class);
-                                    if (subnetRoute != null) {
-                                        LOG.trace("Cleaning subnetroute {} on dpn {} for vpn {} : cleanUpDpnForVpn",
-                                                vrfEntry.getDestPrefix(), dpnId, rd);
-                                        makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, null,
-                                                NwConstants.DEL_FLOW, tx);
-                                        makeLFibTableEntry(dpnId, vrfEntry.getLabel(), null,
-                                                DEFAULT_FIB_FLOW_PRIORITY, NwConstants.DEL_FLOW, tx);
-                                        LOG.trace("cleanUpDpnForVpn: Released subnetroute label {} for rd {} "
-                                                + "prefix {}",
-                                                vrfEntry.getLabel(), rd, vrfEntry.getDestPrefix());
-                                        continue;
+                                List<ListenableFuture<Void>> futures = new ArrayList<>();
+                                synchronized (vpnInstance.getVpnInstanceName().intern()) {
+                                    WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+                                    for (final VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
+                                      /* Handle subnet routes here */
+                                        SubnetRoute subnetRoute = vrfEntry.getAugmentation(SubnetRoute.class);
+                                        if (subnetRoute != null) {
+                                            LOG.trace("Cleaning subnetroute {} on dpn {} for vpn {} : cleanUpDpnForVpn", vrfEntry.getDestPrefix(),
+                                                    dpnId, rd);
+                                            makeConnectedRoute(dpnId, vpnId, vrfEntry, rd, null, NwConstants.DEL_FLOW, tx);
+                                            makeLFibTableEntry(dpnId, vrfEntry.getLabel(), null, DEFAULT_FIB_FLOW_PRIORITY, NwConstants.DEL_FLOW, tx);
+                                            LOG.trace("cleanUpDpnForVpn: Released subnetroute label {} for rd {} prefix {}", vrfEntry.getLabel(), rd,
+                                                    vrfEntry.getDestPrefix());
+                                            continue;
+                                        }
+                                        // Passing null as we don't know the dpn
+                                        // to which prefix is attached at this point
+                                        deleteRemoteRoute(null, dpnId, vpnId, vrfTable.get().getKey(), vrfEntry, tx);
+                                    }
+                                    futures.add(tx.submit());
+                                    if (callback != null) {
+                                        ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
+                                        Futures.addCallback(listenableFuture, callback);
                                     }
-                                    // Passing null as we don't know the dpn
-                                    // to which prefix is attached at this point
-                                    deleteRemoteRoute(null, dpnId, vpnId, vrfTable.get().getKey(), vrfEntry, tx);
                                 }
-                                List<ListenableFuture<Void>> futures = new ArrayList<>();
-                                futures.add(tx.submit());
                                 return futures;
                             }
 
                         });
-            }
 
         }
     }
 
     public void cleanUpDpnForVpn(final BigInteger dpnId, final long vpnId, final String rd,
-                                 final String localNextHopIp, final String remoteNextHopIp) {
+                                 final String localNextHopIp, final String remoteNextHopIp,
+                                 final FutureCallback<List<Void>> callback) {
         LOG.trace(  " cleanup remote routes on dpn {} for vpn {}, rd {}, " +
                         " localNexthopIp {} , remoteNexhtHopIp {} : cleanUpDpnForVpn",
                 dpnId, vpnId, rd, localNextHopIp, remoteNextHopIp);
         InstanceIdentifier<VrfTables> id = buildVrfId(rd);
         final VpnInstanceOpDataEntry vpnInstance = getVpnInstance(rd);
-        synchronized (vpnInstance.getVpnInstanceName().intern()) {
-            final Optional<VrfTables> vrfTable = FibUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
-            if (vrfTable.isPresent()) {
-                DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
-                dataStoreCoordinator.enqueueJob(" FIB-" + vpnId + "-" + dpnId.toString(),
-                        new Callable<List<ListenableFuture<Void>>>() {
-                            @Override
-                            public List<ListenableFuture<Void>> call() throws Exception {
+        final Optional<VrfTables> vrfTable = FibUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+        if (vrfTable.isPresent()) {
+            DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
+            dataStoreCoordinator.enqueueJob(" FIB-" + vpnId + "-" + dpnId.toString(),
+                    new Callable<List<ListenableFuture<Void>>>() {
+                        @Override
+                        public List<ListenableFuture<Void>> call() throws Exception {
+                            List<ListenableFuture<Void>> futures = new ArrayList<>();
+                            synchronized (vpnInstance.getVpnInstanceName().intern()) {
                                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-                                List<ListenableFuture<Void>> futures = new ArrayList<>();
                                 LOG.trace("cleanup FIB starts on Dpn " + dpnId
                                         + "rd  " + rd.toString()
                                         + "localNextHopIp " + localNextHopIp
                                         + "remoteNextHopIp" + remoteNextHopIp
-                                        + "vpnId " + vpnId );
+                                        + "vpnId " + vpnId);
 
                                 for (VrfEntry vrfEntry : vrfTable.get().getVrfEntry()) {
                                     LOG.trace("old vrfEntry before cleanup:: {}", vrfEntry);
-                                    if (remoteNextHopIp.trim().equals(vrfEntry.getNextHopAddressList().get(0)
-                                            .trim())) {
+                                    if (remoteNextHopIp.trim().equals(vrfEntry.getNextHopAddressList().get(0).trim())) {
                                         LOG.trace(" deleting remote FIB entry {}", vrfEntry);
-                                        deleteRemoteRoute(null, dpnId, vpnId, vrfTable.get().getKey(),
-                                                vrfEntry, writeTransaction);
+                                        deleteRemoteRoute(null, dpnId, vpnId, vrfTable.get().getKey(), vrfEntry, writeTransaction);
                                     }
 
-                                    if (localNextHopIp.trim().equals(vrfEntry.getNextHopAddressList().get(0)
-                                            .trim())) {
+                                    if (localNextHopIp.trim().equals(vrfEntry.getNextHopAddressList().get(0).trim())) {
                                         LOG.trace("changing the nexthopip for local VM routes {} on dpn {}",
                                                 vrfEntry.getDestPrefix(), dpnId);
                                         String destPfx = vrfEntry.getDestPrefix();
@@ -1890,16 +1907,20 @@ public class VrfEntryListener extends AbstractDataChangeListener<VrfEntry> imple
                                     }
                                 }
                                 futures.add(writeTransaction.submit());
+                                if (callback != null) {
+                                    ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
+                                    Futures.addCallback(listenableFuture, callback);
+                                }
                                 LOG.trace("cleanup FIB ends on Dpn " + dpnId
                                         + "rd  " + rd.toString()
                                         + "localNextHopIp " + localNextHopIp
                                         + "remoteNextHopIp" + remoteNextHopIp
-                                        + "vpnId " + vpnId );
-                                return futures;
+                                        + "vpnId " + vpnId);
                             }
-                        });
+                            return futures;
+                        }
+                    });
 
-            }
         }
     }
 
index 820cffd24a2f0980fb5d0af3377aadcb257ad71c..9fbd2baaec057b7826b924ae7c0f3b53cb386911 100644 (file)
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -72,6 +73,7 @@ public class InterfaceStateChangeListener extends AbstractDataChangeListener<Int
     @Override
     protected void add(InstanceIdentifier<Interface> identifier, Interface intrf) {
         LOG.trace("Received interface {} add event", intrf);
+        LOG.info("Received interface {} add event", intrf.getName());
         try {
             final String interfaceName = intrf.getName();
             LOG.info("Received interface add event for interface {} ", interfaceName);
@@ -106,33 +108,11 @@ public class InterfaceStateChangeListener extends AbstractDataChangeListener<Int
                                         } else {
                                             LOG.info("Unable to process add for interface {} for NAT service", interfaceName);
                                         }
-                                        CheckedFuture<Void, TransactionCommitFailedException> futures = writeOperTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error adding Oper data for interface {} to vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        futures = writeConfigTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error adding Config data for interface {} to vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        futures = writeInvTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error adding inventory/flow data for interface {} to vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        LOG.warn("InterfaceStateChangeListner returning null while adding the " +
-                                                "interface {}", interfaceName);
-                                        return null;
+                                        List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                                        futures.add(writeOperTxn.submit());
+                                        futures.add(writeConfigTxn.submit());
+                                        futures.add(writeInvTxn.submit());
+                                        return futures;
                                     }
                                 });
                     }
@@ -149,6 +129,7 @@ public class InterfaceStateChangeListener extends AbstractDataChangeListener<Int
     @Override
     protected void remove(InstanceIdentifier<Interface> identifier, Interface intrf) {
         LOG.trace("Received interface {} down event", intrf);
+        LOG.info("Received interface {} remove event", intrf.getName());
         try {
             final String interfaceName = intrf.getName();
             LOG.info("Received port DOWN event for interface {} ", interfaceName);
@@ -187,31 +168,11 @@ public class InterfaceStateChangeListener extends AbstractDataChangeListener<Int
                                 if (routerInterface != null) {
                                     handleRouterInterfacesDownEvent(routerInterface.getRouterName(), interfaceName, dpnId, writeOperTxn);
                                 }
-                                CheckedFuture<Void, TransactionCommitFailedException> futures = writeOperTxn.submit();
-                                try {
-                                    futures.get();
-                                } catch (InterruptedException | ExecutionException e) {
-                                    LOG.error("Error removing Oper data for interface {} from vpn {} on dpn {}", interfaceName,
-                                            vpnInterface.getVpnInstanceName(), dpnId);
-                                    throw new RuntimeException(e.getMessage());
-                                }
-                                futures = writeConfigTxn.submit();
-                                try {
-                                    futures.get();
-                                } catch (InterruptedException | ExecutionException e) {
-                                    LOG.error("Error removing Config data for interface {} from vpn {} on dpn {}", interfaceName,
-                                            vpnInterface.getVpnInstanceName(), dpnId);
-                                    throw new RuntimeException(e.getMessage());
-                                }
-                                futures = writeInvTxn.submit();
-                                try {
-                                    futures.get();
-                                } catch (InterruptedException | ExecutionException e) {
-                                    LOG.error("Error removing Inventory/Flow data for interface {} from vpn {} on dpn {}", interfaceName,
-                                            vpnInterface.getVpnInstanceName(), dpnId);
-                                    throw new RuntimeException(e.getMessage());
-                                }
-                                return null;
+                                List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                                futures.add(writeOperTxn.submit());
+                                futures.add(writeConfigTxn.submit());
+                                futures.add(writeInvTxn.submit());
+                                return futures;
                             }
                         });
             }
@@ -247,31 +208,11 @@ public class InterfaceStateChangeListener extends AbstractDataChangeListener<Int
                                         WriteTransaction writeInvTxn = dataBroker.newWriteOnlyTransaction();
                                         vpnInterfaceManager.processVpnInterfaceUp(dpnId, vpnInterface, ifIndex, 
                                                 true, writeConfigTxn, writeOperTxn, writeInvTxn);
-                                        CheckedFuture<Void, TransactionCommitFailedException> futures = writeOperTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error updating oper data for interface {} in vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        futures = writeConfigTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error updating config data for interface {} in vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        futures = writeInvTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error adding inventory/flow data for interface {} to vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        return null;
+                                        List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                                        futures.add(writeOperTxn.submit());
+                                        futures.add(writeConfigTxn.submit());
+                                        futures.add(writeInvTxn.submit());
+                                        return futures;
                                     }
                                 });
                     } else if (update.getOperStatus().equals(Interface.OperStatus.Down)) {
@@ -285,31 +226,11 @@ public class InterfaceStateChangeListener extends AbstractDataChangeListener<Int
                                         WriteTransaction writeInvTxn = dataBroker.newWriteOnlyTransaction();
                                         vpnInterfaceManager.processVpnInterfaceDown(dpnId, interfaceName, ifIndex, true, false,
                                                 writeConfigTxn, writeOperTxn, writeInvTxn);
-                                        CheckedFuture<Void, TransactionCommitFailedException> futures = writeOperTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error updating oper data for interface {} from vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        futures = writeConfigTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error updating config data for interface {} from vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        futures = writeInvTxn.submit();
-                                        try {
-                                            futures.get();
-                                        } catch (InterruptedException | ExecutionException e) {
-                                            LOG.error("Error updating inventory/flow for interface {} from vpn {} on dpn {}", interfaceName,
-                                                    vpnInterface.getVpnInstanceName(), dpnId);
-                                            throw new RuntimeException(e.getMessage());
-                                        }
-                                        return null;
+                                        List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                                        futures.add(writeOperTxn.submit());
+                                        futures.add(writeConfigTxn.submit());
+                                        futures.add(writeInvTxn.submit());
+                                        return futures;
                                     }
                                 });
                     }
index 4420ce76951db822e8f76886b6b8adc15a903067..57b8eead50c7f5c6fab4b73abe567848613ea091 100644 (file)
@@ -244,7 +244,7 @@ public class TunnelInterfaceStateListener extends AbstractDataChangeListener<Sta
                                             }
                                             if ((tunTypeVal == VpnConstants.ITMTunnelLocType.External.getValue()) &&
                                                     (dcgwPresentStatus == VpnConstants.DCGWPresentStatus.Absent.getValue())) {                                                bgpManager.withdrawPrefix(rd, adjacency.getIpAddress());
-                                                fibManager.cleanUpDpnForVpn(srcDpnId, vpnId, rd, srcTepIp, destTepIp);
+                                                fibManager.cleanUpDpnForVpn(srcDpnId, vpnId, rd, srcTepIp, destTepIp, null);
                                             }
                                         }
                                     } catch (Exception e) {
index 8d28556d7250c66e29e6eb7190eb9118ee25a361..1398bd1ef093a5f4d933dbf4acffe25842ac8bcf 100644 (file)
@@ -104,7 +104,7 @@ public class VpnInstanceListener extends AbstractDataChangeListener<VpnInstance>
         VpnInstanceOpDataEntry vpnOpEntry = null;
         Long intfCount = 0L;
         Long currentIntfCount = 0L;
-        Integer retryCount = 1;
+        Integer retryCount = 2;
         long timeout = VpnConstants.MIN_WAIT_TIME_IN_MILLISECONDS;
         Optional<VpnInstanceOpDataEntry> vpnOpValue = null;
         vpnOpValue = VpnUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL,
index b13581ee9c64477ad7631be505d4b1c62d6fd7b3..f6cfad03713108f23cf357dd1656b2276de00c8e 100644 (file)
@@ -196,6 +196,7 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
     @Override
     public void add(final InstanceIdentifier<VpnInterface> identifier, final VpnInterface vpnInterface) {
         LOG.trace("VPN Interface add event - key: {}, value: {}" ,identifier, vpnInterface );
+        LOG.info("VPN Interface add event - intfName {}" ,vpnInterface.getName());
         final VpnInterfaceKey key = identifier.firstKeyOf(VpnInterface.class, VpnInterfaceKey.class);
         final String interfaceName = key.getName();
 
@@ -214,31 +215,11 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
                                 WriteTransaction writeOperTxn = dataBroker.newWriteOnlyTransaction();
                                 WriteTransaction writeInvTxn = dataBroker.newWriteOnlyTransaction();
                                 processVpnInterfaceUp(dpnId, vpnInterface, ifIndex, false, writeConfigTxn, writeOperTxn, writeInvTxn);
-                                CheckedFuture<Void, TransactionCommitFailedException> futures = writeOperTxn.submit();
-                                try {
-                                    futures.get();
-                                } catch (InterruptedException | ExecutionException e) {
-                                    LOG.error("Error adding oper data for interface {} to vpn {} on dpn {}", interfaceName,
-                                            vpnInterface.getVpnInstanceName(), dpnId);
-                                    throw new RuntimeException(e.getMessage());
-                                }
-                                futures = writeConfigTxn.submit();
-                                try {
-                                    futures.get();
-                                } catch (InterruptedException | ExecutionException e) {
-                                    LOG.error("Error adding config data for interface {} to vpn {} on dpn {}", interfaceName,
-                                        vpnInterface.getVpnInstanceName(), dpnId);
-                                    throw new RuntimeException(e.getMessage());
-                                }
-                                futures = writeInvTxn.submit();
-                                try {
-                                    futures.get();
-                                } catch (InterruptedException | ExecutionException e) {
-                                    LOG.error("Error adding inventory/flow data for interface {} to vpn {} on dpn {}", interfaceName,
-                                        vpnInterface.getVpnInstanceName(), dpnId);
-                                    throw new RuntimeException(e.getMessage());
-                                }
-                                return null;
+                                List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                                futures.add(writeOperTxn.submit());
+                                futures.add(writeConfigTxn.submit());
+                                futures.add(writeInvTxn.submit());
+                                return futures;
                             }
                         });
             }catch (Exception e){
@@ -315,7 +296,7 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
 
             // FIB didn't get a chance yet to clean up this VPNInterface
             // Let us give it a chance here !
-            LOG.info("VPN Interface {} waiting for FIB to clean up! ", interfaceName);
+            LOG.info("Trying to add VPN Interface {}, but waiting for FIB to clean up! ", interfaceName);
             try {
                 Runnable notifyTask = new VpnNotifyTask();
                 vpnIntfMap.put(interfaceName, notifyTask);
@@ -427,6 +408,8 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
 //    }
 
 
+
+
     private void advertiseAdjacenciesForVpnToBgp(BigInteger dpnId, final InstanceIdentifier<VpnInterface> identifier,
                                                  VpnInterface intf) {
         //Read NextHops
@@ -526,29 +509,39 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
         }
     }
 
-    private void bindService(BigInteger dpId, String vpnInstanceName, String vpnInterfaceName, int lPortTag,
-                             WriteTransaction writeConfigTxn, WriteTransaction writeInvTxn) {
-        int priority = VpnConstants.DEFAULT_FLOW_PRIORITY;
-        long vpnId = VpnUtil.getVpnId(dataBroker, vpnInstanceName);
-
-        int instructionKey = 0;
-        List<Instruction> instructions = new ArrayList<Instruction>();
-
-        instructions.add(MDSALUtil.buildAndGetWriteMetadaInstruction(MetaDataUtil.getVpnIdMetadata(vpnId),
-                MetaDataUtil.METADATA_MASK_VRFID, ++instructionKey));
-        instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(NwConstants.L3_FIB_TABLE, ++instructionKey));
-
-        short l3vpn_service_index = ServiceIndex.getIndex(NwConstants.L3VPN_SERVICE_NAME, NwConstants.L3VPN_SERVICE_INDEX);
-        BoundServices
-                serviceInfo =
-                InterfaceUtils.getBoundServices(String.format("%s.%s.%s", "vpn",vpnInstanceName, vpnInterfaceName),
-                        l3vpn_service_index, priority,
-                        NwConstants.COOKIE_VM_INGRESS_TABLE, instructions);
-        writeConfigTxn.put(LogicalDatastoreType.CONFIGURATION,
-                InterfaceUtils.buildServiceId(vpnInterfaceName, l3vpn_service_index), serviceInfo, true);
-        makeArpFlow(dpId, l3vpn_service_index, lPortTag, vpnInterfaceName,
+    private void bindService(BigInteger dpId, final String vpnInstanceName, final String vpnInterfaceName,
+                             int lPortTag, WriteTransaction writeConfigTxn, WriteTransaction writeInvTxn) {
+        final int priority = VpnConstants.DEFAULT_FLOW_PRIORITY;
+        final long vpnId = VpnUtil.getVpnId(dataBroker, vpnInstanceName);
+
+        DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
+        dataStoreCoordinator.enqueueJob(vpnInterfaceName,
+                new Callable<List<ListenableFuture<Void>>>() {
+                    @Override
+                    public List<ListenableFuture<Void>> call() throws Exception {
+                        WriteTransaction writeTxn = dataBroker.newWriteOnlyTransaction();
+                        int instructionKey = 0;
+                        List<Instruction> instructions = new ArrayList<Instruction>();
+
+                        instructions.add(MDSALUtil.buildAndGetWriteMetadaInstruction(
+                                BigInteger.valueOf(vpnId), MetaDataUtil.METADATA_MASK_VRFID, ++instructionKey));
+                        instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(NwConstants.L3_FIB_TABLE, ++instructionKey));
+
+                        BoundServices
+                                serviceInfo =
+                                InterfaceUtils.getBoundServices(String.format("%s.%s.%s", "vpn",vpnInstanceName, vpnInterfaceName),
+                                        NwConstants.L3VPN_SERVICE_INDEX, priority,
+                                        NwConstants.COOKIE_VM_INGRESS_TABLE, instructions);
+                        writeTxn.put(LogicalDatastoreType.CONFIGURATION,
+                                InterfaceUtils.buildServiceId(vpnInterfaceName, NwConstants.L3VPN_SERVICE_INDEX), serviceInfo, true);
+                        List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                        futures.add(writeTxn.submit());
+                        return futures;
+                    }
+                });
+        makeArpFlow(dpId, NwConstants.L3VPN_SERVICE_INDEX, lPortTag, vpnInterfaceName,
                 vpnId, ArpReplyOrRequest.REQUEST, NwConstants.ADD_FLOW, writeInvTxn);
-        makeArpFlow(dpId, l3vpn_service_index, lPortTag, vpnInterfaceName,
+        makeArpFlow(dpId, NwConstants.L3VPN_SERVICE_INDEX, lPortTag, vpnInterfaceName,
                 vpnId, ArpReplyOrRequest.REPLY, NwConstants.ADD_FLOW, writeInvTxn);
 
     }
@@ -614,9 +607,8 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
 
             for (Adjacency nextHop : aug.getAdjacency()) {
                 long label = nextHop.getLabel();
-                List<String> nextHopList = new ArrayList<>(nextHop.getNextHopIpList());
                 if (rd != null) {
-                    addToLabelMapper(label, dpnId, nextHop.getIpAddress(), nextHopList, vpnId,
+                    addToLabelMapper(label, dpnId, nextHop.getIpAddress(), Arrays.asList(nextHopIp), vpnId,
                             interfaceName, null,false, rd, writeOperTxn);
                     addPrefixToBGP(rd, nextHop.getIpAddress(), nextHopIp, label, writeConfigTxn);
                     //TODO: ERT - check for VPNs importing my route
@@ -858,6 +850,51 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
         return rd;
     }
 
+    /**
+     * JobCallback class is used as a future callback for
+     * main and rollback workers to handle success and failure.
+     */
+    private class DpnEnterExitVpnWorker implements FutureCallback<List<Void>> {
+        BigInteger dpnId;
+        String vpnName;
+        String rd;
+        boolean entered;
+
+        public DpnEnterExitVpnWorker(BigInteger dpnId, String vpnName, String rd, boolean entered) {
+            this.entered = entered;
+            this.dpnId = dpnId;
+            this.vpnName = vpnName;
+            this.rd = rd;
+        }
+
+        /**
+         * @param voids
+         * This implies that all the future instances have returned success. -- TODO: Confirm this
+         */
+        @Override
+        public void onSuccess(List<Void> voids) {
+            if (entered) {
+                publishAddNotification(dpnId, vpnName, rd);
+            } else {
+                publishRemoveNotification(dpnId, vpnName, rd);
+            }
+        }
+
+        /**
+         *
+         * @param throwable
+         * This method is used to handle failure callbacks.
+         * If more retry needed, the retrycount is decremented and mainworker is executed again.
+         * After retries completed, rollbackworker is executed.
+         * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
+         */
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.warn("Job: failed with exception: {}", throwable.getStackTrace());
+        }
+    }
+
     private void createOrUpdateVpnToDpnList(long vpnId, BigInteger dpnId, String intfName, String vpnName) {
         String routeDistinguisher = getRouteDistinguisher(vpnName);
         String rd = (routeDistinguisher == null) ? vpnName : routeDistinguisher;
@@ -872,16 +909,25 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
                     vpnInterface = new VpnInterfacesBuilder().setInterfaceName(intfName).build();
 
             if (dpnInVpn.isPresent()) {
+                VpnToDpnList vpnToDpnList = dpnInVpn.get();
+                List<org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data
+                        .vpn.instance.op.data.entry.vpn.to.dpn.list.VpnInterfaces> vpnInterfaces = vpnToDpnList.getVpnInterfaces();
+                if (vpnInterfaces == null) {
+                    vpnInterfaces = new ArrayList<>();
+                }
+                vpnInterfaces.add(vpnInterface);
+                VpnToDpnListBuilder vpnToDpnListBuilder = new VpnToDpnListBuilder().setDpnId(dpnId);
+                vpnToDpnListBuilder.setDpnState(VpnToDpnList.DpnState.Active).setVpnInterfaces(vpnInterfaces);
                 if (writeTxn != null) {
-                    writeTxn.put(LogicalDatastoreType.OPERATIONAL, id.child(
-                            org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance
-                                    .op.data.vpn.instance.op.data.entry.vpn.to.dpn.list.VpnInterfaces.class,
-                            new VpnInterfacesKey(intfName)), vpnInterface, true);
+                    writeTxn.put(LogicalDatastoreType.OPERATIONAL, id, vpnToDpnListBuilder.build(), true);
                 } else {
-                    VpnUtil.syncWrite(dataBroker, LogicalDatastoreType.OPERATIONAL, id.child(
-                            org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance
-                                    .op.data.vpn.instance.op.data.entry.vpn.to.dpn.list.VpnInterfaces.class,
-                            new VpnInterfacesKey(intfName)), vpnInterface);
+                    VpnUtil.syncWrite(dataBroker, LogicalDatastoreType.OPERATIONAL, id, vpnToDpnListBuilder.build());
+                }
+                /* If earlier state was inactive, it is considered new DPN coming back to the
+                 * same VPN
+                 */
+                if (vpnToDpnList.getDpnState() == VpnToDpnList.DpnState.Inactive) {
+                    newDpnOnVpn = Boolean.TRUE;
                 }
             } else {
                 List<org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data
@@ -904,22 +950,22 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
                 LOG.error("Error adding to dpnToVpnList for vpn {} interface {} dpn {}", vpnName, intfName, dpnId);
                 throw new RuntimeException(e.getMessage());
             }
-             /*
-            * Informing the Fib only after writeTxn is submitted successfuly.
-            */
-            if (newDpnOnVpn) {
-                LOG.debug("Sending populateFib event for new dpn {} in VPN {}", dpnId, vpnName);
-                fibManager.populateFibOnNewDpn(dpnId, vpnId, rd);
-                publishAddNotification(dpnId, vpnName, rd);
-            }
+
+        }
+        /*
+         * Informing the Fib only after writeTxn is submitted successfuly.
+         */
+        if (newDpnOnVpn) {
+            LOG.debug("Sending populateFib event for new dpn {} in VPN {}", dpnId, vpnName);
+            fibManager.populateFibOnNewDpn(dpnId, vpnId, rd, new DpnEnterExitVpnWorker(dpnId, vpnName, rd, true /* entered */));
+            publishAddNotification(dpnId, vpnName, rd);
         }
     }
 
     private void removeOrUpdateVpnToDpnList(long vpnId, BigInteger dpnId, String intfName, String vpnName) {
         Boolean lastDpnOnVpn = Boolean.FALSE;
-
+        String rd = VpnUtil.getVpnRd(dataBroker, vpnName);
         synchronized (vpnName.intern()) {
-            String rd = VpnUtil.getVpnRd(dataBroker, vpnName);
             InstanceIdentifier<VpnToDpnList> id = VpnUtil.getVpnToDpnListIdentifier(rd, dpnId);
             Optional<VpnToDpnList> dpnInVpn = VpnUtil.read(dataBroker, LogicalDatastoreType.OPERATIONAL, id);
             WriteTransaction writeTxn = dataBroker.newWriteOnlyTransaction();
@@ -968,11 +1014,11 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
                 LOG.error("Error removing from dpnToVpnList for vpn {} interface {} dpn {}", vpnName, intfName, dpnId);
                 throw new RuntimeException(e.getMessage());
             }
-            if (lastDpnOnVpn) {
-                LOG.debug("Sending cleanup event for dpn {} in VPN {}", dpnId, vpnName);
-                fibManager.cleanUpDpnForVpn(dpnId, vpnId, rd);
-                publishRemoveNotification(dpnId, vpnName, rd);
-            }
+        }
+        if (lastDpnOnVpn) {
+            LOG.debug("Sending cleanup event for dpn {} in VPN {}", dpnId, vpnName);
+            fibManager.cleanUpDpnForVpn(dpnId, vpnId, rd, new DpnEnterExitVpnWorker(dpnId, vpnName, rd, false /* exited */));
+            publishRemoveNotification(dpnId, vpnName, rd);
         }
     }  
 
@@ -1027,6 +1073,7 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
     @Override
     public void remove( InstanceIdentifier<VpnInterface> identifier, VpnInterface vpnInterface) {
         LOG.trace("Remove event - key: {}, value: {}" ,identifier, vpnInterface );
+        LOG.info("VPN Interface remove event - intfName {}" ,vpnInterface.getName());
         final VpnInterfaceKey key = identifier.firstKeyOf(VpnInterface.class, VpnInterfaceKey.class);
         final String interfaceName = key.getName();
 
@@ -1064,31 +1111,11 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
                             WriteTransaction writeOperTxn = dataBroker.newWriteOnlyTransaction();
                             WriteTransaction writeInvTxn = dataBroker.newWriteOnlyTransaction();
                             processVpnInterfaceDown(dpId, interfaceName, ifIndex, false, true, writeConfigTxn, writeOperTxn, writeInvTxn);
-                            CheckedFuture<Void, TransactionCommitFailedException> futures = writeOperTxn.submit();
-                            try {
-                                futures.get();
-                            } catch (InterruptedException | ExecutionException e) {
-                                LOG.error("Error removing Oper data for interface {} from vpn {} on dpn {}", interfaceName,
-                                        vpnOpInterface.getVpnInstanceName(), dpId);
-                                throw new RuntimeException(e.getMessage());
-                            }
-                            futures = writeConfigTxn.submit();
-                            try {
-                                futures.get();
-                            } catch (InterruptedException | ExecutionException e) {
-                                LOG.error("Error removing Config data for interface {} from vpn {} on dpn {}", interfaceName,
-                                        vpnOpInterface.getVpnInstanceName(), dpId);
-                                throw new RuntimeException(e.getMessage());
-                            }
-                            futures = writeInvTxn.submit();
-                            try {
-                                futures.get();
-                            } catch (InterruptedException | ExecutionException e) {
-                                LOG.error("Error removing Inventory/Flow data for interface {} from vpn {} on dpn {}", interfaceName,
-                                        vpnOpInterface.getVpnInstanceName(), dpId);
-                                throw new RuntimeException(e.getMessage());
-                            }
-                            return null;
+                            List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                            futures.add(writeOperTxn.submit());
+                            futures.add(writeConfigTxn.submit());
+                            futures.add(writeInvTxn.submit());
+                            return futures;
                         }
                     });
 
@@ -1218,14 +1245,26 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
     }
 
 
-    private void unbindService(BigInteger dpId, String vpnInstanceName, String vpnInterfaceName,
+    private void unbindService(BigInteger dpId, String vpnInstanceName, final String vpnInterfaceName,
                                int lPortTag, boolean isInterfaceStateDown, boolean isConfigRemoval,
                                WriteTransaction writeConfigTxn, WriteTransaction writeInvTxn) {
         short l3vpn_service_index = ServiceIndex.getIndex(NwConstants.L3VPN_SERVICE_NAME, NwConstants.L3VPN_SERVICE_INDEX);
         if (!isInterfaceStateDown && isConfigRemoval) {
-            writeConfigTxn.delete(LogicalDatastoreType.CONFIGURATION,
-                    InterfaceUtils.buildServiceId(vpnInterfaceName,
-                            l3vpn_service_index));
+            DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
+            dataStoreCoordinator.enqueueJob(vpnInterfaceName,
+                    new Callable<List<ListenableFuture<Void>>>() {
+                        @Override
+                        public List<ListenableFuture<Void>> call() throws Exception {
+                            WriteTransaction writeTxn = dataBroker.newWriteOnlyTransaction();
+                            writeTxn.delete(LogicalDatastoreType.CONFIGURATION,
+                                    InterfaceUtils.buildServiceId(vpnInterfaceName,
+                                            NwConstants.L3VPN_SERVICE_INDEX));
+
+                            List<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
+                            futures.add(writeTxn.submit());
+                            return futures;
+                        }
+                    });
         }
         long vpnId = VpnUtil.getVpnId(dataBroker, vpnInstanceName);
         makeArpFlow(dpId, l3vpn_service_index, lPortTag, vpnInterfaceName,
@@ -1249,6 +1288,7 @@ public class VpnInterfaceManager extends AbstractDataChangeListener<VpnInterface
     @Override
     protected void update(InstanceIdentifier<VpnInterface> identifier, VpnInterface original, VpnInterface update) {
         LOG.trace("Updating VPN Interface : key {},  original value={}, update value={}", identifier, original, update);
+        LOG.info("VPN Interface update event - intfName {}" ,update.getName());
         String oldVpnName = original.getVpnInstanceName();
         String newVpnName = update.getVpnInstanceName();
         BigInteger dpnId = update.getDpnId();