Upstreaming changes in netvirt 40/42440/1
authorSuraj Ranjan <suraj.ranjan@ericsson.com>
Mon, 25 Jul 2016 06:57:32 +0000 (12:27 +0530)
committerSuraj Ranjan <suraj.ranjan@ericsson.com>
Mon, 25 Jul 2016 06:59:25 +0000 (12:29 +0530)
This commit includes following changes:

1> Fixing Port Up/Down Event handling for ELAN
2> Parallelization done in Arpcache

Parallelization of adding and removing arp cache done with the help of
DataStoreJobCoordinator.

Change-Id: Ic017bdc7165042550ad1046b55131854689a221a
Signed-off-by: Suraj Ranjan <suraj.ranjan@ericsson.com>
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanInterfaceManager.java
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpConstants.java
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpScheduler.java
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpaddchacheTask.java [new file with mode: 0644]
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpremovechacheTask.java [new file with mode: 0644]
vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/MacEntry.java

index e8824a112e89b67c7fcfc2bf84e1460f515e1847..cf054c6c8408d1311a2477f93cceac109f409350 100644 (file)
@@ -179,7 +179,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             }
         }
         ElanUtils.waitForTransactionToComplete(tx);
-        deleteFlowGroupTx.submit();
+        ElanUtils.waitForTransactionToComplete(deleteFlowGroupTx);
         DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
         InterfaceRemoveWorkerOnElanInterface removeInterfaceWorker = new InterfaceRemoveWorkerOnElanInterface(interfaceName, elanInfo,
             interfaceInfo, isInterfaceStateRemoved, this, isLastElanInterface);
@@ -261,7 +261,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         }
         deleteElanInterfaceFromConfigDS(interfaceName, tx);
         ElanUtils.waitForTransactionToComplete(tx);
-        deleteFlowGroupTx.submit();
+        ElanUtils.waitForTransactionToComplete(deleteFlowGroupTx);
     }
 
     private DpnInterfaces removeElanDpnInterfaceFromOperationalDataStore(String elanName, BigInteger dpId, String interfaceName,
@@ -519,7 +519,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             }
         }
         ElanUtils.waitForTransactionToComplete(tx);
-        writeFlowGroupTx.submit();
+        ElanUtils.waitForTransactionToComplete(writeFlowGroupTx);
     }
 
     protected void removeInterfaceStaticMacEntires(String elanInstanceName, String interfaceName, PhysAddress physAddress) {
index bac53e897df8d676a60087a0edaa67472e832236..f1b4e9b583853329467fe4ef239a92f771775662 100644 (file)
@@ -20,5 +20,6 @@ public class ArpConstants {
         public static final String FAILED_TO_GET_SRC_IP_FOR_INTERFACE = "Failed to get src ip for %s";
         public static final String FAILED_TO_GET_SRC_MAC_FOR_INTERFACE = "Failed to get src mac for interface %s iid %s ";
         public static final int PERIOD = 10000;
+        public static final String ARPJOB = "Arpcache";
 
 }
index 4d2c88b69d2a17f9937524a33d38b1832eeecf04..8c7f141b2e7febf481fe335d333fbe37c393c055 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.VpnAfConfig;
 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.VpnInstances;
 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
@@ -51,7 +52,6 @@ import java.util.concurrent.*;
 
 public class ArpScheduler extends AsyncDataTreeChangeListenerBase<VpnPortipToPort,ArpScheduler> {
 
-    private static final Logger logger = LoggerFactory.getLogger(ArpScheduler.class);
     private ScheduledExecutorService executorService;
     private OdlInterfaceRpcService interfaceRpc;
     private DataBroker dataBroker;
@@ -74,18 +74,6 @@ public class ArpScheduler extends AsyncDataTreeChangeListenerBase<VpnPortipToPor
         scheduleExpiredEntryDrainerTask();
     }
 
-    public void addOrUpdateMacEntryToQueue(String vpnName, MacAddress macAddress, InetAddress InetAddress, String interfaceName) {
-        MacEntry newMacEntry = new MacEntry(ArpConstants.arpCacheTimeout,vpnName,macAddress, InetAddress,interfaceName );
-        if (!macEntryQueue.contains(newMacEntry)) {
-            LOG.info("Adding ARP cache");
-            macEntryQueue.offer(newMacEntry);
-        }
-        else{
-            LOG.info("Updating ARP cache");
-            macEntryQueue.remove(newMacEntry);
-            macEntryQueue.offer(newMacEntry);        }
-    }
-
 
     private void scheduleExpiredEntryDrainerTask() {
         LOG.info("Scheduling expired entry drainer task");
@@ -107,86 +95,29 @@ public class ArpScheduler extends AsyncDataTreeChangeListenerBase<VpnPortipToPor
      }
 
 
-     private class ExpiredEntryDrainerTask implements Runnable {
-        @Override
-        public void run() {
-            WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-            Collection<MacEntry> expiredMacEntries = new ArrayList<>();
-            macEntryQueue.drainTo(expiredMacEntries);
-            for (MacEntry macEntry: expiredMacEntries) {
-                LOG.info("Removing the ARP cache for"+macEntry);
-                InstanceIdentifier<VpnPortipToPort> id = getVpnPortipToPortInstanceOpDataIdentifier(macEntry.getIpAddress().getHostAddress(),macEntry.getVpnName());
-                Optional<VpnPortipToPort> vpnPortipToPort = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
-                if (vpnPortipToPort.isPresent()) {
-                    VpnPortipToPort vpnPortipToPortold = vpnPortipToPort.get();
-                    String fixedip = vpnPortipToPortold.getPortFixedip();
-                    String vpnName =  vpnPortipToPortold.getVpnName();
-                    String interfaceName =  vpnPortipToPortold.getPortName();
-                    String rd = getRouteDistinguisher(vpnName);
-                    deleteVrfEntries(rd,fixedip,tx);
-                    deleteAdjacencies(fixedip,vpnName,interfaceName,tx);
-                    tx.delete(LogicalDatastoreType.CONFIGURATION, id);
-                    waitForTransactionToComplete(tx);
-                 }
-
-              }
-        }
-     }
-        private void deleteVrfEntries(String rd, String fixedip, WriteTransaction tx) {
-            InstanceIdentifier<VrfEntry> vrfid= InstanceIdentifier.builder(FibEntries.class).
-                    child(VrfTables.class, new VrfTablesKey(rd)).
-                    child(VrfEntry.class,new VrfEntryKey(iptoprefix(fixedip))).
-                    build();
-
-            tx.delete(LogicalDatastoreType.CONFIGURATION, vrfid);
-            LOG.info("deleting the vrf entries");
-
-
-        }
-
+    private class ExpiredEntryDrainerTask implements Runnable {
+       @Override
+       public void run() {
+           Collection<MacEntry> expiredMacEntries = new ArrayList<>();
+           macEntryQueue.drainTo(expiredMacEntries);
+           for (MacEntry macEntry: expiredMacEntries) {
+               LOG.info("Removing the ARP cache for"+macEntry);
+               InstanceIdentifier<VpnPortipToPort> id = getVpnPortipToPortInstanceOpDataIdentifier(macEntry.getIpAddress().getHostAddress(),macEntry.getVpnName());
+               Optional<VpnPortipToPort> vpnPortipToPort = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+               if (vpnPortipToPort.isPresent()) {
+                   VpnPortipToPort vpnPortipToPortold = vpnPortipToPort.get();
+                   String fixedip = vpnPortipToPortold.getPortFixedip();
+                   String vpnName =  vpnPortipToPortold.getVpnName();
+                   String interfaceName =  vpnPortipToPortold.getPortName();
+                   String rd = getRouteDistinguisher(vpnName);
+                   DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
+                   coordinator.enqueueJob(buildJobKey(fixedip,vpnName), new ArpremovechacheTask(dataBroker,fixedip, vpnName,interfaceName, rd,id));
+               }
 
-    public void deleteAdjacencies(String fixedip, String vpnName, String interfaceName, WriteTransaction tx) {
-          InstanceIdentifier<VpnInterface> vpnIfId = VpnUtil.getVpnInterfaceIdentifier(interfaceName);
-          InstanceIdentifier<Adjacencies> path = vpnIfId.augmentation(Adjacencies.class);
-          Optional<Adjacencies> adjacencies = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, path);
-          if (adjacencies.isPresent()) {
-              List<Adjacency> adjacencyList = adjacencies.get().getAdjacency();
-              InstanceIdentifier <Adjacency> adid = vpnIfId.augmentation(Adjacencies.class).child(Adjacency.class, new AdjacencyKey(iptoprefix(fixedip)));
-              Optional<Adjacency> newAdj = VpnUtil.read(dataBroker,  LogicalDatastoreType.CONFIGURATION, adid);
-              if(adjacencyList.contains(newAdj.get()))
-              adjacencyList.remove(newAdj.get());
-              Adjacencies aug = VpnUtil.getVpnInterfaceAugmentation(adjacencyList);
-              VpnInterface newVpnIntf = new VpnInterfaceBuilder().setKey(new VpnInterfaceKey(interfaceName)).
-                    setName(interfaceName).setVpnInstanceName(vpnName).addAugmentation(Adjacencies.class, aug).build();
-              tx.put(LogicalDatastoreType.CONFIGURATION, vpnIfId, newVpnIntf,true);
-              LOG.info("deleting the adjacencies ");
            }
+       }
     }
-    public static void waitForTransactionToComplete(WriteTransaction tx) {
-        CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
-        try {
-            futures.get();
-        } catch (InterruptedException | ExecutionException e) {
-            logger.error("Error writing to datastore {}", e);
-        }
-    }
-    private String iptoprefix(String ip){
-        return new StringBuilder(ip).append(ArpConstants.PREFIX).toString();
-
-     }
 
-    private static final FutureCallback<Void> DEFAULT_CALLBACK =
-        new FutureCallback<Void>() {
-        @Override
-        public synchronized void onSuccess(Void result) {
-             LOG.debug("Success in Datastore operation");
-        }
-
-        @Override
-        public void onFailure(Throwable error) {
-            LOG.error("Error in Datastore operation", error);
-        };
-    };
     private String getRouteDistinguisher(String vpnName) {
         InstanceIdentifier<VpnInstance> id = InstanceIdentifier.builder(VpnInstances.class)
                  .child(VpnInstance.class, new VpnInstanceKey(vpnName)).build();
@@ -228,12 +159,17 @@ public class ArpScheduler extends AsyncDataTreeChangeListenerBase<VpnPortipToPor
             MacAddress srcMacAddress = MacAddress.getDefaultInstance(value.getMacAddress());
             String vpnName =  value.getVpnName();
             String interfaceName =  value.getPortName();
-            addOrUpdateMacEntryToQueue(vpnName,srcMacAddress, srcInetAddr, interfaceName);
-           } catch (Exception e) {
+            Boolean islearnt = value.isLearnt();
+            if(islearnt)
+            {
+                DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
+                coordinator.enqueueJob(buildJobKey(srcInetAddr.toString(),vpnName), new ArpaddchacheTask(srcInetAddr, srcMacAddress, vpnName,interfaceName, macEntryQueue));
+            }
+        } catch (Exception e) {
             LOG.error("Error in deserializing packet {} with exception {}", value, e);
-           e.printStackTrace();
-        }
+            e.printStackTrace();
 
+        }
     }
 
 
@@ -248,7 +184,11 @@ public class ArpScheduler extends AsyncDataTreeChangeListenerBase<VpnPortipToPor
             Boolean islearnt = value.isLearnt();
             if(islearnt)
             {
-               addOrUpdateMacEntryToQueue(vpnName,srcMacAddress, srcInetAddr, interfaceName);
+                DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
+                if(islearnt)
+                {
+                    coordinator.enqueueJob(buildJobKey(srcInetAddr.toString(),vpnName), new ArpaddchacheTask(srcInetAddr, srcMacAddress, vpnName,interfaceName, macEntryQueue));
+                }
             }
         }
         catch (Exception e) {
@@ -267,4 +207,9 @@ public class ArpScheduler extends AsyncDataTreeChangeListenerBase<VpnPortipToPor
        // TODO Auto-generated method stub
 
      }
+
+    private String buildJobKey(String ip, String vpnName){
+        return new StringBuilder(ArpConstants.ARPJOB).append(ip).append(vpnName).toString();
+
+    }
 }
diff --git a/vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpaddchacheTask.java b/vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpaddchacheTask.java
new file mode 100644 (file)
index 0000000..a7a5870
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.vpnmanager;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.DelayQueue;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class ArpaddchacheTask implements Callable<List<ListenableFuture<Void>>> {
+    InetAddress srcInetAddr;
+    MacAddress srcMacAddress;
+    String vpnName;
+    String interfaceName;
+    DelayQueue<MacEntry> macEntryQueue;
+    private static final Logger LOG = LoggerFactory.getLogger(ArpaddchacheTask.class);
+
+    public ArpaddchacheTask(InetAddress srcInetAddr, MacAddress srcMacAddress, String vpnName, String interfaceName,
+                            DelayQueue<MacEntry> macEntryQueue) {
+        super();
+        this.srcInetAddr = srcInetAddr;
+        this.srcMacAddress = srcMacAddress;
+        this.vpnName = vpnName;
+        this.interfaceName = interfaceName;
+        this.macEntryQueue = macEntryQueue;
+    }
+
+
+
+    @Override
+    public List<ListenableFuture<Void>> call() throws Exception {
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+        addOrUpdateMacEntryToQueue(vpnName,srcMacAddress, srcInetAddr, interfaceName);
+        return futures;
+    }
+
+    public  void addOrUpdateMacEntryToQueue(String vpnName, MacAddress macAddress,InetAddress InetAddress, String interfaceName) {
+        MacEntry newMacEntry = new MacEntry(ArpConstants.arpCacheTimeout,vpnName,macAddress, InetAddress,interfaceName );
+        if (!macEntryQueue.contains(newMacEntry)) {
+            LOG.info("Adding ARP cache");
+            macEntryQueue.offer(newMacEntry);
+        }
+        else{
+            macEntryQueue.remove(newMacEntry);
+            macEntryQueue.offer(newMacEntry);
+        }
+    }
+}
diff --git a/vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpremovechacheTask.java b/vpnservice/vpnmanager/vpnmanager-impl/src/main/java/org/opendaylight/netvirt/vpnmanager/ArpremovechacheTask.java
new file mode 100644 (file)
index 0000000..31d9ecf
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.vpnmanager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.interfaces.VpnInterface;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.Adjacencies;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.adjacency.list.Adjacency;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.adjacency.list.AdjacencyKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.FibEntries;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.fibentries.VrfTables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.fibentries.VrfTablesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentries.VrfEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentries.VrfEntryKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class ArpremovechacheTask implements Callable<List<ListenableFuture<Void>>> {
+    DataBroker dataBroker;
+    String fixedip;
+    String vpnName;
+    String interfaceName;
+    String rd;
+    InstanceIdentifier<VpnPortipToPort> id;
+    private static final Logger LOG = LoggerFactory.getLogger(ArpremovechacheTask.class);
+
+    public ArpremovechacheTask(DataBroker dataBroker, String fixedip, String vpnName, String interfaceName, String rd,
+                               InstanceIdentifier<VpnPortipToPort> id) {
+        super();
+        this.fixedip = fixedip;
+        this.vpnName = vpnName;
+        this.interfaceName = interfaceName;
+        this.rd = rd;
+        this.dataBroker = dataBroker;
+        this.id = id;
+    }
+
+    @Override
+    public List<ListenableFuture<Void>> call() throws Exception {
+        WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        List<ListenableFuture<Void>> result = new ArrayList<ListenableFuture<Void>>();
+        deleteVrfEntries(rd,fixedip,tx);
+        deleteAdjacencies(fixedip,vpnName,interfaceName,tx);
+        tx.delete(LogicalDatastoreType.CONFIGURATION, id);
+        CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
+        try {
+            futures.get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error writing to datastore {}", e);
+        }
+        result.add(futures);
+        return result;
+
+    }
+
+    private void deleteVrfEntries(String rd, String fixedip, WriteTransaction tx) {
+        InstanceIdentifier<VrfEntry> vrfid= InstanceIdentifier.builder(FibEntries.class).
+                child(VrfTables.class, new VrfTablesKey(rd)).
+                child(VrfEntry.class,new VrfEntryKey(iptoprefix(fixedip))).
+                build();
+
+        tx.delete(LogicalDatastoreType.CONFIGURATION, vrfid);
+    }
+
+
+    public void deleteAdjacencies(String fixedip, String vpnName, String interfaceName, WriteTransaction tx) {
+        InstanceIdentifier<VpnInterface> vpnIfId = VpnUtil.getVpnInterfaceIdentifier(interfaceName);
+        InstanceIdentifier<Adjacencies> path = vpnIfId.augmentation(Adjacencies.class);
+        Optional<Adjacencies> adjacencies = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, path);
+        if (adjacencies.isPresent()) {
+            InstanceIdentifier <Adjacency> adid = vpnIfId.augmentation(Adjacencies.class).child(Adjacency.class, new AdjacencyKey(iptoprefix(fixedip)));
+            tx.delete(LogicalDatastoreType.CONFIGURATION, adid);
+            LOG.info("deleting the adjacencies ");
+        }
+    }
+
+    private String iptoprefix(String ip){
+        return new StringBuilder(ip).append(ArpConstants.PREFIX).toString();
+
+    }
+}
index 9bf4a55a89da838df6834b21afd0ac4d242213b6..8b6b24c7882ec4aa2185a96495415f4182df3de0 100644 (file)
@@ -63,9 +63,9 @@ public class MacEntry implements Delayed {
 
     @Override
     public int compareTo(Delayed obj) {
-        if (this.expiryTime > ((MacEntry) obj).expiryTime) {
+        if (this.expiryTime < ((MacEntry) obj).expiryTime) {
             return -1;
-        } else if (this.expiryTime < ((MacEntry) obj).expiryTime) {
+        } else if (this.expiryTime > ((MacEntry) obj).expiryTime) {
             return 1;
         } else {
             return 0;
@@ -75,7 +75,7 @@ public class MacEntry implements Delayed {
     @Override
     public long getDelay(TimeUnit arg0) {
         long diff = expiryTime - System.currentTimeMillis();
-        return diff;
+        return arg0.convert(diff, TimeUnit.MILLISECONDS);
     }
 
     @Override