import com.google.common.primitives.Ints;
import java.math.BigInteger;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-
-import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.opendaylight.controller.liblldp.NetUtils;
import org.opendaylight.genius.mdsalutil.MetaDataUtil;
import org.opendaylight.genius.mdsalutil.NWUtil;
import org.opendaylight.genius.mdsalutil.NwConstants;
import org.opendaylight.genius.mdsalutil.packet.IPv4;
import org.opendaylight.genius.mdsalutil.packet.TCP;
import org.opendaylight.genius.mdsalutil.packet.UDP;
+import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NaptPacketInHandler implements PacketProcessingListener {
private static final Logger LOG = LoggerFactory.getLogger(NaptPacketInHandler.class);
- public static final HashMap<String,NatPacketProcessingState> INCOMING_PACKET_MAP = new HashMap<>();
+ private final ConcurrentMap<String,NatPacketProcessingState> incomingPacketMap = new ConcurrentHashMap<>();
private final NaptEventHandler naptEventHandler;
- private ExecutorService firstPacketExecutorService;
- private ExecutorService retryPacketExecutorService;
+ private final ExecutorService firstPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool(
+ NatConstants.SNAT_PACKET_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-firstPacket", NaptPacketInHandler.class);
+ private final ExecutorService retryPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool(
+ NatConstants.SNAT_PACKET_RETRY_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-retryPacket",
+ NaptPacketInHandler.class);
@Inject
public NaptPacketInHandler(NaptEventHandler naptEventHandler) {
this.naptEventHandler = naptEventHandler;
}
- @PostConstruct
- public void init() {
- firstPacketExecutorService = Executors.newFixedThreadPool(NatConstants.SNAT_PACKET_THEADPOOL_SIZE);
- retryPacketExecutorService = Executors.newFixedThreadPool(NatConstants.SNAT_PACKET_RETRY_THEADPOOL_SIZE);
- }
-
@PreDestroy
public void close() {
firstPacketExecutorService.shutdown();
}
String sourceIPPortKey = routerId + NatConstants.COLON_SEPARATOR
+ internalIPAddress + NatConstants.COLON_SEPARATOR + portNumber;
- if (!INCOMING_PACKET_MAP.containsKey(sourceIPPortKey)) {
- INCOMING_PACKET_MAP.put(sourceIPPortKey,
- new NatPacketProcessingState(System.currentTimeMillis(), -1));
+
+ NatPacketProcessingState state = incomingPacketMap.get(sourceIPPortKey);
+ if (state == null) {
+ state = new NatPacketProcessingState(System.currentTimeMillis(), -1);
+ incomingPacketMap.put(sourceIPPortKey, state);
LOG.trace("onPacketReceived : Processing new SNAT({}) Packet", sourceIPPortKey);
//send to Event Queue
NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId,
- operation, protocol, packetReceived, false);
+ operation, protocol, packetReceived, false, state);
LOG.info("onPacketReceived : First Packet IN Queue Size : {}",
((ThreadPoolExecutor)firstPacketExecutorService).getQueue().size());
firstPacketExecutorService.execute(() -> naptEventHandler.handleEvent(naptEntryEvent));
} else {
LOG.trace("onPacketReceived : SNAT({}) Packet already processed.", sourceIPPortKey);
NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId,
- operation, protocol, packetReceived, true);
- LOG.info("onPacketReceived : Retry Packet IN Queue Size : {}",
+ operation, protocol, packetReceived, true, state);
+ LOG.debug("onPacketReceived : Retry Packet IN Queue Size : {}",
((ThreadPoolExecutor)retryPacketExecutorService).getQueue().size());
+
+ long firstPacketInTime = state.getFirstPacketInTime();
retryPacketExecutorService.execute(() -> {
- NatPacketProcessingState state = INCOMING_PACKET_MAP.get(sourceIPPortKey);
- long firstPacketInTime = state.getFirstPacketInTime();
- long flowInstalledTime = state.getFlowInstalledTime();
- if ((System.currentTimeMillis() - firstPacketInTime) > 4000) {
+ if (System.currentTimeMillis() - firstPacketInTime > 4000) {
LOG.error("onPacketReceived : Flow not installed even after 4sec."
+ "Dropping SNAT ({}) Packet", sourceIPPortKey);
removeIncomingPacketMap(sourceIPPortKey);
}
public void removeIncomingPacketMap(String sourceIPPortKey) {
- INCOMING_PACKET_MAP.remove(sourceIPPortKey);
+ incomingPacketMap.remove(sourceIPPortKey);
LOG.debug("removeIncomingPacketMap : sourceIPPortKey {} mapping is removed from map", sourceIPPortKey);
}
- protected class NatPacketProcessingState {
- long firstPacketInTime;
- long flowInstalledTime;
+ static class NatPacketProcessingState {
+ private final long firstPacketInTime;
+ private volatile long flowInstalledTime;
- public NatPacketProcessingState(long firstPacketInTime, long flowInstalledTime) {
+ NatPacketProcessingState(long firstPacketInTime, long flowInstalledTime) {
this.firstPacketInTime = firstPacketInTime;
this.flowInstalledTime = flowInstalledTime;
}
- public long getFirstPacketInTime() {
+ long getFirstPacketInTime() {
return firstPacketInTime;
}
- public void setFirstPacketInTime(long firstPacketInTime) {
- this.firstPacketInTime = firstPacketInTime;
- }
-
- public long getFlowInstalledTime() {
+ long getFlowInstalledTime() {
return flowInstalledTime;
}
- public void setFlowInstalledTime(long flowInstalledTime) {
+ void setFlowInstalledTime(long flowInstalledTime) {
this.flowInstalledTime = flowInstalledTime;
}
}