2 * Copyright (c) 2016, 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.natservice.internal;
10 import com.google.common.primitives.Ints;
11 import java.math.BigInteger;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.ThreadPoolExecutor;
16 import javax.annotation.PreDestroy;
17 import javax.inject.Inject;
18 import javax.inject.Singleton;
19 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
20 import org.opendaylight.genius.mdsalutil.NWUtil;
21 import org.opendaylight.genius.mdsalutil.NwConstants;
22 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
23 import org.opendaylight.genius.mdsalutil.packet.IPv4;
24 import org.opendaylight.genius.mdsalutil.packet.TCP;
25 import org.opendaylight.genius.mdsalutil.packet.UDP;
26 import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
29 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 public class NaptPacketInHandler implements PacketProcessingListener {
36 private static final Logger LOG = LoggerFactory.getLogger(NaptPacketInHandler.class);
37 private final ConcurrentMap<String,NatPacketProcessingState> incomingPacketMap = new ConcurrentHashMap<>();
38 private final NaptEventHandler naptEventHandler;
39 private final ExecutorService firstPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool(
40 NatConstants.SNAT_PACKET_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-firstPacket", NaptPacketInHandler.class);
41 private final ExecutorService retryPacketExecutorService = SpecialExecutors.newBlockingBoundedFastThreadPool(
42 NatConstants.SNAT_PACKET_RETRY_THEADPOOL_SIZE, Integer.MAX_VALUE, "Napt-retryPacket",
43 NaptPacketInHandler.class);
46 public NaptPacketInHandler(NaptEventHandler naptEventHandler) {
47 this.naptEventHandler = naptEventHandler;
52 firstPacketExecutorService.shutdown();
53 retryPacketExecutorService.shutdown();
57 // TODO Clean up the exception handling
58 @SuppressWarnings("checkstyle:IllegalCatch")
59 public void onPacketReceived(PacketReceived packetReceived) {
60 String internalIPAddress = "";
63 NAPTEntryEvent.Operation operation = NAPTEntryEvent.Operation.ADD;
64 NAPTEntryEvent.Protocol protocol;
66 Short tableId = packetReceived.getTableId().getValue();
68 LOG.trace("onPacketReceived : packet: {}, tableId {}", packetReceived, tableId);
70 if (tableId == NwConstants.OUTBOUND_NAPT_TABLE) {
71 LOG.debug("onPacketReceived : NAPTPacketInHandler Packet for Outbound NAPT Table");
72 byte[] inPayload = packetReceived.getPayload();
73 Ethernet ethPkt = new Ethernet();
74 if (inPayload != null) {
76 ethPkt.deserialize(inPayload, 0, inPayload.length * NetUtils.NUM_BITS_IN_A_BYTE);
77 } catch (Exception e) {
78 LOG.warn("onPacketReceived: Failed to decode Packet", e);
81 if (ethPkt.getPayload() instanceof IPv4) {
82 IPv4 ipPkt = (IPv4) ethPkt.getPayload();
83 byte[] ipSrc = Ints.toByteArray(ipPkt.getSourceAddress());
85 internalIPAddress = NWUtil.toStringIpAddress(ipSrc);
86 LOG.trace("onPacketReceived : Retrieved internalIPAddress {}", internalIPAddress);
87 if (ipPkt.getPayload() instanceof TCP) {
88 TCP tcpPkt = (TCP) ipPkt.getPayload();
89 portNumber = tcpPkt.getSourcePort();
91 portNumber = 32767 + portNumber + 32767 + 2;
92 LOG.trace("onPacketReceived : Retrieved and extracted TCP portNumber {}", portNumber);
94 protocol = NAPTEntryEvent.Protocol.TCP;
95 LOG.trace("onPacketReceived : Retrieved TCP portNumber {}", portNumber);
96 } else if (ipPkt.getPayload() instanceof UDP) {
97 UDP udpPkt = (UDP) ipPkt.getPayload();
98 portNumber = udpPkt.getSourcePort();
100 portNumber = 32767 + portNumber + 32767 + 2;
101 LOG.trace("onPacketReceived : Retrieved and extracted UDP portNumber {}", portNumber);
103 protocol = NAPTEntryEvent.Protocol.UDP;
104 LOG.trace("onPacketReceived : Retrieved UDP portNumber {}", portNumber);
106 LOG.error("onPacketReceived : Incoming Packet is neither TCP or UDP packet");
110 LOG.error("onPacketReceived : Incoming Packet is not IPv4 packet");
114 if (internalIPAddress != null) {
115 BigInteger metadata = packetReceived.getMatch().getMetadata().getMetadata();
116 routerId = MetaDataUtil.getNatRouterIdFromMetadata(metadata);
118 LOG.error("onPacketReceived : Router ID is invalid");
121 String sourceIPPortKey = routerId + NatConstants.COLON_SEPARATOR
122 + internalIPAddress + NatConstants.COLON_SEPARATOR + portNumber;
124 NatPacketProcessingState state = incomingPacketMap.get(sourceIPPortKey);
126 state = new NatPacketProcessingState(System.currentTimeMillis(), -1);
127 incomingPacketMap.put(sourceIPPortKey, state);
128 LOG.trace("onPacketReceived : Processing new SNAT({}) Packet", sourceIPPortKey);
130 //send to Event Queue
131 NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId,
132 operation, protocol, packetReceived, false, state);
133 LOG.info("onPacketReceived : First Packet IN Queue Size : {}",
134 ((ThreadPoolExecutor)firstPacketExecutorService).getQueue().size());
135 firstPacketExecutorService.execute(() -> naptEventHandler.handleEvent(naptEntryEvent));
137 LOG.trace("onPacketReceived : SNAT({}) Packet already processed.", sourceIPPortKey);
138 NAPTEntryEvent naptEntryEvent = new NAPTEntryEvent(internalIPAddress, portNumber, routerId,
139 operation, protocol, packetReceived, true, state);
140 LOG.debug("onPacketReceived : Retry Packet IN Queue Size : {}",
141 ((ThreadPoolExecutor)retryPacketExecutorService).getQueue().size());
143 long firstPacketInTime = state.getFirstPacketInTime();
144 retryPacketExecutorService.execute(() -> {
145 if (System.currentTimeMillis() - firstPacketInTime > 4000) {
146 LOG.error("onPacketReceived : Flow not installed even after 4sec."
147 + "Dropping SNAT ({}) Packet", sourceIPPortKey);
148 removeIncomingPacketMap(sourceIPPortKey);
151 naptEventHandler.handleEvent(naptEntryEvent);
155 LOG.error("onPacketReceived : Retrived internalIPAddress is NULL");
159 LOG.trace("onPacketReceived : Packet is not from the Outbound NAPT table");
163 public void removeIncomingPacketMap(String sourceIPPortKey) {
164 incomingPacketMap.remove(sourceIPPortKey);
165 LOG.debug("removeIncomingPacketMap : sourceIPPortKey {} mapping is removed from map", sourceIPPortKey);
168 static class NatPacketProcessingState {
169 private final long firstPacketInTime;
170 private volatile long flowInstalledTime;
172 NatPacketProcessingState(long firstPacketInTime, long flowInstalledTime) {
173 this.firstPacketInTime = firstPacketInTime;
174 this.flowInstalledTime = flowInstalledTime;
177 long getFirstPacketInTime() {
178 return firstPacketInTime;
181 long getFlowInstalledTime() {
182 return flowInstalledTime;
185 void setFlowInstalledTime(long flowInstalledTime) {
186 this.flowInstalledTime = flowInstalledTime;