Make nullToEmpty return immutable empty lists
[genius.git] / arputil / arputil-impl / src / main / java / org / opendaylight / genius / arputil / internal / ArpUtilImpl.java
index a5c1b2c8668aa8e254853e35823258e3d2bfbe3b..2237d735b72f247d57648d695cd6a42a74475023 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 
 package org.opendaylight.genius.arputil.internal;
 
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Collections.emptyList;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.liblldp.NetUtils;
-import org.opendaylight.controller.liblldp.Packet;
+import java.io.UnsupportedEncodingException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.genius.arputil.api.ArpConstants;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
 import org.opendaylight.genius.mdsalutil.NWUtil;
-import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
 import org.opendaylight.genius.mdsalutil.packet.ARP;
 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
+import org.opendaylight.infrautils.inject.AbstractLifecycle;
+import org.opendaylight.infrautils.metrics.Meter;
+import org.opendaylight.infrautils.metrics.MetricProvider;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.openflowplugin.libraries.liblldp.HexEncode;
+import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
+import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
+import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.Interfaces;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.InterfacesState;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.Interface;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.InterfaceKey;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.PhysAddress;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.*;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.*;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.ArpRequestReceivedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.ArpResponseReceivedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.GetMacInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.GetMacOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.GetMacOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.MacChangedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.OdlArputilService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpRequestInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpRequestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpResponseInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpResponseOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.interfaces.InterfaceAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetEgressActionsForInterfaceInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetEgressActionsForInterfaceOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetPortFromInterfaceInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetPortFromInterfaceOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.OdlInterfaceRpcService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Metadata;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.*;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.*;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.interfaces.InterfaceAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketOutput;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.concurrent.*;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class ArpUtilImpl implements OdlArputilService,
-        PacketProcessingListener, AutoCloseable {
-
-    private static final String FAILED_TO_GET_SRC_IP_FOR_INTERFACE = "Failed to get src ip for %s";
-
-    private static final String FAILED_TO_GET_SRC_MAC_FOR_INTERFACE = "Failed to get src mac for interface %s iid %s ";
-
-    private static final String FAILED_TO_SEND_ARP_REQ_FOR_INTERFACE = "failed to send arp req for interface ";
-
-    private static final String UNKNOWN_IP_ADDRESS_SUPPLIED = "unknown ip address supplied";
-
-    private static final String NODE_CONNECTOR_NOT_FOUND_ERROR = "Node connector id not found for interface %s";
-
-    private static final String DPN_NOT_FOUND_ERROR = "dpn not found for interface %s ";
-
-    private static final short ARP_REQUEST_OP = (short) 1;
-
-    private static final short ARP_RESPONSE_OP = (short) 2;
-
-    private static final short ETH_TYPE_ARP = 0x0806;
-
-    private static final Logger LOGGER = LoggerFactory
-            .getLogger(ArpUtilImpl.class);
-
-    static OdlInterfaceRpcService intfRpc;
-    
-    ExecutorService threadPool = Executors.newFixedThreadPool(1);
-
-    DataBroker dataBroker;
-    PacketProcessingService packetProcessingService;
-    NotificationPublishService notificationPublishService;
-    NotificationService notificationService;
-    IMdsalApiManager mdsalMgr;
-
-    RpcProviderRegistry rpc;
-    ListenerRegistration<ArpUtilImpl> listenerRegistration;
-
-    ConcurrentMap<String, String> macsDB = new ConcurrentHashMap<>();
-    ConcurrentMap<String, SettableFuture<RpcResult<GetMacOutput>>> getMacFutures = new ConcurrentHashMap<>();
-
-    public ArpUtilImpl(DataBroker db,
-            PacketProcessingService packetProcessingService,
-            NotificationPublishService notificationPublishService,
-            NotificationService notificationService,
-            IMdsalApiManager mdsalApiManager,
-            RpcProviderRegistry rpc) {
-
-        this.dataBroker = db;
+@Singleton
+public class ArpUtilImpl extends AbstractLifecycle implements OdlArputilService, PacketProcessingListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ArpUtilImpl.class);
+    private static final String MODULENAME = "odl.genius.arputil.";
+    private static final String OPENFLOW_PFX = "openflow:";
+
+    private final DataBroker dataBroker;
+    private final PacketProcessingService packetProcessingService;
+    private final NotificationPublishService notificationPublishService;
+    private final NotificationService notificationService;
+    private final OdlInterfaceRpcService odlInterfaceRpcService;
+    private ListenerRegistration<ArpUtilImpl> listenerRegistration;
+    private final ExecutorService threadPool = Executors.newFixedThreadPool(1, "ArpUtil", LOG);
+    private final ConcurrentMap<String, String> macsDB = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, SettableFuture<RpcResult<GetMacOutput>>> macAddrs = new ConcurrentHashMap<>();
+
+    private final Meter arpRespRecvd;
+    private final Meter arpRespRecvdNotification;
+    private final Meter arpRespRecvdNotificationRejected;
+    private final Meter arpReqRecvd;
+    private final Meter arpReqRecvdNotification;
+    private final Meter arpReqRecvdNotificationRejected;
+
+
+    @Inject
+    public ArpUtilImpl(final DataBroker dataBroker, final PacketProcessingService packetProcessingService,
+                       final NotificationPublishService notificationPublishService,
+                       final NotificationService notificationService,
+                       final OdlInterfaceRpcService odlInterfaceRpcService,
+                       final MetricProvider metricProvider) {
+        this.dataBroker = dataBroker;
         this.packetProcessingService = packetProcessingService;
         this.notificationPublishService = notificationPublishService;
-        this.mdsalMgr = mdsalApiManager;
         this.notificationService = notificationService;
-        this.rpc = rpc;
-        listenerRegistration = notificationService
-                .registerNotificationListener(this);
-        LOGGER.info("ArpUtil Manager Initialized ");
+        this.odlInterfaceRpcService = odlInterfaceRpcService;
+
+        arpRespRecvd = metricProvider.newMeter(this,MODULENAME + "arpResponseReceived");
+        arpRespRecvdNotification = metricProvider.newMeter(this,MODULENAME + "arpResponseReceivedNotification");
+        arpRespRecvdNotificationRejected = metricProvider.newMeter(this,
+                MODULENAME + "arpResponseReceivedNotificationRejected");
+        arpReqRecvd = metricProvider.newMeter(this,MODULENAME + "arpRequestReceived");
+        arpReqRecvdNotification = metricProvider.newMeter(this,MODULENAME + "arpRequestReceivedNotification");
+        arpReqRecvdNotificationRejected = metricProvider.newMeter(this,
+                MODULENAME + "arpRequestReceivedNotificationRejected");
     }
 
-    OdlInterfaceRpcService getInterfaceRpcService() {
-        if (intfRpc == null ) {
-            intfRpc = rpc.getRpcService(OdlInterfaceRpcService.class);
-        }
-        return intfRpc;
+    @Override
+    public void start() {
+        LOG.info("{} start", getClass().getSimpleName());
+        listenerRegistration = notificationService.registerNotificationListener(this);
     }
 
     @Override
-    public void close() throws Exception {
-        listenerRegistration.close();
-        LOGGER.trace("ArpUtil manager Closed");
-    }
+    public void stop() {
+        LOG.info("{} stop", getClass().getSimpleName());
 
-    String getIpAddressInString(IpAddress ipAddress)
-            throws UnknownHostException {
-        return InetAddress.getByName(ipAddress.getIpv4Address().getValue())
-                .getHostAddress();
+        if (listenerRegistration != null) {
+            listenerRegistration.close();
+            listenerRegistration = null;
+        }
     }
 
-    public Future<RpcResult<GetMacOutput>> getMac(GetMacInput input) {
+    private String getIpAddressInString(IpAddress ipAddress) throws UnknownHostException {
+        return InetAddress.getByName(ipAddress.getIpv4Address().getValue()).getHostAddress();
+    }
 
+    @Override
+    public ListenableFuture<RpcResult<GetMacOutput>> getMac(GetMacInput input) {
         try {
-            final String dstIpAddress = getIpAddressInString(input
-                    .getIpaddress());
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("getMac rpc invoked for ip " + dstIpAddress);
-            }
-            if (getMacFutures.get(dstIpAddress) != null) {
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("get mac already in progress for the ip "
-                            + dstIpAddress);
+            final String dstIpAddress = getIpAddressInString(input.getIpaddress());
+            LOG.trace("getMac rpc invoked for ip {}", dstIpAddress);
+            if (macAddrs.get(dstIpAddress) != null) {
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("get mac already in progress for the ip {}", dstIpAddress);
                 }
-                return getMacFutures.get(dstIpAddress);
+                return macAddrs.get(dstIpAddress);
             }
             SendArpRequestInputBuilder builder = new SendArpRequestInputBuilder()
-                    .setInterfaceAddress(input.getInterfaceAddress())
-                    .setIpaddress(input.getIpaddress());
-            Future<RpcResult<Void>> arpReqFt = sendArpRequest(builder.build());
-            final SettableFuture<RpcResult<GetMacOutput>> ft = SettableFuture
-                    .create();
-
-            Futures.addCallback(
-                    JdkFutureAdapters.listenInPoolThread(arpReqFt, threadPool),
-                    new FutureCallback<RpcResult<Void>>() {
-                        @Override
-                        public void onFailure(Throwable e) {
-                            RpcResultBuilder<GetMacOutput> resultBuilder = RpcResultBuilder
-                                    .<GetMacOutput> failed().withError(
-                                            ErrorType.APPLICATION,
-                                            e.getMessage(), e);
-                            ft.set(resultBuilder.build());
-                        }
-
-                        @Override
-                        public void onSuccess(RpcResult<Void> result) {
-                            LOGGER.trace("Successfully sent the arp pkt out for ip "
-                                    + dstIpAddress);
-                        }
-                    });
-
-            getMacFutures.put(dstIpAddress, ft);
+                    .setInterfaceAddress(input.getInterfaceAddress()).setIpaddress(input.getIpaddress());
+            ListenableFuture<RpcResult<SendArpRequestOutput>> arpReqFt = sendArpRequest(builder.build());
+            final SettableFuture<RpcResult<GetMacOutput>> ft = SettableFuture.create();
+
+            Futures.addCallback(arpReqFt, new FutureCallback<RpcResult<SendArpRequestOutput>>() {
+                @Override
+                public void onFailure(Throwable ex) {
+                    RpcResultBuilder<GetMacOutput> resultBuilder = RpcResultBuilder.<GetMacOutput>failed()
+                            .withError(ErrorType.APPLICATION, ex.getMessage(), ex);
+                    ft.set(resultBuilder.build());
+                }
+
+                @Override
+                public void onSuccess(RpcResult<SendArpRequestOutput> result) {
+                    LOG.trace("Successfully sent the arp pkt out for ip {}", dstIpAddress);
+                }
+            }, MoreExecutors.directExecutor());
+
+            macAddrs.put(dstIpAddress, ft);
             return ft;
-        } catch (Exception e) {
-            LOGGER.trace("failed to handle getMac request for {} {}",
-                    input.getIpaddress(), e);
-            RpcResultBuilder<GetMacOutput> resultBuilder = RpcResultBuilder
-                    .<GetMacOutput> failed().withError(ErrorType.APPLICATION,
-                            e.getMessage(), e);
+        } catch (UnknownHostException e) {
+            LOG.error("Failed to handle getMac request for {}", input.getIpaddress(), e);
+            RpcResultBuilder<GetMacOutput> resultBuilder = RpcResultBuilder.<GetMacOutput>failed()
+                    .withError(ErrorType.APPLICATION, e.getMessage(), e);
             return Futures.immediateFuture(resultBuilder.build());
         }
     }
 
-    byte[] getIpAddressBytes(IpAddress ip) throws UnknownHostException {
-        return InetAddress.getByName(ip.getIpv4Address().getValue())
-                .getAddress();
+    private byte[] getIpAddressBytes(IpAddress ip) throws UnknownHostException {
+        return InetAddress.getByName(ip.getIpv4Address().getValue()).getAddress();
     }
 
     @Override
-    public Future<RpcResult<Void>> sendArpRequest(
-            SendArpRequestInput arpReqInput) {
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("rpc sendArpRequest invoked for ip "
-                    + arpReqInput.getIpaddress());
-        }
+    public ListenableFuture<RpcResult<SendArpRequestOutput>> sendArpRequest(SendArpRequestInput arpReqInput) {
+        LOG.trace("rpc sendArpRequest invoked for ip {}", arpReqInput.getIpaddress());
         BigInteger dpnId;
-        long groupId;
-        byte payload[];
+        byte[] payload;
         String interfaceName = null;
-        byte srcIpBytes[];
-        byte[] dstIpBytes = null;
+        byte[] srcIpBytes;
+        byte[] dstIpBytes;
+        byte[] srcMac;
 
-        RpcResultBuilder<Void> failureBuilder = RpcResultBuilder
-                .<Void> failed();
-        RpcResultBuilder<Void> successBuilder = RpcResultBuilder
-                .<Void> success();
+        RpcResultBuilder<SendArpRequestOutput> failureBuilder = RpcResultBuilder.failed();
+        RpcResultBuilder<SendArpRequestOutput> successBuilder = RpcResultBuilder.success();
 
         try {
             dstIpBytes = getIpAddressBytes(arpReqInput.getIpaddress());
-        } catch (Exception e) {
-            failureBuilder.withError(ErrorType.APPLICATION,
-                    UNKNOWN_IP_ADDRESS_SUPPLIED);
+        } catch (UnknownHostException e) {
+            LOG.error("Cannot get IP address", e);
+            failureBuilder.withError(ErrorType.APPLICATION, ArpConstants.UNKNOWN_IP_ADDRESS_SUPPLIED);
             return Futures.immediateFuture(failureBuilder.build());
         }
 
         int localErrorCount = 0;
-        for (InterfaceAddress interfaceAddress : arpReqInput
-                .getInterfaceAddress()) {
+        for (InterfaceAddress interfaceAddress : nullToEmpty(arpReqInput.getInterfaceAddress())) {
             try {
                 interfaceName = interfaceAddress.getInterface();
                 srcIpBytes = getIpAddressBytes(interfaceAddress.getIpAddress());
 
-                NodeConnectorId id = getNodeConnectorFromInterfaceName(interfaceName);
-
                 GetPortFromInterfaceOutput portResult = getPortFromInterface(interfaceName);
+                checkNotNull(portResult);
                 dpnId = portResult.getDpid();
                 Long portid = portResult.getPortno();
-                checkArgument(null != dpnId && BigInteger.ZERO != dpnId,
-                        DPN_NOT_FOUND_ERROR, interfaceName);
+                checkArgument(null != dpnId && !BigInteger.ZERO.equals(dpnId),
+                    ArpConstants.DPN_NOT_FOUND_ERROR, interfaceName);
 
-                NodeConnectorRef ref = MDSALUtil.getNodeConnRef(dpnId,
-                        portid.toString());
-                checkNotNull(ref, NODE_CONNECTOR_NOT_FOUND_ERROR, interfaceName);
+                NodeConnectorRef ref = MDSALUtil.getNodeConnRef(dpnId, portid.toString());
+                checkNotNull(ref, ArpConstants.NODE_CONNECTOR_NOT_FOUND_ERROR, interfaceName);
 
-                if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace(
-                            "sendArpRequest received dpnId {} out interface {}",
-                            dpnId, interfaceName);
+                LOG.trace("sendArpRequest received dpnId {} out interface {}", dpnId, interfaceName);
+                if (interfaceAddress.getMacaddress() == null) {
+                    srcMac = MDSALUtil.getMacAddressForNodeConnector(dataBroker,
+                            (InstanceIdentifier<NodeConnector>) ref.getValue());
+                } else {
+                    String macAddr = interfaceAddress.getMacaddress().getValue();
+                    srcMac = HexEncode.bytesFromHexString(macAddr);
                 }
-                byte srcMac[] = MDSALUtil.getMacAddressForNodeConnector(
-                        dataBroker,
-                        (InstanceIdentifier<NodeConnector>) ref.getValue());
-                checkNotNull(srcMac, FAILED_TO_GET_SRC_MAC_FOR_INTERFACE,
-                        interfaceName, ref.getValue());
-                checkNotNull(srcIpBytes, FAILED_TO_GET_SRC_IP_FOR_INTERFACE,
-                        interfaceName);
+                checkNotNull(srcMac, ArpConstants.FAILED_TO_GET_SRC_MAC_FOR_INTERFACE, interfaceName, ref.getValue());
+                checkNotNull(srcIpBytes, ArpConstants.FAILED_TO_GET_SRC_IP_FOR_INTERFACE, interfaceName);
 
-                payload = ArpPacketUtil.getPayload(ARP_REQUEST_OP, srcMac,
-                        srcIpBytes, ArpPacketUtil.EthernetDestination_Broadcast,
-                        dstIpBytes);
+                payload = ArpPacketUtil.getPayload(ArpConstants.ARP_REQUEST_OP, srcMac, srcIpBytes,
+                        ArpPacketUtil.ETHERNET_BROADCAST_DESTINATION, dstIpBytes);
 
-                sendPacketOut(dpnId, payload, ref);
+                List<Action> actions = getEgressAction(interfaceName);
+                sendPacketOutWithActions(dpnId, payload, ref, actions);
 
-                if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("sent arp request for "
-                            + arpReqInput.getIpaddress());
-                }
-            } catch (Throwable e) {
-                LOGGER.trace("failed to send arp req for {} on interface {}",
-                        arpReqInput.getIpaddress(), interfaceName);
-
-                failureBuilder
-                        .withError(ErrorType.APPLICATION,
-                                FAILED_TO_SEND_ARP_REQ_FOR_INTERFACE
-                                        + interfaceName, e);
-                successBuilder
-                        .withError(ErrorType.APPLICATION,
-                                FAILED_TO_SEND_ARP_REQ_FOR_INTERFACE
-                                        + interfaceName, e);
+                LOG.trace("sent arp request for {}", arpReqInput.getIpaddress());
+            } catch (UnknownHostException | PacketException | InterruptedException | ExecutionException
+                    | ReadFailedException e) {
+                LOG.trace("failed to send arp req for {} on interface {}", arpReqInput.getIpaddress(), interfaceName);
+
+                failureBuilder.withError(ErrorType.APPLICATION,
+                    ArpConstants.FAILED_TO_SEND_ARP_REQ_FOR_INTERFACE + interfaceName, e);
+                successBuilder.withError(ErrorType.APPLICATION,
+                    ArpConstants.FAILED_TO_SEND_ARP_REQ_FOR_INTERFACE + interfaceName, e);
                 localErrorCount++;
             }
         }
@@ -285,164 +282,180 @@ public class ArpUtilImpl implements OdlArputilService,
         return Futures.immediateFuture(successBuilder.build());
     }
 
-    public Future<RpcResult<Void>> sendPacketOut(BigInteger dpnId,
-                                                 byte[] payload, NodeConnectorRef ref) {
-
-        NodeConnectorRef nodeConnectorRef = MDSALUtil.getNodeConnRef(dpnId,
-                "0xfffffffd");
-        return packetProcessingService
-                .transmitPacket(new TransmitPacketInputBuilder()
-                        .setPayload(payload)
-                        .setNode(
-                                new NodeRef(InstanceIdentifier
-                                        .builder(Nodes.class)
-                                        .child(Node.class,
-                                                new NodeKey(new NodeId(
-                                                        "openflow:" + dpnId)))
-                                        .toInstance()))
-                        .setIngress(nodeConnectorRef).setEgress(ref).build());
+    public ListenableFuture<RpcResult<TransmitPacketOutput>> sendPacketOut(
+            BigInteger dpnId, byte[] payload, NodeConnectorRef ref) {
+        NodeConnectorRef nodeConnectorRef = MDSALUtil.getNodeConnRef(dpnId, "0xfffffffd");
+        return packetProcessingService.transmitPacket(new TransmitPacketInputBuilder().setPayload(payload)
+                .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
+                        .child(Node.class, new NodeKey(new NodeId(OPENFLOW_PFX + dpnId))).build()))
+                .setIngress(nodeConnectorRef).setEgress(ref).build());
     }
 
-    @Override
-    public Future<RpcResult<Void>> sendArpResponse(SendArpResponseInput input) {
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("sendArpResponse rpc invoked");
+    private Future<RpcResult<TransmitPacketOutput>> sendPacketOutWithActions(
+            BigInteger dpnId, byte[] payload, NodeConnectorRef ref, List<Action> actions) {
+        NodeConnectorRef nodeConnectorRef = MDSALUtil.getNodeConnRef(dpnId, "0xfffffffd");
+        TransmitPacketInput transmitPacketInput = new TransmitPacketInputBuilder().setPayload(payload)
+                .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
+                        .child(Node.class, new NodeKey(new NodeId(OPENFLOW_PFX + dpnId))).build()))
+                .setIngress(nodeConnectorRef).setEgress(ref).setAction(actions).build();
+        LOG.trace("PacketOut message framed for transmitting {}", transmitPacketInput);
+        return packetProcessingService.transmitPacket(transmitPacketInput);
+    }
+
+    private List<Action> getEgressAction(String interfaceName) {
+        List<Action> actions = new ArrayList<>();
+        try {
+            GetEgressActionsForInterfaceInputBuilder egressAction = new GetEgressActionsForInterfaceInputBuilder()
+                    .setIntfName(interfaceName);
+            OdlInterfaceRpcService intfRpc = odlInterfaceRpcService;
+            if (intfRpc == null) {
+                LOG.error("Unable to obtain interfaceMgrRpc service, ignoring egress actions for interfaceName {}",
+                        interfaceName);
+                return actions;
+            }
+            Future<RpcResult<GetEgressActionsForInterfaceOutput>> result = intfRpc
+                    .getEgressActionsForInterface(egressAction.build());
+            RpcResult<GetEgressActionsForInterfaceOutput> rpcResult = result.get();
+            if (!rpcResult.isSuccessful()) {
+                LOG.warn("RPC Call to Get egress actions for interface {} returned with Errors {}", interfaceName,
+                        rpcResult.getErrors());
+            } else {
+                actions = rpcResult.getResult().getAction();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception when egress actions for interface {}", interfaceName, e);
         }
+        return actions;
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<SendArpResponseOutput>> sendArpResponse(SendArpResponseInput input) {
+        LOG.trace("sendArpResponse rpc invoked");
         BigInteger dpnId;
-        long groupId;
-        byte payload[];
+        byte[] payload;
+        byte[] srcMac;
 
         try {
             String interfaceName = input.getInterface();
             GetPortFromInterfaceOutput portResult = getPortFromInterface(interfaceName);
+            checkNotNull(portResult);
             dpnId = portResult.getDpid();
             Long portid = portResult.getPortno();
-            NodeConnectorRef ref = MDSALUtil.getNodeConnRef(dpnId,
-                    portid.toString());
-            checkArgument(null != dpnId && BigInteger.ZERO != dpnId,
-                    DPN_NOT_FOUND_ERROR, interfaceName);
-            checkNotNull(ref, NODE_CONNECTOR_NOT_FOUND_ERROR, interfaceName);
-
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace(
-                        "sendArpRequest received dpnId {} out interface {}",
-                        dpnId, interfaceName);
-            }
-
-            byte[] srcIpBytes = getIpAddressBytes(input.getSrcIpAddress());
-            byte[] dstIpBytes = getIpAddressBytes(input.getIpaddress());
-            byte srcMac[] = MDSALUtil.getMacAddressForNodeConnector(dataBroker,
-                    (InstanceIdentifier<NodeConnector>) ref.getValue());
-            byte[] dstMac = NWUtil.parseMacAddress(input.getMacaddress()
-                    .getValue());
-            checkNotNull(srcIpBytes, FAILED_TO_GET_SRC_IP_FOR_INTERFACE,
-                    interfaceName);
-            payload = ArpPacketUtil.getPayload(ARP_RESPONSE_OP, srcMac, srcIpBytes,
-                    dstMac, dstIpBytes);
-
-            sendPacketOut(dpnId, payload, ref);
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("sent the arp response for "
-                        + input.getSrcIpAddress());
+            NodeConnectorRef ref = MDSALUtil.getNodeConnRef(dpnId, portid.toString());
+            checkArgument(null != dpnId && !BigInteger.ZERO.equals(dpnId),
+                ArpConstants.DPN_NOT_FOUND_ERROR, interfaceName);
+            checkNotNull(ref, ArpConstants.NODE_CONNECTOR_NOT_FOUND_ERROR, interfaceName);
+
+            LOG.trace("sendArpRequest received dpnId {} out interface {}", dpnId, interfaceName);
+
+            byte[] srcIpBytes = getIpAddressBytes(input.getSrcIpaddress());
+            byte[] dstIpBytes = getIpAddressBytes(input.getDstIpaddress());
+            if (input.getSrcMacaddress() == null) {
+                srcMac = portResult.getPhyAddress().getBytes("UTF-8");
+            } else {
+                String macAddr = input.getSrcMacaddress().getValue();
+                srcMac = HexEncode.bytesFromHexString(macAddr);
             }
-        } catch (Throwable e) {
-            LOGGER.trace("failed to send arp response for {} {}",
-                    input.getSrcIpAddress(), e);
-            return RpcResultBuilder.<Void> failed()
-                    .withError(ErrorType.APPLICATION, e.getMessage(), e)
-                    .buildFuture();
+            byte[] dstMac = NWUtil.parseMacAddress(input.getDstMacaddress().getValue());
+            checkNotNull(srcIpBytes, ArpConstants.FAILED_TO_GET_SRC_IP_FOR_INTERFACE, interfaceName);
+            payload = ArpPacketUtil.getPayload(ArpConstants.ARP_RESPONSE_OP, srcMac, srcIpBytes, dstMac, dstIpBytes);
+
+            List<Action> actions = getEgressAction(interfaceName);
+            sendPacketOutWithActions(dpnId, payload, ref, actions);
+            LOG.debug("Sent ARP response for IP {}, from source MAC {} to target MAC {} and target IP {} via dpnId {}",
+                    input.getSrcIpaddress().getIpv4Address().getValue(), HexEncode.bytesToHexStringFormat(srcMac),
+                    HexEncode.bytesToHexStringFormat(dstMac), input.getDstIpaddress().getIpv4Address().getValue(),
+                    dpnId);
+        } catch (UnknownHostException | PacketException | InterruptedException | UnsupportedEncodingException
+                | ExecutionException e) {
+            LOG.error("failed to send arp response for {}: ", input.getSrcIpaddress(), e);
+            return RpcResultBuilder.<SendArpResponseOutput>failed()
+                    .withError(ErrorType.APPLICATION, e.getMessage(), e).buildFuture();
         }
-        RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
+        RpcResultBuilder<SendArpResponseOutput> rpcResultBuilder = RpcResultBuilder.success();
         return Futures.immediateFuture(rpcResultBuilder.build());
     }
 
     @Override
     public void onPacketReceived(PacketReceived packetReceived) {
-        Class<? extends PacketInReason> pktInReason = packetReceived
-                .getPacketInReason();
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("Packet Received {}", packetReceived);
-        }
+        Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
+        LOG.trace("Packet Received {}", packetReceived);
 
         if (pktInReason == SendToController.class) {
-
             try {
+                BigInteger dpnId = extractDpnId(packetReceived);
                 int tableId = packetReceived.getTableId().getValue();
 
                 byte[] data = packetReceived.getPayload();
                 Ethernet ethernet = new Ethernet();
 
-                ethernet.deserialize(data, 0, data.length
-                        * NetUtils.NumBitsInAByte);
-                if (ethernet.getEtherType() != ETH_TYPE_ARP) {
+                ethernet.deserialize(data, 0, data.length * NetUtils.NUM_BITS_IN_A_BYTE);
+                if (ethernet.getEtherType() != ArpConstants.ETH_TYPE_ARP) {
                     return;
                 }
 
                 Packet pkt = ethernet.getPayload();
                 ARP arp = (ARP) pkt;
-                InetAddress srcInetAddr = InetAddress.getByAddress(arp
-                        .getSenderProtocolAddress());
-                InetAddress dstInetAddr = InetAddress.getByAddress(arp
-                        .getTargetProtocolAddress());
+                InetAddress srcInetAddr = InetAddress.getByAddress(arp.getSenderProtocolAddress());
+                InetAddress dstInetAddr = InetAddress.getByAddress(arp.getTargetProtocolAddress());
                 byte[] srcMac = ethernet.getSourceMACAddress();
+                byte[] dstMac = ethernet.getDestinationMACAddress();
 
-                NodeConnectorRef ref = packetReceived.getIngress();
-
-                Metadata metadata  = packetReceived.getMatch().getMetadata();
-                String interfaceName = getInterfaceName(ref,metadata, dataBroker);
-                
-                checkAndFireMacChangedNotification(interfaceName, srcInetAddr,
-                        srcMac);
+                Metadata metadata = packetReceived.getMatch().getMetadata();
 
-                macsDB.put(interfaceName + "-" + srcInetAddr.getHostAddress(),
-                        NWUtil.toStringMacAddress(srcMac));
+                String interfaceName = getInterfaceName(metadata);
 
-                if (arp.getOpCode() == ARP_REQUEST_OP) {
-                    fireArpReqRecvdNotification(interfaceName, srcInetAddr,
-                            srcMac, dstInetAddr, tableId);
+                checkAndFireMacChangedNotification(interfaceName, srcInetAddr, srcMac);
+                macsDB.put(interfaceName + "-" + srcInetAddr.getHostAddress(), NWUtil.toStringMacAddress(srcMac));
+                if (arp.getOpCode() == ArpConstants.ARP_REQUEST_OP) {
+                    fireArpReqRecvdNotification(interfaceName, srcInetAddr, srcMac, dstInetAddr, dpnId, tableId,
+                            metadata.getMetadata());
                 } else {
-                    fireArpRespRecvdNotification(interfaceName, srcInetAddr,
-                            srcMac, tableId);
+                    fireArpRespRecvdNotification(interfaceName, srcInetAddr, srcMac, dpnId, tableId,
+                                                 metadata.getMetadata(), dstInetAddr, dstMac);
                 }
-                if (getMacFutures.get(srcInetAddr.getHostAddress()) != null) {
-                    threadPool.submit(new MacResponderTask(arp));
+                if (macAddrs.get(srcInetAddr.getHostAddress()) != null) {
+                    threadPool.execute(new MacResponderTask(arp));
                 }
-
-            } catch (Throwable e) {
-                LOGGER.trace("Failed to decode packet: {}", e);
+            } catch (PacketException | UnknownHostException | InterruptedException | ExecutionException e) {
+                LOG.trace("Failed to decode packet", e);
             }
         }
     }
 
-    GetPortFromInterfaceOutput getPortFromInterface(String interfaceName) throws Throwable {
+    private GetPortFromInterfaceOutput getPortFromInterface(String interfaceName)
+            throws InterruptedException, ExecutionException {
         GetPortFromInterfaceInputBuilder getPortFromInterfaceInputBuilder = new GetPortFromInterfaceInputBuilder();
-        getPortFromInterfaceInputBuilder.setIntfName(interfaceName);;
-        Future<RpcResult<GetPortFromInterfaceOutput>> portFromInterface = intfRpc.getPortFromInterface(getPortFromInterfaceInputBuilder.build());
+        getPortFromInterfaceInputBuilder.setIntfName(interfaceName);
+
+        Future<RpcResult<GetPortFromInterfaceOutput>> portFromInterface = odlInterfaceRpcService
+                .getPortFromInterface(getPortFromInterfaceInputBuilder.build());
         GetPortFromInterfaceOutput result = portFromInterface.get().getResult();
-        LOGGER.trace("getPortFromInterface rpc result is {} ", result);
+        LOG.trace("getPortFromInterface rpc result is {} ", result);
         if (result != null) {
-            LOGGER.trace("getPortFromInterface rpc result is {} {} ", result.getDpid(), result.getPortno());
+            LOG.trace("getPortFromInterface rpc result is {} {} ", result.getDpid(), result.getPortno());
         }
         return result;
     }
-    
-    private String getInterfaceName(NodeConnectorRef ref, Metadata metadata, DataBroker dataBroker2) throws Throwable {
-        LOGGER.debug("metadata received is {} ", metadata);
-       
+
+    private String getInterfaceName(Metadata metadata)
+            throws InterruptedException, ExecutionException {
+        LOG.debug("metadata received is {} ", metadata);
+
         GetInterfaceFromIfIndexInputBuilder ifIndexInputBuilder = new GetInterfaceFromIfIndexInputBuilder();
         BigInteger lportTag = MetaDataUtil.getLportFromMetadata(metadata.getMetadata());
-       
+
         ifIndexInputBuilder.setIfIndex(lportTag.intValue());
         GetInterfaceFromIfIndexInput input = ifIndexInputBuilder.build();
-        OdlInterfaceRpcService intfRpc = getInterfaceRpcService();
 
-        Future<RpcResult<GetInterfaceFromIfIndexOutput>> interfaceFromIfIndex = intfRpc.getInterfaceFromIfIndex(input);
+        Future<RpcResult<GetInterfaceFromIfIndexOutput>> interfaceFromIfIndex = odlInterfaceRpcService
+                .getInterfaceFromIfIndex(input);
         GetInterfaceFromIfIndexOutput interfaceFromIfIndexOutput = interfaceFromIfIndex.get().getResult();
         return interfaceFromIfIndexOutput.getInterfaceName();
     }
 
-       class MacResponderTask implements Runnable {
-        ARP arp;
+    private class MacResponderTask implements Runnable {
+        final ARP arp;
 
         MacResponderTask(ARP arp) {
             this.arp = arp;
@@ -453,76 +466,84 @@ public class ArpUtilImpl implements OdlArputilService,
             InetAddress srcAddr;
             GetMacOutputBuilder outputBuilder;
             String srcMac;
-            SettableFuture<RpcResult<GetMacOutput>> future = null;
-            RpcResultBuilder<GetMacOutput> resultBuilder;
             try {
-                srcAddr = InetAddress.getByAddress(arp
-                        .getSenderProtocolAddress());
-                srcMac = NWUtil.toStringMacAddress(arp
-                        .getSenderHardwareAddress());
-                future = getMacFutures.remove(srcAddr.getHostAddress());
+                srcAddr = InetAddress.getByAddress(arp.getSenderProtocolAddress());
+                srcMac = NWUtil.toStringMacAddress(arp.getSenderHardwareAddress());
+                SettableFuture<RpcResult<GetMacOutput>> future = macAddrs.remove(srcAddr.getHostAddress());
                 if (future == null) {
-                    LOGGER.trace("There are no pending mac requests.");
+                    LOG.trace("There are no pending mac requests.");
                     return;
                 }
-                outputBuilder = new GetMacOutputBuilder()
-                        .setMacaddress(new PhysAddress(srcMac));
-                resultBuilder = RpcResultBuilder.success(outputBuilder.build());
-                if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("sent the mac response for ip {}",
-                            srcAddr.getHostAddress());
+                outputBuilder = new GetMacOutputBuilder().setMacaddress(new PhysAddress(srcMac));
+                future.set(RpcResultBuilder.success(outputBuilder.build()).build());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("sent the mac response for ip {}", srcAddr.getHostAddress());
                 }
-            } catch (Exception e) {
-                LOGGER.trace("failed to send mac response {} ", e);
-                resultBuilder = RpcResultBuilder.<GetMacOutput> failed()
-                        .withError(ErrorType.APPLICATION, e.getMessage(), e);
+            } catch (UnknownHostException e) {
+                LOG.error("failed to send mac response", e);
             }
-            future.set(resultBuilder.build());
         }
     }
 
-    private void fireArpRespRecvdNotification(String interfaceName,
-                                              InetAddress inetAddr, byte[] macAddressBytes, int tableId)
-            throws InterruptedException {
-
-        IpAddress ip = new IpAddress(inetAddr.getHostAddress().toCharArray());
-        String macAddress = NWUtil.toStringMacAddress(macAddressBytes);
-        PhysAddress mac = new PhysAddress(macAddress);
+    private void fireArpRespRecvdNotification(String interfaceName, InetAddress srcInetAddr, byte[] srcMacAddressBytes,
+            BigInteger dpnId, int tableId, BigInteger metadata, InetAddress dstInetAddr, byte[] dstMacAddressBytes)
+                    throws InterruptedException {
+        arpRespRecvd.mark();
+
+        IpAddress srcIp = IetfInetUtil.INSTANCE.ipAddressFor(srcInetAddr);
+        IpAddress dstIp = IetfInetUtil.INSTANCE.ipAddressFor(dstInetAddr);
+        String srcMacAddress = NWUtil.toStringMacAddress(srcMacAddressBytes);
+        PhysAddress srcMac = new PhysAddress(srcMacAddress);
+        String dstMacAddress = NWUtil.toStringMacAddress(dstMacAddressBytes);
+        PhysAddress dstMac = new PhysAddress(dstMacAddress);
         ArpResponseReceivedBuilder builder = new ArpResponseReceivedBuilder();
         builder.setInterface(interfaceName);
-        builder.setIpaddress(ip);
+        builder.setSrcIpaddress(srcIp);
+        builder.setDpnId(dpnId);
         builder.setOfTableId((long) tableId);
-        builder.setMacaddress(mac);
-        notificationPublishService.putNotification(builder.build());
+        builder.setSrcMac(srcMac);
+        builder.setMetadata(metadata);
+        builder.setDstIpaddress(dstIp);
+        builder.setDstMac(dstMac);
+        ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(builder.build());
+        if (offerNotification != null && offerNotification.equals(NotificationPublishService.REJECTED)) {
+            arpRespRecvdNotificationRejected.mark();
+
+        } else {
+            arpRespRecvdNotification.mark();
+        }
     }
 
-    private void fireArpReqRecvdNotification(String interfaceName,
-                                             InetAddress srcInetAddr, byte[] srcMac, InetAddress dstInetAddr,
-                                             int tableId) throws InterruptedException {
+    private void fireArpReqRecvdNotification(String interfaceName, InetAddress srcInetAddr, byte[] srcMac,
+            InetAddress dstInetAddr, BigInteger dpnId, int tableId, BigInteger metadata) throws InterruptedException {
+        arpReqRecvd.mark();
         String macAddress = NWUtil.toStringMacAddress(srcMac);
         ArpRequestReceivedBuilder builder = new ArpRequestReceivedBuilder();
         builder.setInterface(interfaceName);
+        builder.setDpnId(dpnId);
         builder.setOfTableId((long) tableId);
-        builder.setSrcIpaddress(new IpAddress(srcInetAddr.getHostAddress()
-                .toCharArray()));
-        builder.setDstIpaddress(new IpAddress(dstInetAddr.getHostAddress()
-                .toCharArray()));
+        builder.setSrcIpaddress(IetfInetUtil.INSTANCE.ipAddressFor(srcInetAddr));
+        builder.setDstIpaddress(IetfInetUtil.INSTANCE.ipAddressFor(dstInetAddr));
         builder.setSrcMac(new PhysAddress(macAddress));
-        notificationPublishService.putNotification(builder.build());
+        builder.setMetadata(metadata);
+        ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(builder.build());
+        if (offerNotification != null && offerNotification.equals(NotificationPublishService.REJECTED)) {
+            arpReqRecvdNotificationRejected.mark();
+        } else {
+            arpReqRecvdNotification.mark();
+        }
     }
 
-    private void checkAndFireMacChangedNotification(String interfaceName,
-                                                    InetAddress inetAddr, byte[] macAddressBytes)
+    private void checkAndFireMacChangedNotification(String interfaceName, InetAddress inetAddr, byte[] macAddressBytes)
             throws InterruptedException {
 
-        IpAddress ip = new IpAddress(inetAddr.getHostAddress().toCharArray());
+        IpAddress ip = IetfInetUtil.INSTANCE.ipAddressFor(inetAddr);
         String macAddress = NWUtil.toStringMacAddress(macAddressBytes);
         PhysAddress mac = new PhysAddress(macAddress);
 
-        if (!macAddress.equals(macsDB.get(interfaceName + "-"
-                + inetAddr.getHostAddress()))) {
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("mac address changed for " + inetAddr);
+        if (!macAddress.equals(macsDB.get(interfaceName + "-" + inetAddr.getHostAddress()))) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("mac address changed for {}", inetAddr);
             }
             MacChangedBuilder builder = new MacChangedBuilder();
             builder.setInterface(interfaceName);
@@ -532,33 +553,21 @@ public class ArpUtilImpl implements OdlArputilService,
         }
     }
 
-    private InstanceIdentifier<Interface> buildInterfaceId(String interfaceName) {
-        InstanceIdentifierBuilder<Interface> idBuilder = InstanceIdentifier
-                .builder(Interfaces.class).child(Interface.class,
-                        new InterfaceKey(interfaceName));
-        InstanceIdentifier<Interface> id = idBuilder.build();
-        return id;
-    }
-
+    private BigInteger extractDpnId(PacketReceived packetReceived) {
+        NodeKey nodeKey = packetReceived.getIngress().getValue().firstKeyOf(Node.class);
+        String nodeKeyString = nodeKey.getId().getValue();
 
-    private NodeConnectorId getNodeConnectorFromInterfaceName(String interfaceName) {
-        InstanceIdentifierBuilder<org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface> idBuilder =
-                InstanceIdentifier.builder(InterfacesState.class)
-                        .child(org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface.class,
-                                new org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.InterfaceKey(interfaceName));
-        InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface> ifStateId = idBuilder.build();
+        if (!nodeKeyString.startsWith(OPENFLOW_PFX)) {
+            LOG.warn("Could not extract DPN for packet-in, doesn't start with 'openflow:' {}", packetReceived);
+            return null;
+        }
 
-        Optional<org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface> ifStateOptional = MDSALUtil.read(dataBroker,
-                LogicalDatastoreType.OPERATIONAL,
-                ifStateId);
+        return new BigInteger(nodeKeyString.substring(OPENFLOW_PFX.length()));
+    }
 
-        if (ifStateOptional.isPresent()) {
-            org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface ifState = ifStateOptional.get();
-            List<String> lowerLayerIf = ifState.getLowerLayerIf();
-            if (!lowerLayerIf.isEmpty()) {
-                return new NodeConnectorId(lowerLayerIf.get(0));
-            }
-        }
-        return null;
+    // TODO Replace this with mdsal's DataObjectUtils.nullToEmpty when upgrading to mdsal 3.0.2
+    @Nonnull
+    public static <T> List<T> nullToEmpty(final @Nullable List<T> input) {
+        return input != null ? input : emptyList();
     }
 }