Bump odlparent->6.0.0,mdsal->5.0.3
[netvirt.git] / natservice / impl / src / main / java / org / opendaylight / netvirt / natservice / internal / NaptEventHandler.java
index 20a5b491eb36fab32dd906a691faf78a3236c043..9630fb6cf1d88375f1cbfa32ee8bfe7d70b27d36 100644 (file)
@@ -8,23 +8,25 @@
 
 package org.opendaylight.netvirt.natservice.internal;
 
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.MoreExecutors;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.math.BigInteger;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.sal.common.util.Arguments;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
@@ -65,8 +67,6 @@ import org.opendaylight.genius.mdsalutil.packet.TCP;
 import org.opendaylight.genius.mdsalutil.packet.UDP;
 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
 import org.opendaylight.netvirt.elanmanager.api.IElanService;
-import org.opendaylight.netvirt.natservice.internal.NaptPacketInHandler.NatPacketProcessingState;
-import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
@@ -88,10 +88,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpc
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.OdlInterfaceRpcService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
@@ -99,7 +101,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.natservice.rev16011
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint32;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,6 +120,7 @@ public class NaptEventHandler {
     private final IdManagerService idManager;
     private final IInterfaceManager interfaceManager;
     private final SalFlowService salFlowServiceRpc;
+    private final NatOverVxlanUtil natOverVxlanUtil;
 
     @Inject
     public NaptEventHandler(final DataBroker dataBroker, final IMdsalApiManager mdsalManager,
@@ -124,7 +130,8 @@ public class NaptEventHandler {
                             final IInterfaceManager interfaceManager,
                             final IElanService elanManager,
                             final IdManagerService idManager,
-                            final SalFlowService salFlowServiceRpc) {
+                            final SalFlowService salFlowServiceRpc,
+                            final NatOverVxlanUtil natOverVxlanUtil) {
         this.dataBroker = dataBroker;
         this.mdsalManager = mdsalManager;
         this.naptManager = naptManager;
@@ -134,6 +141,7 @@ public class NaptEventHandler {
         this.elanManager = elanManager;
         this.idManager = idManager;
         this.salFlowServiceRpc = salFlowServiceRpc;
+        this.natOverVxlanUtil = natOverVxlanUtil;
     }
 
     // TODO Clean up the exception handling
@@ -159,9 +167,10 @@ public class NaptEventHandler {
             5) Write the flow to the INBOUND NAPT Table and forward to FIB table for routing the traffic.
     */
         try {
-            Long routerId = naptEntryEvent.getRouterId();
+            Uint32 routerId = naptEntryEvent.getRouterId();
             String internalIpAddress = naptEntryEvent.getIpAddress();
             int internalPort = naptEntryEvent.getPortNumber();
+            NAPTEntryEvent.Protocol protocol = naptEntryEvent.getProtocol();
             String sourceIPPortKey = routerId + NatConstants.COLON_SEPARATOR
                 + internalIpAddress + NatConstants.COLON_SEPARATOR + internalPort;
             LOG.trace("handleEvent : Time Elapsed before procesing snat ({}:{}) packet is {} ms,routerId: {},"
@@ -170,15 +179,15 @@ public class NaptEventHandler {
                               (System.currentTimeMillis() - naptEntryEvent.getObjectCreationTime()), routerId,
                               naptEntryEvent.isPktProcessed());
             //Get the DPN ID
-            BigInteger dpnId = NatUtil.getPrimaryNaptfromRouterId(dataBroker, routerId);
-            long bgpVpnId = NatConstants.INVALID_ID;
+            Uint64 dpnId = NatUtil.getPrimaryNaptfromRouterId(dataBroker, routerId);
+            Uint32 bgpVpnId = NatConstants.INVALID_ID;
             if (dpnId == null) {
                 LOG.warn("handleEvent : dpnId is null. Assuming the router ID {} as the BGP VPN ID and "
                     + "proceeding....", routerId);
                 bgpVpnId = routerId;
                 LOG.debug("handleEvent : BGP VPN ID {}", bgpVpnId);
                 String vpnName = NatUtil.getRouterName(dataBroker, bgpVpnId);
-                String routerName = NatUtil.getRouterIdfromVpnInstance(dataBroker, vpnName);
+                String routerName = NatUtil.getRouterIdfromVpnInstance(dataBroker, vpnName, internalIpAddress);
                 if (routerName == null) {
                     NaptPacketInHandler.removeIncomingPacketMap(sourceIPPortKey);
                     LOG.error("handleEvent: Unable to find router for VpnName {}. Droping packet for SNAT ({})"
@@ -230,10 +239,9 @@ public class NaptEventHandler {
                                  sourceIPPortKey);
                         return;
                     }
-                    Long vpnId = NatUtil.getVpnId(dataBroker, vpnUuid.getValue());
+                    Uint32 vpnId = NatUtil.getVpnId(dataBroker, vpnUuid.getValue());
 
                     SessionAddress internalAddress = new SessionAddress(internalIpAddress, internalPort);
-                    NAPTEntryEvent.Protocol protocol = naptEntryEvent.getProtocol();
 
                     //Get the external IP address for the corresponding internal IP address
                     SessionAddress externalAddress =
@@ -246,7 +254,7 @@ public class NaptEventHandler {
                         return;
                     }
 
-                    Long vpnIdFromExternalSubnet = getVpnIdFromExternalSubnet(routerId,
+                    Uint32 vpnIdFromExternalSubnet = getVpnIdFromExternalSubnet(routerId,
                             externalAddress.getIpAddress());
                     if (vpnIdFromExternalSubnet != NatConstants.INVALID_ID) {
                         vpnId = vpnIdFromExternalSubnet;
@@ -256,10 +264,10 @@ public class NaptEventHandler {
                     Future<RpcResult<AddFlowOutput>> addFlowResult =
                             buildAndInstallNatFlowsOptionalRpc(dpnId, NwConstants.INBOUND_NAPT_TABLE, vpnId, routerId,
                                     bgpVpnId, externalAddress, internalAddress, protocol, extGwMacAddress, true);
-                    final BigInteger finalDpnId = dpnId;
-                    final Long finalVpnId = vpnId;
-                    final Long finalRouterId = routerId;
-                    final long finalBgpVpnId = bgpVpnId;
+                    final Uint64 finalDpnId = dpnId;
+                    final Uint32 finalVpnId = vpnId;
+                    final Uint32 finalRouterId = routerId;
+                    final Uint32 finalBgpVpnId = bgpVpnId;
                     Futures.addCallback(JdkFutureAdapters.listenInPoolThread(addFlowResult),
                                         new FutureCallback<RpcResult<AddFlowOutput>>() {
 
@@ -279,11 +287,11 @@ public class NaptEventHandler {
                                             public void onSuccess(@Nullable RpcResult<AddFlowOutput> result) {
                                                 LOG.debug("handleEvent : Configured outbound rule, sending packet out"
                                                         + "from {} to {}", internalAddress, externalAddress);
-                                                prepareAndSendPacketOut(naptEntryEvent, finalRouterId);
+                                                prepareAndSendPacketOut(naptEntryEvent, finalRouterId, sourceIPPortKey);
                                             }
 
                                             @Override
-                                            public void onFailure(@Nonnull Throwable throwable) {
+                                            public void onFailure(@NonNull Throwable throwable) {
                                                 LOG.error("handleEvent : Error configuring outbound "
                                                         + "SNAT flows using RPC for SNAT connection from {} to {}",
                                                                   internalAddress, externalAddress);
@@ -292,19 +300,14 @@ public class NaptEventHandler {
                                 }
 
                                 @Override
-                                public void onFailure(@Nonnull Throwable throwable) {
+                                public void onFailure(@NonNull Throwable throwable) {
                                     LOG.error("handleEvent : Error configuring inbound SNAT flows "
                                             + "using RPC for SNAT connection from {} to {}",
                                             internalAddress, externalAddress);
                                 }
                             }, MoreExecutors.directExecutor());
-
-                    NatPacketProcessingState state = naptEntryEvent.getState();
-                    if (state != null) {
-                        state.setFlowInstalledTime(System.currentTimeMillis());
-                    }
                 } else {
-                    prepareAndSendPacketOut(naptEntryEvent, routerId);
+                    prepareAndSendPacketOut(naptEntryEvent, routerId, sourceIPPortKey);
                 }
                 LOG.trace("handleEvent : Time elapsed after Processsing snat ({}:{}) packet: {}ms,isPktProcessed:{} ",
                         naptEntryEvent.getIpAddress(), naptEntryEvent.getPortNumber(),
@@ -312,8 +315,7 @@ public class NaptEventHandler {
                         naptEntryEvent.isPktProcessed());
             } else {
                 LOG.debug("handleEvent : Inside delete Operation of NaptEventHandler");
-                removeNatFlows(dpnId, NwConstants.INBOUND_NAPT_TABLE, routerId, naptEntryEvent.getIpAddress(),
-                    naptEntryEvent.getPortNumber());
+                handleFlowRemoved(naptEntryEvent, routerId, sourceIPPortKey, dpnId);
                 LOG.info("handleEvent : exited for removeEvent for IP {}, port {}, routerID : {}",
                         naptEntryEvent.getIpAddress(), naptEntryEvent.getPortNumber(), routerId);
             }
@@ -322,15 +324,16 @@ public class NaptEventHandler {
         }
     }
 
-    private void prepareAndSendPacketOut(NAPTEntryEvent naptEntryEvent, Long routerId) {
+    private void prepareAndSendPacketOut(NAPTEntryEvent naptEntryEvent, Uint32 routerId, String sourceIPPortKey) {
         //Send Packetout - tcp or udp packets which got punted to controller.
-        BigInteger metadata = naptEntryEvent.getPacketReceived().getMatch().getMetadata().getMetadata();
+        Uint64 metadata = naptEntryEvent.getPacketReceived().getMatch().getMetadata().getMetadata();
         byte[] inPayload = naptEntryEvent.getPacketReceived().getPayload();
         Ethernet ethPkt = new Ethernet();
         if (inPayload != null) {
             try {
-                ethPkt.deserialize(inPayload, 0, inPayload.length * NetUtils.NUM_BITS_IN_A_BYTE);
+                ethPkt.deserialize(inPayload, 0, inPayload.length * Byte.SIZE);
             } catch (PacketException e) {
+                NaptPacketInHandler.removeIncomingPacketMap(sourceIPPortKey);
                 LOG.error("prepareAndSendPacketOut : Failed to decode Packet", e);
                 return;
             }
@@ -338,62 +341,91 @@ public class NaptEventHandler {
 
         long portTag = MetaDataUtil.getLportFromMetadata(metadata).intValue();
         LOG.debug("prepareAndSendPacketOut : portTag from incoming packet is {}", portTag);
-        String interfaceName = getInterfaceNameFromTag(portTag);
-        LOG.debug("prepareAndSendPacketOut : interfaceName fetched from portTag is {}", interfaceName);
-        org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508
-                .interfaces.Interface iface = null;
-        int vlanId = 0;
-        iface = interfaceManager.getInterfaceInfoFromConfigDataStore(interfaceName);
-        if (iface == null) {
-            LOG.error("prepareAndSendPacketOut : Unable to read interface {} from config DataStore", interfaceName);
-            return;
-        }
         List<ActionInfo> actionInfos = new ArrayList<>();
-        IfL2vlan ifL2vlan = iface.getAugmentation(IfL2vlan.class);
-        if (ifL2vlan != null && ifL2vlan.getVlanId() != null) {
-            vlanId = ifL2vlan.getVlanId().getValue() == null ? 0 : ifL2vlan.getVlanId().getValue();
-        }
-        InterfaceInfo infInfo = interfaceManager.getInterfaceInfoFromOperationalDataStore(interfaceName);
-        if (infInfo == null) {
-            LOG.error("prepareAndSendPacketOut : error in getting interfaceInfo from Operation DS");
-            return;
-        }
+        String interfaceName = getInterfaceNameFromTag(portTag);
+        Uint64 dpnID = null;
+        int portNum = -1;
+        if (interfaceName != null) {
+            LOG.debug("prepareAndSendPacketOut : interfaceName fetched from portTag is {}", interfaceName);
+            org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508
+                    .interfaces.Interface iface = null;
+            int vlanId = 0;
+            iface = interfaceManager.getInterfaceInfoFromConfigDataStore(interfaceName);
+            if (iface == null) {
+                NaptPacketInHandler.removeIncomingPacketMap(sourceIPPortKey);
+                LOG.error("prepareAndSendPacketOut : Unable to read interface {} from config DataStore", interfaceName);
+                return;
+            }
 
-        byte[] pktOut = buildNaptPacketOut(ethPkt);
-        if (ethPkt.getEtherType() != (short) NwConstants.ETHTYPE_802_1Q) {
-            // VLAN Access port
-            LOG.debug("prepareAndSendPacketOut : vlanId is {}", vlanId);
-            if (vlanId != 0) {
-                // Push vlan
-                actionInfos.add(new ActionPushVlan(0));
-                actionInfos.add(new ActionSetFieldVlanVid(1, vlanId));
+            IfL2vlan ifL2vlan = iface.augmentation(IfL2vlan.class);
+            if (ifL2vlan != null && ifL2vlan.getVlanId() != null) {
+                vlanId = ifL2vlan.getVlanId().getValue() == null ? 0 : ifL2vlan.getVlanId().getValue().toJava();
+            }
+            InterfaceInfo infInfo = interfaceManager.getInterfaceInfoFromOperationalDataStore(interfaceName);
+            if (infInfo == null) {
+                LOG.error("prepareAndSendPacketOut : error in getting interfaceInfo from Operation DS");
+                return;
+            }
+            dpnID = infInfo.getDpId();
+            portNum = infInfo.getPortNo();
+            if (ethPkt.getEtherType() != (short) NwConstants.ETHTYPE_802_1Q) {
+                // VLAN Access port
+                LOG.debug("prepareAndSendPacketOut : vlanId is {}", vlanId);
+                if (vlanId != 0) {
+                    // Push vlan
+                    actionInfos.add(new ActionPushVlan(0));
+                    actionInfos.add(new ActionSetFieldVlanVid(1, vlanId));
+                } else {
+                    LOG.debug("prepareAndSendPacketOut : No vlanId {}, may be untagged", vlanId);
+                }
             } else {
-                LOG.debug("prepareAndSendPacketOut : No vlanId {}, may be untagged", vlanId);
+                // VLAN Trunk Port
+                LOG.debug("prepareAndSendPacketOut : This is VLAN Trunk port case - need not do VLAN tagging again");
             }
         } else {
-            // VLAN Trunk Port
-            LOG.debug("prepareAndSendPacketOut : This is VLAN Trunk port case - need not do VLAN tagging again");
+            // This case will be hit for packets send from non-napt switch.
+            LOG.info("prepareAndSendPacketOut : interfaceName is not available.Retrieve from packet received");
+            NodeConnectorId nodeId = naptEntryEvent.getPacketReceived().getMatch().getInPort();
+            portNum = Integer.parseInt(nodeId.getValue());
+            LOG.debug("prepareAndSendPacketOut : in_port portNum : {}", portNum);
+            //List<PathArgument> dpnNodes = naptEntryEvent.getPacketReceived().getIngress().getValue().getPath();
+            Iterable<PathArgument> outArgs = naptEntryEvent.getPacketReceived().getIngress().getValue()
+                    .getPathArguments();
+            PathArgument pathArgument = Iterables.get(outArgs, 2);
+            LOG.debug("prepareAndSendPacketOut : pathArgument : {}", pathArgument);
+            InstanceIdentifier.IdentifiableItem<?, ?> item = Arguments.checkInstanceOf(pathArgument,
+                    InstanceIdentifier.IdentifiableItem.class);
+            NodeConnectorKey key = Arguments.checkInstanceOf(item.getKey(), NodeConnectorKey.class);
+            LOG.info("prepareAndSendPacketOut : NodeConnectorKey key : {}", key.getId().getValue());
+            String dpnKey = key.getId().getValue();
+            if (dpnKey.contains(NatConstants.COLON_SEPARATOR)) {
+                dpnID = Uint64.valueOf(dpnKey.split(NatConstants.COLON_SEPARATOR)[1]).intern();
+            }
         }
+        byte[] pktOut = buildNaptPacketOut(ethPkt);
+
         if (pktOut != null) {
             String routerName = NatUtil.getRouterName(dataBroker, routerId);
-            long tunId = NatUtil.getTunnelIdForNonNaptToNaptFlow(dataBroker, elanManager, idManager, routerId,
-                    routerName);
-            sendNaptPacketOut(pktOut, infInfo, actionInfos, tunId);
+            Uint64 tunId = NatUtil.getTunnelIdForNonNaptToNaptFlow(dataBroker, natOverVxlanUtil, elanManager,
+                    idManager, routerId, routerName);
+            LOG.info("sendNaptPacketOut for ({}:{}) on dpnId {} portNum {} tunId {}",
+                naptEntryEvent.getIpAddress(), naptEntryEvent.getPortNumber(), dpnID, portNum, tunId);
+            sendNaptPacketOut(pktOut, dpnID, portNum, actionInfos, tunId);
         } else {
             LOG.warn("prepareAndSendPacketOut : Unable to send Packet Out");
         }
     }
 
-    public void buildAndInstallNatFlows(BigInteger dpnId, short tableId, long vpnId, long routerId,
-                                               long bgpVpnId, SessionAddress actualSourceAddress,
-                                               SessionAddress translatedSourceAddress,
-                                               NAPTEntryEvent.Protocol protocol, String extGwMacAddress) {
+    public void buildAndInstallNatFlows(Uint64 dpnId, short tableId, Uint32 vpnId, Uint32 routerId,
+                                        Uint32 bgpVpnId, SessionAddress actualSourceAddress,
+                                        SessionAddress translatedSourceAddress,
+                                        NAPTEntryEvent.Protocol protocol, String extGwMacAddress) {
         buildAndInstallNatFlowsOptionalRpc(dpnId, tableId, vpnId, routerId, bgpVpnId, actualSourceAddress,
                 translatedSourceAddress, protocol, extGwMacAddress, false);
     }
 
     private Future<RpcResult<AddFlowOutput>> buildAndInstallNatFlowsOptionalRpc(
-            BigInteger dpnId, short tableId, long vpnId, long routerId, long bgpVpnId,
+            Uint64 dpnId, short tableId, Uint32 vpnId, Uint32 routerId, Uint32 bgpVpnId,
             SessionAddress actualSourceAddress, SessionAddress translatedSourceAddress,
             NAPTEntryEvent.Protocol protocol, String extGwMacAddress,
             boolean sendRpc) {
@@ -404,7 +436,7 @@ public class NaptEventHandler {
         if (tableId == NwConstants.OUTBOUND_NAPT_TABLE) {
             idleTimeout = NatConstants.DEFAULT_NAPT_IDLE_TIMEOUT;
         }
-        long intranetVpnId;
+        Uint32 intranetVpnId;
         if (bgpVpnId != NatConstants.INVALID_ID) {
             intranetVpnId = bgpVpnId;
         } else {
@@ -415,9 +447,14 @@ public class NaptEventHandler {
         int translatedPort = translatedSourceAddress.getPortNumber();
         String actualIp = actualSourceAddress.getIpAddress();
         int actualPort = actualSourceAddress.getPortNumber();
-        String switchFlowRef =
-            NatUtil.getNaptFlowRef(dpnId, tableId, String.valueOf(routerId), actualIp, actualPort);
-
+        String switchFlowRef = null;
+        if (tableId == NwConstants.OUTBOUND_NAPT_TABLE) {
+            switchFlowRef = NatUtil.getNaptFlowRef(dpnId, tableId, String.valueOf(routerId), actualIp, actualPort,
+                protocol.name());
+        } else {
+            switchFlowRef = NatUtil.getNaptFlowRef(dpnId, tableId, String.valueOf(routerId), translatedIp,
+                translatedPort, protocol.name());
+        }
         FlowEntity snatFlowEntity = new FlowEntityBuilder()
             .setDpnId(dpnId)
             .setTableId(tableId)
@@ -464,32 +501,33 @@ public class NaptEventHandler {
         return addFlowResult;
     }
 
-    private static Node buildInventoryDpnNode(BigInteger dpnId) {
+    private static Node buildInventoryDpnNode(Uint64 dpnId) {
         NodeId nodeId = new NodeId("openflow:" + dpnId);
-        Node nodeDpn = new NodeBuilder().setId(nodeId).setKey(new NodeKey(nodeId)).build();
+        Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
         return nodeDpn;
     }
 
-    private static NodeRef getNodeRef(BigInteger dpnId) {
+    private static NodeRef getNodeRef(Uint64 dpnId) {
         NodeId nodeId = new NodeId("openflow:" + dpnId);
         return new NodeRef(InstanceIdentifier.builder(Nodes.class)
                 .child(Node.class, new NodeKey(nodeId)).build());
     }
 
-    public static FlowRef getFlowRef(BigInteger dpId, Flow flow) {
+    public static FlowRef getFlowRef(Uint64 dpId, Flow flow) {
         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
         Node nodeDpn = buildInventoryDpnNode(dpId);
         InstanceIdentifier<Flow> flowInstanceId =
                 InstanceIdentifier.builder(Nodes.class)
-                .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
+                .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
                 .child(Table.class, new TableKey(flow.getTableId()))
                 .child(Flow.class, flowKey)
                 .build();
         return new FlowRef(flowInstanceId);
     }
 
+    @Nullable
     private static List<MatchInfo> buildAndGetMatchInfo(String ip, int port, short tableId,
-                                                        NAPTEntryEvent.Protocol protocol, long segmentId) {
+                                                        NAPTEntryEvent.Protocol protocol, Uint32 segmentId) {
         MatchInfo ipMatchInfo = null;
         MatchInfo portMatchInfo = null;
         MatchInfo protocolMatchInfo = null;
@@ -516,7 +554,8 @@ public class NaptEventHandler {
                 portMatchInfo = new MatchUdpSourcePort(port);
             }
             metaDataMatchInfo =
-                    new MatchMetadata(MetaDataUtil.getVpnIdMetadata(segmentId), MetaDataUtil.METADATA_MASK_VRFID);
+                    new MatchMetadata(MetaDataUtil.getVpnIdMetadata(segmentId.longValue()),
+                            MetaDataUtil.METADATA_MASK_VRFID);
         } else {
             ipMatchInfo = new MatchIpv4Destination(ipAddressAsString, "32");
             if (protocol == NAPTEntryEvent.Protocol.TCP) {
@@ -539,8 +578,9 @@ public class NaptEventHandler {
         return matchInfo;
     }
 
+    @NonNull
     private static List<InstructionInfo> buildAndGetSetActionInstructionInfo(String ipAddress, int port,
-                                                                             long segmentId, long vpnId,
+                                                                             Uint32 segmentId, Uint32 vpnId,
                                                                              short tableId,
                                                                              NAPTEntryEvent.Protocol protocol,
                                                                              String extGwMacAddress) {
@@ -560,8 +600,9 @@ public class NaptEventHandler {
                     portActionInfo = new ActionSetUdpSourcePort(port);
                 }
                 // reset the split-horizon bit to allow traffic from tunnel to be sent back to the provider port
-                instructionInfo.add(new InstructionWriteMetadata(MetaDataUtil.getVpnIdMetadata(vpnId),
-                    MetaDataUtil.METADATA_MASK_VRFID.or(MetaDataUtil.METADATA_MASK_SH_FLAG)));
+                instructionInfo.add(new InstructionWriteMetadata(MetaDataUtil.getVpnIdMetadata(vpnId.longValue()),
+                    Uint64.fromLongBits(MetaDataUtil.METADATA_MASK_VRFID.longValue()
+                            | (MetaDataUtil.METADATA_MASK_SH_FLAG.longValue()))));
                 break;
 
             case NwConstants.INBOUND_NAPT_TABLE:
@@ -572,13 +613,13 @@ public class NaptEventHandler {
                     portActionInfo = new ActionSetUdpDestinationPort(port);
                 }
                 instructionInfo.add(new InstructionWriteMetadata(
-                        MetaDataUtil.getVpnIdMetadata(segmentId), MetaDataUtil.METADATA_MASK_VRFID));
+                        MetaDataUtil.getVpnIdMetadata(segmentId.longValue()), MetaDataUtil.METADATA_MASK_VRFID));
                 break;
 
             default:
                 LOG.error("buildAndGetSetActionInstructionInfo : Neither OUTBOUND_NAPT_TABLE nor "
                         + "INBOUND_NAPT_TABLE matches with input table id {}", tableId);
-                return null;
+                return Collections.emptyList();
         }
 
         listActionInfo.add(ipActionInfo);
@@ -593,8 +634,8 @@ public class NaptEventHandler {
         return instructionInfo;
     }
 
-    void removeNatFlows(BigInteger dpnId, short tableId ,long segmentId, String ip, int port) {
-        if (dpnId == null || dpnId.equals(BigInteger.ZERO)) {
+    void removeNatFlows(Uint64 dpnId, short tableId ,Uint32 segmentId, String ip, int port, String protocol) {
+        if (dpnId == null || dpnId.equals(Uint64.ZERO)) {
             LOG.error("removeNatFlows : DPN ID {} is invalid" , dpnId);
             return;
         }
@@ -602,19 +643,20 @@ public class NaptEventHandler {
             dpnId, segmentId, ip, port);
 
         //Build the flow with the port IP and port as the match info.
-        String switchFlowRef = NatUtil.getNaptFlowRef(dpnId, tableId, String.valueOf(segmentId), ip, port);
+        String switchFlowRef = NatUtil.getNaptFlowRef(dpnId, tableId, String.valueOf(segmentId), ip, port, protocol);
         FlowEntity snatFlowEntity = NatUtil.buildFlowEntity(dpnId, tableId, switchFlowRef);
         LOG.debug("removeNatFlows : Remove the flow in the table {} for the switch with the DPN ID {}",
-            NwConstants.INBOUND_NAPT_TABLE, dpnId);
+                tableId, dpnId);
         long startTime = System.currentTimeMillis();
         mdsalManager.removeFlow(snatFlowEntity);
         LOG.trace("removeNatFlows : Time Elapsed for removing table-{} flow from switch with DPN ID:{} "
                 + "for SNAT ({}:{}) session:{}ms", tableId, dpnId, ip, port, System.currentTimeMillis() - startTime);
     }
 
+    @Nullable
     @SuppressFBWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
     protected byte[] buildNaptPacketOut(Ethernet etherPkt) {
-        LOG.debug("removeNatFlows : About to build Napt Packet Out");
+        LOG.debug("buildNaptPacketOut : About to build Napt Packet Out");
         if (etherPkt.getPayload() instanceof IPv4) {
             byte[] rawPkt;
             IPv4 ipPkt = (IPv4) etherPkt.getPayload();
@@ -627,22 +669,23 @@ public class NaptEventHandler {
                     return null;
                 }
             } else {
-                LOG.error("removeNatFlows : Unable to build NaptPacketOut since its neither TCP nor UDP");
+                LOG.error("buildNaptPacketOut : Unable to build NaptPacketOut since its neither TCP nor UDP");
                 return null;
             }
         }
-        LOG.error("removeNatFlows : Unable to build NaptPacketOut since its not IPv4 packet");
+        LOG.error("buildNaptPacketOut : Unable to build NaptPacketOut since its not IPv4 packet");
         return null;
     }
 
-    private void sendNaptPacketOut(byte[] pktOut, InterfaceInfo infInfo, List<ActionInfo> actionInfos, Long tunId) {
-        LOG.trace("sendNaptPacketOut: Sending packet out DpId {}, interfaceInfo {}", infInfo.getDpId(), infInfo);
+    private void sendNaptPacketOut(byte[] pktOut, Uint64 dpnID, int portNum,
+            List<ActionInfo> actionInfos, Uint64 tunId) {
+        LOG.trace("sendNaptPacketOut: Sending packet out DpId {}, interface {}", dpnID, portNum);
         // set inPort, and action as OFPP_TABLE so that it starts from table 0 (lowest table as per spec)
-        actionInfos.add(new ActionSetFieldTunnelId(2, BigInteger.valueOf(tunId)));
+        actionInfos.add(new ActionSetFieldTunnelId(2, tunId));
         actionInfos.add(new ActionOutput(3, new Uri("0xfffffff9")));
-        NodeConnectorRef inPort = MDSALUtil.getNodeConnRef(infInfo.getDpId(), String.valueOf(infInfo.getPortNo()));
-        LOG.debug("sendNaptPacketOut : inPort for packetout is being set to {}", String.valueOf(infInfo.getPortNo()));
-        TransmitPacketInput output = MDSALUtil.getPacketOut(actionInfos, pktOut, infInfo.getDpId().longValue(), inPort);
+        NodeConnectorRef inPort = MDSALUtil.getNodeConnRef(dpnID, String.valueOf(portNum));
+        LOG.debug("sendNaptPacketOut : inPort for packetout is being set to {}", portNum);
+        TransmitPacketInput output = MDSALUtil.getPacketOut(actionInfos, pktOut, dpnID.longValue(), inPort);
         LOG.debug("sendNaptPacketOut : Transmitting packet: {}, inPort {}", output, inPort);
 
         JdkFutures.addErrorLogging(pktService.transmitPacket(output), LOG, "Transmit packet");
@@ -656,7 +699,9 @@ public class NaptEventHandler {
             interfaceManagerRpc.getInterfaceFromIfIndex(input);
         try {
             GetInterfaceFromIfIndexOutput output = futureOutput.get().getResult();
-            interfaceName = output.getInterfaceName();
+            if (output != null) {
+                interfaceName = output.getInterfaceName();
+            }
         } catch (InterruptedException | ExecutionException e) {
             LOG.error("getInterfaceNameFromTag : Error while retrieving the interfaceName from tag using "
                 + "getInterfaceFromIfIndex RPC");
@@ -666,7 +711,7 @@ public class NaptEventHandler {
         return interfaceName;
     }
 
-    private long getVpnIdFromExternalSubnet(Long routerId, String externalIpAddress) {
+    private Uint32 getVpnIdFromExternalSubnet(Uint32 routerId, String externalIpAddress) {
         String routerName = NatUtil.getRouterName(dataBroker, routerId);
         if (routerName != null) {
             Routers extRouter = NatUtil.getRoutersFromConfigDS(dataBroker, routerName);
@@ -677,4 +722,30 @@ public class NaptEventHandler {
 
         return NatConstants.INVALID_ID;
     }
+
+    public void handleFlowRemoved(NAPTEntryEvent naptEntryEvent, Uint32 routerId, String sourceIPPortKey,
+                                  Uint64 dpnId) {
+        String internalIpv4HostAddress = naptEntryEvent.getIpAddress();
+        Integer internalPortNumber = naptEntryEvent.getPortNumber();
+        NAPTEntryEvent.Protocol protocol = naptEntryEvent.getProtocol();
+        //Get the external IP address and the port from the model
+        LOG.trace("handleFlowRemoved: Failed to remove snat flow internalIP {} with "
+                + "Port {} protocol {} for routerId {} in OUTBOUNDTABLE of naptSwitch {}",
+            internalIpv4HostAddress, internalPortNumber, protocol, routerId, dpnId);
+        removeNatFlows(dpnId, NwConstants.OUTBOUND_NAPT_TABLE, routerId, internalIpv4HostAddress,
+            internalPortNumber, protocol.name());
+
+        LOG.trace("handleFlowRemoved: Failed to remove snat flow internalIP {} with "
+                + "Port {} protocol {} for routerId {} in INBOUNDTABLE of naptSwitch {}",
+            internalIpv4HostAddress, internalPortNumber, protocol, routerId, dpnId);
+        removeNatFlows(dpnId, NwConstants.INBOUND_NAPT_TABLE, routerId, internalIpv4HostAddress,
+            internalPortNumber, protocol.name());
+
+        //Remove the SourceIP:Port key from the Napt packet handler map.
+        NaptPacketInHandler.removeIncomingPacketMap(sourceIPPortKey);
+
+        //Remove the mapping of internal fixed ip/port to external ip/port from the datastore.
+        SessionAddress internalSessionAddress = new SessionAddress(internalIpv4HostAddress, internalPortNumber);
+        naptManager.releaseIpExtPortMapping(routerId, internalSessionAddress, protocol);
+    }
 }