Bug 7775: Updated to use DataStoreJobCoordinator for flow programming 44/53144/10
authorShashidhar Raja <shashidharr@altencalsoftlabs.com>
Fri, 10 Mar 2017 14:20:57 +0000 (19:50 +0530)
committerSam Hague <shague@redhat.com>
Fri, 24 Mar 2017 01:43:40 +0000 (01:43 +0000)
Updated syncFlow() in AbstractAclServiceImpl.java to use DJC while flow is
added/removed to serialize/parallelize flow programming based on device
id. This also avoids possibility of OptimisticLockException getting
generated from ACL while flows are programmed.

Also, Interface state listener updated to use DJC to serialize/parallelize
interface state processing changes.

This review is dependent on below genius patches:
https://git.opendaylight.org/gerrit/#/c/53143/
https://git.opendaylight.org/gerrit/#/c/53352/

Depends-On: I8c15af6607f83a08354520872e4a5ff0ea16d2bf
Change-Id: I578cdf332d8e5575ac31b3b7c5fdf9d7ea86013d
Signed-off-by: Shashidhar Raja <shashidharr@altencalsoftlabs.com>
vpnservice/aclservice/impl/src/main/java/org/opendaylight/netvirt/aclservice/AbstractAclServiceImpl.java
vpnservice/aclservice/impl/src/main/java/org/opendaylight/netvirt/aclservice/listeners/AclInterfaceStateListener.java
vpnservice/aclservice/impl/src/test/java/org/opendaylight/netvirt/aclservice/LearnEgressAclServiceImplTest.java
vpnservice/aclservice/impl/src/test/java/org/opendaylight/netvirt/aclservice/StatelessEgressAclServiceImplTest.java
vpnservice/aclservice/impl/src/test/java/org/opendaylight/netvirt/aclservice/StatelessIngressAclServiceImplTest.java
vpnservice/aclservice/impl/src/test/java/org/opendaylight/netvirt/aclservice/tests/AclServiceTestBase.java
vpnservice/aclservice/impl/src/test/java/org/opendaylight/netvirt/aclservice/tests/AclServiceTestModule.java
vpnservice/aclservice/impl/src/test/java/org/opendaylight/netvirt/aclservice/utils/AclServiceTestUtils.java

index 93f34955340b39debd73ad7087641c6f61e3a15b..ce9236767a9904241a4eab7320ae54d94834de3f 100644 (file)
@@ -9,12 +9,14 @@ package org.opendaylight.netvirt.aclservice;
 
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
 import org.opendaylight.genius.mdsalutil.InstructionInfo;
@@ -338,16 +340,23 @@ public abstract class AbstractAclServiceImpl implements AclServiceListener {
     protected void syncFlow(BigInteger dpId, short tableId, String flowId, int priority, String flowName,
             int idleTimeOut, int hardTimeOut, BigInteger cookie, List<? extends MatchInfoBase> matches,
             List<InstructionInfo> instructions, int addOrRemove) {
+        DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
         if (addOrRemove == NwConstants.DEL_FLOW) {
             FlowEntity flowEntity = MDSALUtil.buildFlowEntity(dpId, tableId, flowId, priority, flowName, idleTimeOut,
                     hardTimeOut, cookie, matches, null);
             LOG.trace("Removing Acl Flow DpnId {}, flowId {}", dpId, flowId);
-            mdsalManager.removeFlow(flowEntity);
+            dataStoreCoordinator.enqueueJob(dpId.toString(),
+                () -> {
+                    return Arrays.asList(mdsalManager.removeFlow(dpId, flowEntity));
+                });
         } else {
             FlowEntity flowEntity = MDSALUtil.buildFlowEntity(dpId, tableId, flowId, priority, flowName, idleTimeOut,
                     hardTimeOut, cookie, matches, instructions);
             LOG.trace("Installing DpnId {}, flowId {}", dpId, flowId);
-            mdsalManager.installFlow(flowEntity);
+            dataStoreCoordinator.enqueueJob(dpId.toString(),
+                () -> {
+                    return Arrays.asList(mdsalManager.installFlow(dpId, flowEntity));
+                });
         }
     }
 
index 54e5ffc0a68cfbb0aecc011e732b9143125c54fe..08ab2414825de2c0e97b0f5a632094b5e803cd0a 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.netvirt.aclservice.listeners;
 
+import com.google.common.util.concurrent.Futures;
+
+import java.util.Arrays;
 import java.util.List;
 
 import javax.annotation.PostConstruct;
@@ -17,6 +20,7 @@ import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeLis
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
 import org.opendaylight.netvirt.aclservice.api.AclServiceManager;
 import org.opendaylight.netvirt.aclservice.api.AclServiceManager.Action;
 import org.opendaylight.netvirt.aclservice.api.utils.AclInterface;
@@ -68,17 +72,22 @@ public class AclInterfaceStateListener extends AsyncDataTreeChangeListenerBase<I
     @Override
     protected void remove(InstanceIdentifier<Interface> key, Interface dataObjectModification) {
         String interfaceId = dataObjectModification.getName();
-        AclInterface aclInterface = AclInterfaceCacheUtil.getAclInterfaceFromCache(interfaceId);
-        if (AclServiceUtils.isOfInterest(aclInterface)) {
-            AclInterfaceCacheUtil.removeAclInterfaceFromCache(interfaceId);
-            if (aclClusterUtil.isEntityOwner()) {
-                aclServiceManger.notify(aclInterface, null, Action.REMOVE);
-            }
-            List<Uuid> aclList = aclInterface.getSecurityGroups();
-            if (aclList != null) {
-                aclDataUtil.removeAclInterfaceMap(aclList, aclInterface);
-            }
-        }
+        DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
+        dataStoreCoordinator.enqueueJob(interfaceId,
+            () -> {
+                AclInterface aclInterface = AclInterfaceCacheUtil.getAclInterfaceFromCache(interfaceId);
+                if (AclServiceUtils.isOfInterest(aclInterface)) {
+                    AclInterfaceCacheUtil.removeAclInterfaceFromCache(interfaceId);
+                    if (aclClusterUtil.isEntityOwner()) {
+                        aclServiceManger.notify(aclInterface, null, Action.REMOVE);
+                    }
+                    List<Uuid> aclList = aclInterface.getSecurityGroups();
+                    if (aclList != null) {
+                        aclDataUtil.removeAclInterfaceMap(aclList, aclInterface);
+                    }
+                }
+                return Arrays.asList(Futures.immediateCheckedFuture(null));
+            });
     }
 
     @Override
@@ -92,16 +101,21 @@ public class AclInterfaceStateListener extends AsyncDataTreeChangeListenerBase<I
 
     @Override
     protected void add(InstanceIdentifier<Interface> key, Interface dataObjectModification) {
-        AclInterface aclInterface = updateAclInterfaceCache(dataObjectModification);
-        if (AclServiceUtils.isOfInterest(aclInterface)) {
-            List<Uuid> aclList = aclInterface.getSecurityGroups();
-            if (aclList != null) {
-                aclDataUtil.addAclInterfaceMap(aclList, aclInterface);
-            }
-            if (aclClusterUtil.isEntityOwner()) {
-                aclServiceManger.notify(aclInterface, null, Action.ADD);
-            }
-        }
+        DataStoreJobCoordinator dataStoreCoordinator = DataStoreJobCoordinator.getInstance();
+        dataStoreCoordinator.enqueueJob(dataObjectModification.getName(),
+            () -> {
+                AclInterface aclInterface = updateAclInterfaceCache(dataObjectModification);
+                if (AclServiceUtils.isOfInterest(aclInterface)) {
+                    List<Uuid> aclList = aclInterface.getSecurityGroups();
+                    if (aclList != null) {
+                        aclDataUtil.addAclInterfaceMap(aclList, aclInterface);
+                    }
+                    if (aclClusterUtil.isEntityOwner()) {
+                        aclServiceManger.notify(aclInterface, null, Action.ADD);
+                    }
+                }
+                return Arrays.asList(Futures.immediateCheckedFuture(null));
+            });
     }
 
     @Override
index 3296f657916eb51e4271be595471506f5b414a73..035b7dd9fc8fcdf2ad1ff19811019fecbec2966e 100644 (file)
@@ -15,6 +15,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 
 import java.math.BigInteger;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
 import org.opendaylight.genius.mdsalutil.NwConstants;
 import org.opendaylight.genius.mdsalutil.NxMatchFieldType;
@@ -77,8 +79,8 @@ public class LearnEgressAclServiceImplTest {
     @Mock AclserviceConfig config;
     @Mock IdManagerService idManager;
 
-    MethodInvocationParamSaver<Void> installFlowValueSaver = null;
-    MethodInvocationParamSaver<Void> removeFlowValueSaver = null;
+    MethodInvocationParamSaver<CheckedFuture<Void, TransactionCommitFailedException>> installFlowValueSaver = null;
+    MethodInvocationParamSaver<CheckedFuture<Void, TransactionCommitFailedException>> removeFlowValueSaver = null;
 
     final Integer tcpFinIdleTimeoutValue = 60;
 
@@ -90,10 +92,10 @@ public class LearnEgressAclServiceImplTest {
         doReturn(Futures.immediateCheckedFuture(null)).when(mockWriteTx).submit();
         doReturn(mockReadTx).when(dataBroker).newReadOnlyTransaction();
         doReturn(mockWriteTx).when(dataBroker).newWriteOnlyTransaction();
-        installFlowValueSaver = new MethodInvocationParamSaver<>(null);
-        doAnswer(installFlowValueSaver).when(mdsalManager).installFlow(any(FlowEntity.class));
-        removeFlowValueSaver = new MethodInvocationParamSaver<>(null);
-        doAnswer(installFlowValueSaver).when(mdsalManager).removeFlow(any(FlowEntity.class));
+        installFlowValueSaver = new MethodInvocationParamSaver<>(Futures.immediateCheckedFuture(null));
+        doAnswer(installFlowValueSaver).when(mdsalManager).installFlow(any(BigInteger.class), any(FlowEntity.class));
+        removeFlowValueSaver = new MethodInvocationParamSaver<>(Futures.immediateCheckedFuture(null));
+        doAnswer(installFlowValueSaver).when(mdsalManager).removeFlow(any(BigInteger.class), any(FlowEntity.class));
         doReturn(tcpFinIdleTimeoutValue).when(config).getSecurityGroupTcpFinIdleTimeout();
     }
 
@@ -115,9 +117,10 @@ public class LearnEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(10, installFlowValueSaver.getNumOfInvocations());
 
-        FlowEntity flow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity flow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(1);
         AclServiceTestUtils.verifyMatchInfo(flow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65535");
         AclServiceTestUtils.verifyActionTypeExist(flow.getInstructionInfoList().get(0), ActionLearn.class);
@@ -141,9 +144,10 @@ public class LearnEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubAllowAllInterface(sgUuid, "if_name");
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(10, installFlowValueSaver.getNumOfInvocations());
 
-        FlowEntity flow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity flow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(1);
         AclServiceTestUtils.verifyActionTypeExist(flow.getInstructionInfoList().get(0), ActionLearn.class);
     }
 
@@ -152,12 +156,13 @@ public class LearnEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 84);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(11, installFlowValueSaver.getNumOfInvocations());
-        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(1);
         AclServiceTestUtils.verifyMatchInfo(firstRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65532");
 
-        FlowEntity secondRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(10).get(0);
+        FlowEntity secondRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(10).get(1);
         AclServiceTestUtils.verifyMatchInfo(secondRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "84", "65535");
     }
@@ -167,8 +172,9 @@ public class LearnEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubUdpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(10, installFlowValueSaver.getNumOfInvocations());
-        FlowEntity flow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity flow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(1);
         AclServiceTestUtils.verifyMatchInfo(flow.getMatchInfoList(),
                 NxMatchFieldType.nx_udp_dst_with_mask, "80", "65535");
         AclServiceTestUtils.verifyActionTypeExist(flow.getInstructionInfoList().get(0), ActionLearn.class);
@@ -193,6 +199,7 @@ public class LearnEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.removeAcl(ai));
+        Thread.sleep(1000);
         assertEquals(5, removeFlowValueSaver.getNumOfInvocations());
         FlowEntity firstRangeFlow = (FlowEntity) removeFlowValueSaver.getInvocationParams(4).get(0);
         assertTrue(firstRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
index ea4573aa8f111719a97ff663d6ba7dd3acedb609..3354a8d07b5e279a4527f29e04974676c21984b7 100644 (file)
@@ -15,6 +15,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.math.BigInteger;
 import java.util.Collections;
@@ -27,6 +28,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
 import org.opendaylight.genius.mdsalutil.NwConstants;
 import org.opendaylight.genius.mdsalutil.NxMatchFieldType;
@@ -73,8 +75,8 @@ public class StatelessEgressAclServiceImplTest {
     @Mock AclserviceConfig config;
     @Mock IdManagerService idManager;
 
-    MethodInvocationParamSaver<Void> installFlowValueSaver = null;
-    MethodInvocationParamSaver<Void> removeFlowValueSaver = null;
+    MethodInvocationParamSaver<CheckedFuture<Void, TransactionCommitFailedException>> installFlowValueSaver = null;
+    MethodInvocationParamSaver<CheckedFuture<Void, TransactionCommitFailedException>> removeFlowValueSaver = null;
 
     @Before
     public void setUp() {
@@ -84,10 +86,10 @@ public class StatelessEgressAclServiceImplTest {
         doReturn(Futures.immediateCheckedFuture(null)).when(mockWriteTx).submit();
         doReturn(mockReadTx).when(dataBroker).newReadOnlyTransaction();
         doReturn(mockWriteTx).when(dataBroker).newWriteOnlyTransaction();
-        installFlowValueSaver = new MethodInvocationParamSaver<>(null);
-        doAnswer(installFlowValueSaver).when(mdsalManager).installFlow(any(FlowEntity.class));
-        removeFlowValueSaver = new MethodInvocationParamSaver<>(null);
-        doAnswer(removeFlowValueSaver).when(mdsalManager).removeFlow(any(FlowEntity.class));
+        installFlowValueSaver = new MethodInvocationParamSaver<>(Futures.immediateCheckedFuture(null));
+        removeFlowValueSaver = new MethodInvocationParamSaver<>(Futures.immediateCheckedFuture(null));
+        doAnswer(removeFlowValueSaver).when(mdsalManager).removeFlow(any(BigInteger.class), any(FlowEntity.class));
+        doAnswer(installFlowValueSaver).when(mdsalManager).installFlow(any(BigInteger.class), any(FlowEntity.class));
     }
 
     @Test
@@ -108,15 +110,15 @@ public class StatelessEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(10, installFlowValueSaver.getNumOfInvocations());
 
-        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(1);
         AclServiceTestUtils.verifyMatchInfo(firstRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65535");
         assertTrue(firstRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
         AclServiceTestUtils.verifyActionInfo(firstRangeFlow.getInstructionInfoList().get(0),
                 new ActionNxResubmit(NwConstants.LPORT_DISPATCHER_TABLE));
-
     }
 
     @Test
@@ -124,9 +126,10 @@ public class StatelessEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubAllowAllInterface(sgUuid, "if_name");
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(10, installFlowValueSaver.getNumOfInvocations());
 
-        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(1);
         AclServiceTestUtils.verifyActionInfo(firstRangeFlow.getInstructionInfoList().get(0),
                 new ActionNxResubmit(NwConstants.LPORT_DISPATCHER_TABLE));
     }
@@ -136,13 +139,14 @@ public class StatelessEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 84);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(11, installFlowValueSaver.getNumOfInvocations());
-        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(9).get(1);
         AclServiceTestUtils.verifyMatchInfo(firstRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65532");
         assertTrue(firstRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
 
-        FlowEntity secondRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(10).get(0);
+        FlowEntity secondRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(10).get(1);
         AclServiceTestUtils.verifyMatchInfo(secondRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "84", "65535");
         assertTrue(secondRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
@@ -153,6 +157,7 @@ public class StatelessEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubUdpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(9, installFlowValueSaver.getNumOfInvocations());
     }
 
@@ -161,8 +166,10 @@ public class StatelessEgressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.removeAcl(ai));
+        Thread.sleep(1000);
+
         assertEquals(10, removeFlowValueSaver.getNumOfInvocations());
-        FlowEntity firstRangeFlow = (FlowEntity) removeFlowValueSaver.getInvocationParams(9).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) removeFlowValueSaver.getInvocationParams(9).get(1);
         assertTrue(firstRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
         AclServiceTestUtils.verifyMatchInfo(firstRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65535");
index 76d39fc98d5b3f33251c672fb61607e609657c27..3393295f092301242d8cf8d8169da3f9f5c156c5 100644 (file)
@@ -15,6 +15,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.math.BigInteger;
 import java.util.Collections;
@@ -27,6 +28,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
 import org.opendaylight.genius.mdsalutil.NwConstants;
 import org.opendaylight.genius.mdsalutil.NxMatchFieldType;
@@ -73,8 +75,8 @@ public class StatelessIngressAclServiceImplTest {
     @Mock AclserviceConfig config;
     @Mock IdManagerService idManager;
 
-    MethodInvocationParamSaver<Void> installFlowValueSaver = null;
-    MethodInvocationParamSaver<Void> removeFlowValueSaver = null;
+    MethodInvocationParamSaver<CheckedFuture<Void, TransactionCommitFailedException>> installFlowValueSaver = null;
+    MethodInvocationParamSaver<CheckedFuture<Void, TransactionCommitFailedException>> removeFlowValueSaver = null;
 
     @Before
     public void setUp() {
@@ -84,10 +86,10 @@ public class StatelessIngressAclServiceImplTest {
         doReturn(Futures.immediateCheckedFuture(null)).when(mockWriteTx).submit();
         doReturn(mockReadTx).when(dataBroker).newReadOnlyTransaction();
         doReturn(mockWriteTx).when(dataBroker).newWriteOnlyTransaction();
-        installFlowValueSaver = new MethodInvocationParamSaver<>(null);
-        doAnswer(installFlowValueSaver).when(mdsalManager).installFlow(any(FlowEntity.class));
-        removeFlowValueSaver = new MethodInvocationParamSaver<>(null);
-        doAnswer(removeFlowValueSaver).when(mdsalManager).removeFlow(any(FlowEntity.class));
+        installFlowValueSaver = new MethodInvocationParamSaver<>(Futures.immediateCheckedFuture(null));
+        doAnswer(installFlowValueSaver).when(mdsalManager).installFlow(any(BigInteger.class), any(FlowEntity.class));
+        removeFlowValueSaver = new MethodInvocationParamSaver<>(Futures.immediateCheckedFuture(null));
+        doAnswer(removeFlowValueSaver).when(mdsalManager).removeFlow(any(BigInteger.class), any(FlowEntity.class));
     }
 
     @Test
@@ -108,9 +110,10 @@ public class StatelessIngressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(7, installFlowValueSaver.getNumOfInvocations());
 
-        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(6).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(6).get(1);
         AclServiceTestUtils.verifyMatchInfo(firstRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65535");
         assertTrue(firstRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
@@ -123,9 +126,10 @@ public class StatelessIngressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubAllowAllInterface(sgUuid, "if_name");
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(7, installFlowValueSaver.getNumOfInvocations());
 
-        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(6).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(6).get(1);
         AclServiceTestUtils.verifyActionInfo(firstRangeFlow.getInstructionInfoList().get(0),
                 new ActionNxResubmit(NwConstants.EGRESS_LPORT_DISPATCHER_TABLE));
     }
@@ -135,15 +139,16 @@ public class StatelessIngressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 84);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(8, installFlowValueSaver.getNumOfInvocations());
-        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(6).get(0);
+        FlowEntity firstRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(6).get(1);
         // should have been 80-83 will be fixed as part of the port range support
         // https://bugs.opendaylight.org/show_bug.cgi?id=6200
         AclServiceTestUtils.verifyMatchInfo(firstRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65532");
         assertTrue(firstRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
 
-        FlowEntity secondRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(7).get(0);
+        FlowEntity secondRangeFlow = (FlowEntity) installFlowValueSaver.getInvocationParams(7).get(1);
         AclServiceTestUtils.verifyMatchInfo(secondRangeFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "84", "65535");
         assertTrue(secondRangeFlow.getMatchInfoList().contains(new MatchTcpFlags(2)));
@@ -154,6 +159,7 @@ public class StatelessIngressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubUdpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.applyAcl(ai));
+        Thread.sleep(1000);
         assertEquals(6, installFlowValueSaver.getNumOfInvocations());
     }
 
@@ -162,8 +168,9 @@ public class StatelessIngressAclServiceImplTest {
         Uuid sgUuid = new Uuid("12345678-1234-1234-1234-123456789012");
         AclInterface ai = stubTcpAclInterface(sgUuid, "if_name", "1.1.1.1/32", 80, 80);
         assertEquals(true, testedService.removeAcl(ai));
+        Thread.sleep(1000);
         assertEquals(7, removeFlowValueSaver.getNumOfInvocations());
-        FlowEntity firstSynFlow = (FlowEntity) removeFlowValueSaver.getInvocationParams(6).get(0);
+        FlowEntity firstSynFlow = (FlowEntity) removeFlowValueSaver.getInvocationParams(6).get(1);
         AclServiceTestUtils.verifyMatchInfo(firstSynFlow.getMatchInfoList(),
                 NxMatchFieldType.nx_tcp_dst_with_mask, "80", "65535");
         assertTrue(firstSynFlow.getMatchInfoList().contains(MatchTcpFlags.SYN));
index 103b7dc4a262476264970b1b9f9cac2b0209ca44..b15c1e6c1f06baa00b77ecff05bcdec311ee873f 100644 (file)
@@ -15,6 +15,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
+
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -22,6 +23,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
 import org.opendaylight.genius.datastoreutils.testutils.AsyncEventsWaiter;
+import org.opendaylight.genius.datastoreutils.testutils.JobCoordinatorEventsWaiter;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
 import org.opendaylight.genius.mdsalutil.NwConstants;
 import org.opendaylight.genius.mdsalutil.interfaces.testutils.TestIMdsalApiManager;
@@ -90,6 +92,7 @@ public abstract class AclServiceTestBase {
     SingleTransactionDataBroker singleTransactionDataBroker;
     @Inject TestIMdsalApiManager mdsalApiManager;
     @Inject AsyncEventsWaiter asyncEventsWaiter;
+    @Inject JobCoordinatorEventsWaiter coordinatorEventsWaiter;
 
     @Before
     public void beforeEachTest() throws Exception {
@@ -438,6 +441,7 @@ public abstract class AclServiceTestBase {
     abstract void newInterfaceWithAapIpv4AllCheck();
 
     protected void assertFlowsInAnyOrder(Iterable<FlowEntity> expectedFlows) {
+        coordinatorEventsWaiter.awaitEventsConsumption();
         mdsalApiManager.assertFlowsInAnyOrder(expectedFlows);
     }
 
index b00fba81fc6e12995a74f96acfb383b018448869..13587e6ee2351d2c943d99602492d523b8b1a1b0 100644 (file)
@@ -17,6 +17,8 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.binding.test.DataBrokerTestModule;
+import org.opendaylight.genius.datastoreutils.testutils.JobCoordinatorEventsWaiter;
+import org.opendaylight.genius.datastoreutils.testutils.TestableJobCoordinatorEventsWaiter;
 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
 import org.opendaylight.genius.mdsalutil.interfaces.testutils.TestIMdsalApiManager;
 import org.opendaylight.netvirt.aclservice.stats.TestOdlDirectStatisticsService;
@@ -68,6 +70,7 @@ public class AclServiceTestModule extends AbstractModule {
         bind(IdManagerService.class).toInstance(Mockito.mock(TestIdManagerService.class, realOrException()));
         bind(OpendaylightDirectStatisticsService.class)
                 .toInstance(Mockito.mock(TestOdlDirectStatisticsService.class, realOrException()));
+        bind(JobCoordinatorEventsWaiter.class).toInstance(new TestableJobCoordinatorEventsWaiter());
     }
 
     private AclserviceConfig aclServiceConfig() {
index 6f4ee687e1cdaa67bc3b7cbc50971a70888f5674..f0cdc51ba8fa6aa6895071b4f60af4ce52469ea3 100644 (file)
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import org.junit.Assert;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
 import org.opendaylight.genius.mdsalutil.InstructionInfo;
@@ -203,7 +204,5 @@ public class AclServiceTestUtils {
         if (entityOwnerCache != null) {
             entityOwnerCache.put(entityName, true);
         }
-
     }
-
 }