X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=vpnservice%2Fnatservice%2Fnatservice-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetvirt%2Fnatservice%2Finternal%2FNaptPacketInHandler.java;h=fbe31f14eba3048402d91caa4dc530754552b0f6;hb=f617fffab912f9ef79a3a7144075db46188069c7;hp=403c53070474ef7e7b0350854e7dca0b6f517dd3;hpb=6cfbcc794bdaec6a699d5c446f843409491241d6;p=netvirt.git diff --git a/vpnservice/natservice/natservice-impl/src/main/java/org/opendaylight/netvirt/natservice/internal/NaptPacketInHandler.java b/vpnservice/natservice/natservice-impl/src/main/java/org/opendaylight/netvirt/natservice/internal/NaptPacketInHandler.java index 403c530704..fbe31f14eb 100644 --- a/vpnservice/natservice/natservice-impl/src/main/java/org/opendaylight/netvirt/natservice/internal/NaptPacketInHandler.java +++ b/vpnservice/natservice/natservice-impl/src/main/java/org/opendaylight/netvirt/natservice/internal/NaptPacketInHandler.java @@ -9,16 +9,13 @@ package org.opendaylight.netvirt.natservice.internal; import com.google.common.primitives.Ints; import java.math.BigInteger; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; - -import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; -import org.opendaylight.controller.liblldp.NetUtils; import org.opendaylight.genius.mdsalutil.MetaDataUtil; import org.opendaylight.genius.mdsalutil.NWUtil; import org.opendaylight.genius.mdsalutil.NwConstants; @@ -26,8 +23,10 @@ import org.opendaylight.genius.mdsalutil.packet.Ethernet; import org.opendaylight.genius.mdsalutil.packet.IPv4; import org.opendaylight.genius.mdsalutil.packet.TCP; import org.opendaylight.genius.mdsalutil.packet.UDP; +import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,22 +34,19 @@ import org.slf4j.LoggerFactory; public class NaptPacketInHandler implements PacketProcessingListener { private static final Logger LOG = LoggerFactory.getLogger(NaptPacketInHandler.class); - public static final HashMap INCOMING_PACKET_MAP = new HashMap<>(); + private final ConcurrentMap incomingPacketMap = new ConcurrentHashMap<>(); private final NaptEventHandler naptEventHandler; - private ExecutorService firstPacketExecutorService; - private ExecutorService retryPacketExecutorService; + private final ExecutorService firstPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool( + NatConstants.SNAT_PACKET_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-firstPacket", NaptPacketInHandler.class); + private final ExecutorService retryPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool( + NatConstants.SNAT_PACKET_RETRY_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-retryPacket", + NaptPacketInHandler.class); @Inject public NaptPacketInHandler(NaptEventHandler naptEventHandler) { this.naptEventHandler = naptEventHandler; } - @PostConstruct - public void init() { - firstPacketExecutorService = Executors.newFixedThreadPool(NatConstants.SNAT_PACKET_THEADPOOL_SIZE); - retryPacketExecutorService = Executors.newFixedThreadPool(NatConstants.SNAT_PACKET_RETRY_THEADPOOL_SIZE); - } - @PreDestroy public void close() { firstPacketExecutorService.shutdown(); @@ -124,28 +120,29 @@ public class NaptPacketInHandler implements PacketProcessingListener { } String sourceIPPortKey = routerId + NatConstants.COLON_SEPARATOR + internalIPAddress + NatConstants.COLON_SEPARATOR + portNumber; - if (!INCOMING_PACKET_MAP.containsKey(sourceIPPortKey)) { - INCOMING_PACKET_MAP.put(sourceIPPortKey, - new NatPacketProcessingState(System.currentTimeMillis(), -1)); + + NatPacketProcessingState state = incomingPacketMap.get(sourceIPPortKey); + if (state == null) { + state = new NatPacketProcessingState(System.currentTimeMillis(), -1); + incomingPacketMap.put(sourceIPPortKey, state); LOG.trace("onPacketReceived : Processing new SNAT({}) Packet", sourceIPPortKey); //send to Event Queue NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId, - operation, protocol, packetReceived, false); + operation, protocol, packetReceived, false, state); LOG.info("onPacketReceived : First Packet IN Queue Size : {}", ((ThreadPoolExecutor)firstPacketExecutorService).getQueue().size()); firstPacketExecutorService.execute(() -> naptEventHandler.handleEvent(naptEntryEvent)); } else { LOG.trace("onPacketReceived : SNAT({}) Packet already processed.", sourceIPPortKey); NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId, - operation, protocol, packetReceived, true); - LOG.info("onPacketReceived : Retry Packet IN Queue Size : {}", + operation, protocol, packetReceived, true, state); + LOG.debug("onPacketReceived : Retry Packet IN Queue Size : {}", ((ThreadPoolExecutor)retryPacketExecutorService).getQueue().size()); + + long firstPacketInTime = state.getFirstPacketInTime(); retryPacketExecutorService.execute(() -> { - NatPacketProcessingState state = INCOMING_PACKET_MAP.get(sourceIPPortKey); - long firstPacketInTime = state.getFirstPacketInTime(); - long flowInstalledTime = state.getFlowInstalledTime(); - if ((System.currentTimeMillis() - firstPacketInTime) > 4000) { + if (System.currentTimeMillis() - firstPacketInTime > 4000) { LOG.error("onPacketReceived : Flow not installed even after 4sec." + "Dropping SNAT ({}) Packet", sourceIPPortKey); removeIncomingPacketMap(sourceIPPortKey); @@ -164,32 +161,28 @@ public class NaptPacketInHandler implements PacketProcessingListener { } public void removeIncomingPacketMap(String sourceIPPortKey) { - INCOMING_PACKET_MAP.remove(sourceIPPortKey); + incomingPacketMap.remove(sourceIPPortKey); LOG.debug("removeIncomingPacketMap : sourceIPPortKey {} mapping is removed from map", sourceIPPortKey); } - protected class NatPacketProcessingState { - long firstPacketInTime; - long flowInstalledTime; + static class NatPacketProcessingState { + private final long firstPacketInTime; + private volatile long flowInstalledTime; - public NatPacketProcessingState(long firstPacketInTime, long flowInstalledTime) { + NatPacketProcessingState(long firstPacketInTime, long flowInstalledTime) { this.firstPacketInTime = firstPacketInTime; this.flowInstalledTime = flowInstalledTime; } - public long getFirstPacketInTime() { + long getFirstPacketInTime() { return firstPacketInTime; } - public void setFirstPacketInTime(long firstPacketInTime) { - this.firstPacketInTime = firstPacketInTime; - } - - public long getFlowInstalledTime() { + long getFlowInstalledTime() { return flowInstalledTime; } - public void setFlowInstalledTime(long flowInstalledTime) { + void setFlowInstalledTime(long flowInstalledTime) { this.flowInstalledTime = flowInstalledTime; } }