Bulk merge of l2gw changes
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / jobs / McastUpdateJob.java
index f96445c41f5ecf353460608d989ea7f4f027cc72..5bbf772623378699a802eeadb3f9c4dc185cc8f4 100644 (file)
@@ -8,26 +8,26 @@
 package org.opendaylight.netvirt.elan.l2gw.jobs;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
+import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayMulticastUtils;
 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
 import org.opendaylight.netvirt.elan.utils.ElanItmUtils;
-import org.opendaylight.netvirt.elan.utils.ElanUtils;
+import org.opendaylight.netvirt.elan.utils.Scheduler;
 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.yang.common.Uint64;
 
-public class McastUpdateJob implements Callable<List<? extends ListenableFuture<?>>> {
+public class McastUpdateJob extends DataStoreJob {
     private String elanName;
     private String nodeId;
     private ElanL2GatewayMulticastUtils mcastUtils;
     private ElanClusterUtils elanClusterUtils;
     boolean add;
-    protected String jobKey;
     private IpAddress removedDstTep;
     private boolean dpnOrConnectionRemoved;
 
@@ -35,8 +35,9 @@ public class McastUpdateJob implements Callable<List<? extends ListenableFuture<
                           String nodeId,
                           boolean add,
                           ElanL2GatewayMulticastUtils mcastUtils,
-                          ElanClusterUtils elanClusterUtils) {
-        this.jobKey = ElanUtils.getBcGroupUpdateKey(elanName);
+                          ElanClusterUtils elanClusterUtils,
+                          Scheduler scheduler, JobCoordinator jobCoordinator) {
+        super(elanName + ":" + nodeId, scheduler, jobCoordinator);
         this.elanName = elanName;
         this.nodeId = nodeId;
         this.mcastUtils = mcastUtils;
@@ -45,46 +46,50 @@ public class McastUpdateJob implements Callable<List<? extends ListenableFuture<
     }
 
     public void submit() {
-        elanClusterUtils.runOnlyInOwnerNode(this.jobKey, "Mcast Update job",this);
+        elanClusterUtils.runOnlyInOwnerNode(super.jobKey, "Mcast Update job",this);
     }
 
     @Override
     public List<ListenableFuture<?>> call() throws Exception {
         L2GatewayDevice device = ElanL2GwCacheUtils.getL2GatewayDeviceFromCache(elanName, nodeId);
-        ListenableFuture<?> ft = null;
+        ListenableFuture<? extends Object> ft = null;
         //TODO: make prepareRemoteMcastMacUpdateOnDevice return a ListenableFuture<Void>
         if (add) {
-            ft = mcastUtils.prepareRemoteMcastMacUpdateOnDevice(elanName, device, !dpnOrConnectionRemoved ,
+            ft = mcastUtils.prepareRemoteMcastMacUpdateOnDevice(elanName, device, !dpnOrConnectionRemoved,
                     removedDstTep);
         } else {
             ft =  mcastUtils.deleteRemoteMcastMac(new NodeId(nodeId), elanName);
         }
-        List<ListenableFuture<?>> fts = new ArrayList<>();
+        processResult(ft);
+        List<ListenableFuture<? extends Object>> fts = new ArrayList<>();
         fts.add(ft);
         return fts;
     }
 
     public static void updateAllMcasts(String elanName,
                                        ElanL2GatewayMulticastUtils mcastUtils,
-                                       ElanClusterUtils elanClusterUtils) {
-        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).forEach(device -> {
-            new McastUpdateJob(elanName, device.getHwvtepNodeId(), true, mcastUtils,
-                    elanClusterUtils).submit();
+                                       ElanClusterUtils elanClusterUtils,
+                                       Scheduler scheduler, JobCoordinator jobCoordinator) {
+        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).keySet().forEach(nodeId -> {
+            new McastUpdateJob(elanName, nodeId, true, mcastUtils,
+                    elanClusterUtils, scheduler, jobCoordinator).submit();
         });
     }
 
     public static void removeMcastForNode(String elanName, String nodeId,
                                           ElanL2GatewayMulticastUtils mcastUtils,
-                                          ElanClusterUtils elanClusterUtils) {
+                                          ElanClusterUtils elanClusterUtils,
+                                          Scheduler scheduler, JobCoordinator jobCoordinator) {
         new McastUpdateJob(elanName, nodeId, false, mcastUtils,
-                elanClusterUtils).submit();
+                elanClusterUtils, scheduler, jobCoordinator).submit();
     }
 
     public static void updateMcastForNode(String elanName, String nodeId,
                                           ElanL2GatewayMulticastUtils mcastUtils,
-                                          ElanClusterUtils elanClusterUtils) {
+                                          ElanClusterUtils elanClusterUtils,
+                                          Scheduler scheduler, JobCoordinator jobCoordinator) {
         new McastUpdateJob(elanName, nodeId, true, mcastUtils,
-                elanClusterUtils).submit();
+                elanClusterUtils, scheduler, jobCoordinator).submit();
     }
 
     private McastUpdateJob setRemovedDstTep(IpAddress removedDstTep) {
@@ -99,19 +104,24 @@ public class McastUpdateJob implements Callable<List<? extends ListenableFuture<
 
     public static void updateAllMcastsForConnectionAdd(String elanName,
                                                        ElanL2GatewayMulticastUtils mcastUtils,
-                                                       ElanClusterUtils elanClusterUtils) {
-        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).forEach(device -> {
-            new McastUpdateJob(elanName, device.getHwvtepNodeId(), true , mcastUtils, elanClusterUtils).submit();
+                                                       ElanClusterUtils elanClusterUtils,
+                                                       Scheduler scheduler,
+                                                       JobCoordinator jobCoordinator) {
+        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).keySet().forEach(nodeId -> {
+            new McastUpdateJob(elanName, nodeId, true , mcastUtils, elanClusterUtils, scheduler,
+                    jobCoordinator).submit();
         });
     }
 
     public static void updateAllMcastsForConnectionDelete(String elanName,
                                                           ElanL2GatewayMulticastUtils mcastUtils,
                                                           ElanClusterUtils elanClusterUtils,
+                                                          Scheduler scheduler,
+                                                          JobCoordinator jobCoordinator,
                                                           L2GatewayDevice deletedDevice) {
-        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).forEach(device -> {
+        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).keySet().forEach(nodeId -> {
             IpAddress deletedTep = deletedDevice.getTunnelIp();
-            new McastUpdateJob(elanName, device.getHwvtepNodeId(), true , mcastUtils, elanClusterUtils)
+            new McastUpdateJob(elanName, nodeId, true , mcastUtils, elanClusterUtils, scheduler, jobCoordinator)
                     .setDpnOrconnectionRemoved()
                     .setRemovedDstTep(deletedTep)
                     .submit();
@@ -120,20 +130,25 @@ public class McastUpdateJob implements Callable<List<? extends ListenableFuture<
 
     public static void updateAllMcastsForDpnAdd(String elanName,
                                                 ElanL2GatewayMulticastUtils mcastUtils,
-                                                ElanClusterUtils elanClusterUtils) {
-        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).forEach(device -> {
-            new McastUpdateJob(elanName, device.getHwvtepNodeId(), true , mcastUtils, elanClusterUtils).submit();
+                                                ElanClusterUtils elanClusterUtils,
+                                                Scheduler scheduler,
+                                                JobCoordinator jobCoordinator) {
+        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).keySet().forEach(nodeId -> {
+            new McastUpdateJob(elanName, nodeId, true , mcastUtils, elanClusterUtils, scheduler, jobCoordinator)
+                    .submit();
         });
     }
 
     public static void updateAllMcastsForDpnDelete(String elanName,
                                                    ElanL2GatewayMulticastUtils mcastUtils,
                                                    ElanClusterUtils elanClusterUtils,
-                                                   Uint64 srcDpnId,
-                                                   ElanItmUtils elanItmUtils) {
-        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).forEach(device -> {
-            IpAddress deletedTep = elanItmUtils.getSourceDpnTepIp(srcDpnId, new NodeId(device.getHwvtepNodeId()));
-            new McastUpdateJob(elanName, device.getHwvtepNodeId(), true , mcastUtils, elanClusterUtils)
+                                                   BigInteger srcDpnId,
+                                                   ElanItmUtils elanItmUtils,
+                                                   Scheduler scheduler,
+                                                   JobCoordinator jobCoordinator) {
+        ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).keySet().forEach(nodeId -> {
+            IpAddress deletedTep = elanItmUtils.getSourceDpnTepIp(Uint64.valueOf(srcDpnId), new NodeId(nodeId));
+            new McastUpdateJob(elanName, nodeId, true , mcastUtils, elanClusterUtils, scheduler, jobCoordinator)
                     .setDpnOrconnectionRemoved()
                     .setRemovedDstTep(deletedTep)
                     .submit();