OPNFLWPLUG-972 : Point to openflowplugin liblldp
[netvirt.git] / vpnservice / natservice / natservice-impl / src / main / java / org / opendaylight / netvirt / natservice / internal / NaptPacketInHandler.java
index 403c53070474ef7e7b0350854e7dca0b6f517dd3..fbe31f14eba3048402d91caa4dc530754552b0f6 100644 (file)
@@ -9,16 +9,13 @@ package org.opendaylight.netvirt.natservice.internal;
 
 import com.google.common.primitives.Ints;
 import java.math.BigInteger;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
-
-import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.opendaylight.controller.liblldp.NetUtils;
 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
 import org.opendaylight.genius.mdsalutil.NWUtil;
 import org.opendaylight.genius.mdsalutil.NwConstants;
@@ -26,8 +23,10 @@ import org.opendaylight.genius.mdsalutil.packet.Ethernet;
 import org.opendaylight.genius.mdsalutil.packet.IPv4;
 import org.opendaylight.genius.mdsalutil.packet.TCP;
 import org.opendaylight.genius.mdsalutil.packet.UDP;
+import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,22 +34,19 @@ import org.slf4j.LoggerFactory;
 public class NaptPacketInHandler implements PacketProcessingListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NaptPacketInHandler.class);
-    public static final HashMap<String,NatPacketProcessingState> INCOMING_PACKET_MAP = new HashMap<>();
+    private final ConcurrentMap<String,NatPacketProcessingState> incomingPacketMap = new ConcurrentHashMap<>();
     private final NaptEventHandler naptEventHandler;
-    private ExecutorService firstPacketExecutorService;
-    private ExecutorService retryPacketExecutorService;
+    private final ExecutorService firstPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool(
+            NatConstants.SNAT_PACKET_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-firstPacket", NaptPacketInHandler.class);
+    private final ExecutorService retryPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool(
+            NatConstants.SNAT_PACKET_RETRY_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-retryPacket",
+            NaptPacketInHandler.class);
 
     @Inject
     public NaptPacketInHandler(NaptEventHandler naptEventHandler) {
         this.naptEventHandler = naptEventHandler;
     }
 
-    @PostConstruct
-    public void init() {
-        firstPacketExecutorService = Executors.newFixedThreadPool(NatConstants.SNAT_PACKET_THEADPOOL_SIZE);
-        retryPacketExecutorService = Executors.newFixedThreadPool(NatConstants.SNAT_PACKET_RETRY_THEADPOOL_SIZE);
-    }
-
     @PreDestroy
     public void close() {
         firstPacketExecutorService.shutdown();
@@ -124,28 +120,29 @@ public class NaptPacketInHandler implements PacketProcessingListener {
                     }
                     String sourceIPPortKey = routerId + NatConstants.COLON_SEPARATOR
                             + internalIPAddress + NatConstants.COLON_SEPARATOR + portNumber;
-                    if (!INCOMING_PACKET_MAP.containsKey(sourceIPPortKey)) {
-                        INCOMING_PACKET_MAP.put(sourceIPPortKey,
-                                new NatPacketProcessingState(System.currentTimeMillis(), -1));
+
+                    NatPacketProcessingState state = incomingPacketMap.get(sourceIPPortKey);
+                    if (state == null) {
+                        state = new NatPacketProcessingState(System.currentTimeMillis(), -1);
+                        incomingPacketMap.put(sourceIPPortKey, state);
                         LOG.trace("onPacketReceived : Processing new SNAT({}) Packet", sourceIPPortKey);
 
                         //send to Event Queue
                         NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId,
-                            operation, protocol, packetReceived, false);
+                            operation, protocol, packetReceived, false, state);
                         LOG.info("onPacketReceived : First Packet IN Queue Size : {}",
                                 ((ThreadPoolExecutor)firstPacketExecutorService).getQueue().size());
                         firstPacketExecutorService.execute(() -> naptEventHandler.handleEvent(naptEntryEvent));
                     } else {
                         LOG.trace("onPacketReceived : SNAT({}) Packet already processed.", sourceIPPortKey);
                         NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId,
-                            operation, protocol, packetReceived, true);
-                        LOG.info("onPacketReceived : Retry Packet IN Queue Size : {}",
+                            operation, protocol, packetReceived, true, state);
+                        LOG.debug("onPacketReceived : Retry Packet IN Queue Size : {}",
                                 ((ThreadPoolExecutor)retryPacketExecutorService).getQueue().size());
+
+                        long firstPacketInTime = state.getFirstPacketInTime();
                         retryPacketExecutorService.execute(() -> {
-                            NatPacketProcessingState state = INCOMING_PACKET_MAP.get(sourceIPPortKey);
-                            long firstPacketInTime = state.getFirstPacketInTime();
-                            long flowInstalledTime = state.getFlowInstalledTime();
-                            if ((System.currentTimeMillis() - firstPacketInTime) > 4000) {
+                            if (System.currentTimeMillis() - firstPacketInTime > 4000) {
                                 LOG.error("onPacketReceived : Flow not installed even after 4sec."
                                         + "Dropping SNAT ({}) Packet", sourceIPPortKey);
                                 removeIncomingPacketMap(sourceIPPortKey);
@@ -164,32 +161,28 @@ public class NaptPacketInHandler implements PacketProcessingListener {
     }
 
     public void removeIncomingPacketMap(String sourceIPPortKey) {
-        INCOMING_PACKET_MAP.remove(sourceIPPortKey);
+        incomingPacketMap.remove(sourceIPPortKey);
         LOG.debug("removeIncomingPacketMap : sourceIPPortKey {} mapping is removed from map", sourceIPPortKey);
     }
 
-    protected class NatPacketProcessingState {
-        long firstPacketInTime;
-        long flowInstalledTime;
+    static class NatPacketProcessingState {
+        private final long firstPacketInTime;
+        private volatile long flowInstalledTime;
 
-        public NatPacketProcessingState(long firstPacketInTime, long flowInstalledTime) {
+        NatPacketProcessingState(long firstPacketInTime, long flowInstalledTime) {
             this.firstPacketInTime = firstPacketInTime;
             this.flowInstalledTime = flowInstalledTime;
         }
 
-        public long getFirstPacketInTime() {
+        long getFirstPacketInTime() {
             return firstPacketInTime;
         }
 
-        public void setFirstPacketInTime(long firstPacketInTime) {
-            this.firstPacketInTime = firstPacketInTime;
-        }
-
-        public long getFlowInstalledTime() {
+        long getFlowInstalledTime() {
             return flowInstalledTime;
         }
 
-        public void setFlowInstalledTime(long flowInstalledTime) {
+        void setFlowInstalledTime(long flowInstalledTime) {
             this.flowInstalledTime = flowInstalledTime;
         }
     }