91683deca5f5f72e841b82f39721accaed61cd8a
[genius.git] / arputil / arputil-impl / src / main / java / org / opendaylight / genius / arputil / internal / ArpUtilImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.arputil.internal;
10
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkNotNull;
13
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.io.UnsupportedEncodingException;
20 import java.math.BigInteger;
21 import java.net.InetAddress;
22 import java.net.UnknownHostException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Future;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
35 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
36 import org.opendaylight.genius.arputil.api.ArpConstants;
37 import org.opendaylight.genius.mdsalutil.MDSALUtil;
38 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
39 import org.opendaylight.genius.mdsalutil.NWUtil;
40 import org.opendaylight.genius.mdsalutil.packet.ARP;
41 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
42 import org.opendaylight.infrautils.inject.AbstractLifecycle;
43 import org.opendaylight.infrautils.metrics.Meter;
44 import org.opendaylight.infrautils.metrics.MetricProvider;
45 import org.opendaylight.infrautils.utils.concurrent.Executors;
46 import org.opendaylight.openflowplugin.libraries.liblldp.HexEncode;
47 import org.opendaylight.openflowplugin.libraries.liblldp.NetUtils;
48 import org.opendaylight.openflowplugin.libraries.liblldp.Packet;
49 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
50 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
51 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
52 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.PhysAddress;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.ArpRequestReceivedBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.ArpResponseReceivedBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.GetMacInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.GetMacOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.GetMacOutputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.MacChangedBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.OdlArputilService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpRequestInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpRequestInputBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpRequestOutput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpResponseInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.SendArpResponseOutput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.arputil.rev160406.interfaces.InterfaceAddress;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetEgressActionsForInterfaceInputBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetEgressActionsForInterfaceOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexInput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexInputBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetInterfaceFromIfIndexOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetPortFromInterfaceInputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.GetPortFromInterfaceOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.rpcs.rev160406.OdlInterfaceRpcService;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Metadata;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInReason;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.SendToController;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketOutput;
91 import org.opendaylight.yangtools.concepts.ListenerRegistration;
92 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
93 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
94 import org.opendaylight.yangtools.yang.common.RpcResult;
95 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
98
99 @Singleton
100 public class ArpUtilImpl extends AbstractLifecycle implements OdlArputilService, PacketProcessingListener {
101     private static final Logger LOG = LoggerFactory.getLogger(ArpUtilImpl.class);
102     private static final String MODULENAME = "odl.genius.arputil.";
103     private static final String OPENFLOW_PFX = "openflow:";
104
105     private final DataBroker dataBroker;
106     private final PacketProcessingService packetProcessingService;
107     private final NotificationPublishService notificationPublishService;
108     private final NotificationService notificationService;
109     private final OdlInterfaceRpcService odlInterfaceRpcService;
110     private ListenerRegistration<ArpUtilImpl> listenerRegistration;
111     private final ExecutorService threadPool = Executors.newFixedThreadPool(1, "ArpUtil", LOG);
112     private final ConcurrentMap<String, String> macsDB = new ConcurrentHashMap<>();
113     private final ConcurrentMap<String, SettableFuture<RpcResult<GetMacOutput>>> macAddrs = new ConcurrentHashMap<>();
114
115     private final Meter arpRespRecvd;
116     private final Meter arpRespRecvdNotification;
117     private final Meter arpRespRecvdNotificationRejected;
118     private final Meter arpReqRecvd;
119     private final Meter arpReqRecvdNotification;
120     private final Meter arpReqRecvdNotificationRejected;
121
122
123     @Inject
124     public ArpUtilImpl(final DataBroker dataBroker, final PacketProcessingService packetProcessingService,
125                        final NotificationPublishService notificationPublishService,
126                        final NotificationService notificationService,
127                        final OdlInterfaceRpcService odlInterfaceRpcService,
128                        final MetricProvider metricProvider) {
129         this.dataBroker = dataBroker;
130         this.packetProcessingService = packetProcessingService;
131         this.notificationPublishService = notificationPublishService;
132         this.notificationService = notificationService;
133         this.odlInterfaceRpcService = odlInterfaceRpcService;
134
135         arpRespRecvd = metricProvider.newMeter(this,MODULENAME + "arpResponseReceived");
136         arpRespRecvdNotification = metricProvider.newMeter(this,MODULENAME + "arpResponseReceivedNotification");
137         arpRespRecvdNotificationRejected = metricProvider.newMeter(this,
138                 MODULENAME + "arpResponseReceivedNotificationRejected");
139         arpReqRecvd = metricProvider.newMeter(this,MODULENAME + "arpRequestReceived");
140         arpReqRecvdNotification = metricProvider.newMeter(this,MODULENAME + "arpRequestReceivedNotification");
141         arpReqRecvdNotificationRejected = metricProvider.newMeter(this,
142                 MODULENAME + "arpRequestReceivedNotificationRejected");
143     }
144
145     @Override
146     public void start() {
147         LOG.info("{} start", getClass().getSimpleName());
148         listenerRegistration = notificationService.registerNotificationListener(this);
149     }
150
151     @Override
152     public void stop() {
153         LOG.info("{} stop", getClass().getSimpleName());
154
155         if (listenerRegistration != null) {
156             listenerRegistration.close();
157             listenerRegistration = null;
158         }
159     }
160
161     private String getIpAddressInString(IpAddress ipAddress) throws UnknownHostException {
162         return InetAddress.getByName(ipAddress.getIpv4Address().getValue()).getHostAddress();
163     }
164
165     @Override
166     public ListenableFuture<RpcResult<GetMacOutput>> getMac(GetMacInput input) {
167         try {
168             final String dstIpAddress = getIpAddressInString(input.getIpaddress());
169             LOG.trace("getMac rpc invoked for ip {}", dstIpAddress);
170             if (macAddrs.get(dstIpAddress) != null) {
171                 if (LOG.isInfoEnabled()) {
172                     LOG.info("get mac already in progress for the ip {}", dstIpAddress);
173                 }
174                 return macAddrs.get(dstIpAddress);
175             }
176             SendArpRequestInputBuilder builder = new SendArpRequestInputBuilder()
177                     .setInterfaceAddress(input.getInterfaceAddress()).setIpaddress(input.getIpaddress());
178             ListenableFuture<RpcResult<SendArpRequestOutput>> arpReqFt = sendArpRequest(builder.build());
179             final SettableFuture<RpcResult<GetMacOutput>> ft = SettableFuture.create();
180
181             Futures.addCallback(arpReqFt, new FutureCallback<RpcResult<SendArpRequestOutput>>() {
182                 @Override
183                 public void onFailure(Throwable ex) {
184                     RpcResultBuilder<GetMacOutput> resultBuilder = RpcResultBuilder.<GetMacOutput>failed()
185                             .withError(ErrorType.APPLICATION, ex.getMessage(), ex);
186                     ft.set(resultBuilder.build());
187                 }
188
189                 @Override
190                 public void onSuccess(RpcResult<SendArpRequestOutput> result) {
191                     LOG.trace("Successfully sent the arp pkt out for ip {}", dstIpAddress);
192                 }
193             }, MoreExecutors.directExecutor());
194
195             macAddrs.put(dstIpAddress, ft);
196             return ft;
197         } catch (UnknownHostException e) {
198             LOG.error("Failed to handle getMac request for {}", input.getIpaddress(), e);
199             RpcResultBuilder<GetMacOutput> resultBuilder = RpcResultBuilder.<GetMacOutput>failed()
200                     .withError(ErrorType.APPLICATION, e.getMessage(), e);
201             return Futures.immediateFuture(resultBuilder.build());
202         }
203     }
204
205     private byte[] getIpAddressBytes(IpAddress ip) throws UnknownHostException {
206         return InetAddress.getByName(ip.getIpv4Address().getValue()).getAddress();
207     }
208
209     @Override
210     public ListenableFuture<RpcResult<SendArpRequestOutput>> sendArpRequest(SendArpRequestInput arpReqInput) {
211         LOG.trace("rpc sendArpRequest invoked for ip {}", arpReqInput.getIpaddress());
212         BigInteger dpnId;
213         byte[] payload;
214         String interfaceName = null;
215         byte[] srcIpBytes;
216         byte[] dstIpBytes;
217         byte[] srcMac;
218
219         RpcResultBuilder<SendArpRequestOutput> failureBuilder = RpcResultBuilder.failed();
220         RpcResultBuilder<SendArpRequestOutput> successBuilder = RpcResultBuilder.success();
221
222         try {
223             dstIpBytes = getIpAddressBytes(arpReqInput.getIpaddress());
224         } catch (UnknownHostException e) {
225             LOG.error("Cannot get IP address", e);
226             failureBuilder.withError(ErrorType.APPLICATION, ArpConstants.UNKNOWN_IP_ADDRESS_SUPPLIED);
227             return Futures.immediateFuture(failureBuilder.build());
228         }
229
230         int localErrorCount = 0;
231         for (InterfaceAddress interfaceAddress : arpReqInput.getInterfaceAddress()) {
232             try {
233                 interfaceName = interfaceAddress.getInterface();
234                 srcIpBytes = getIpAddressBytes(interfaceAddress.getIpAddress());
235
236                 GetPortFromInterfaceOutput portResult = getPortFromInterface(interfaceName);
237                 checkNotNull(portResult);
238                 dpnId = portResult.getDpid();
239                 Long portid = portResult.getPortno();
240                 checkArgument(null != dpnId && !BigInteger.ZERO.equals(dpnId),
241                     ArpConstants.DPN_NOT_FOUND_ERROR, interfaceName);
242
243                 NodeConnectorRef ref = MDSALUtil.getNodeConnRef(dpnId, portid.toString());
244                 checkNotNull(ref, ArpConstants.NODE_CONNECTOR_NOT_FOUND_ERROR, interfaceName);
245
246                 LOG.trace("sendArpRequest received dpnId {} out interface {}", dpnId, interfaceName);
247                 if (interfaceAddress.getMacaddress() == null) {
248                     srcMac = MDSALUtil.getMacAddressForNodeConnector(dataBroker,
249                             (InstanceIdentifier<NodeConnector>) ref.getValue());
250                 } else {
251                     String macAddr = interfaceAddress.getMacaddress().getValue();
252                     srcMac = HexEncode.bytesFromHexString(macAddr);
253                 }
254                 checkNotNull(srcMac, ArpConstants.FAILED_TO_GET_SRC_MAC_FOR_INTERFACE, interfaceName, ref.getValue());
255                 checkNotNull(srcIpBytes, ArpConstants.FAILED_TO_GET_SRC_IP_FOR_INTERFACE, interfaceName);
256
257                 payload = ArpPacketUtil.getPayload(ArpConstants.ARP_REQUEST_OP, srcMac, srcIpBytes,
258                         ArpPacketUtil.ETHERNET_BROADCAST_DESTINATION, dstIpBytes);
259
260                 List<Action> actions = getEgressAction(interfaceName);
261                 sendPacketOutWithActions(dpnId, payload, ref, actions);
262
263                 LOG.trace("sent arp request for {}", arpReqInput.getIpaddress());
264             } catch (UnknownHostException | PacketException | InterruptedException | ExecutionException
265                     | ReadFailedException e) {
266                 LOG.trace("failed to send arp req for {} on interface {}", arpReqInput.getIpaddress(), interfaceName);
267
268                 failureBuilder.withError(ErrorType.APPLICATION,
269                     ArpConstants.FAILED_TO_SEND_ARP_REQ_FOR_INTERFACE + interfaceName, e);
270                 successBuilder.withError(ErrorType.APPLICATION,
271                     ArpConstants.FAILED_TO_SEND_ARP_REQ_FOR_INTERFACE + interfaceName, e);
272                 localErrorCount++;
273             }
274         }
275         if (localErrorCount == arpReqInput.getInterfaceAddress().size()) {
276             // All the requests failed
277             return Futures.immediateFuture(failureBuilder.build());
278         }
279         return Futures.immediateFuture(successBuilder.build());
280     }
281
282     public ListenableFuture<RpcResult<TransmitPacketOutput>> sendPacketOut(
283             BigInteger dpnId, byte[] payload, NodeConnectorRef ref) {
284         NodeConnectorRef nodeConnectorRef = MDSALUtil.getNodeConnRef(dpnId, "0xfffffffd");
285         return packetProcessingService.transmitPacket(new TransmitPacketInputBuilder().setPayload(payload)
286                 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
287                         .child(Node.class, new NodeKey(new NodeId(OPENFLOW_PFX + dpnId))).build()))
288                 .setIngress(nodeConnectorRef).setEgress(ref).build());
289     }
290
291     private Future<RpcResult<TransmitPacketOutput>> sendPacketOutWithActions(
292             BigInteger dpnId, byte[] payload, NodeConnectorRef ref, List<Action> actions) {
293         NodeConnectorRef nodeConnectorRef = MDSALUtil.getNodeConnRef(dpnId, "0xfffffffd");
294         TransmitPacketInput transmitPacketInput = new TransmitPacketInputBuilder().setPayload(payload)
295                 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
296                         .child(Node.class, new NodeKey(new NodeId(OPENFLOW_PFX + dpnId))).build()))
297                 .setIngress(nodeConnectorRef).setEgress(ref).setAction(actions).build();
298         LOG.trace("PacketOut message framed for transmitting {}", transmitPacketInput);
299         return packetProcessingService.transmitPacket(transmitPacketInput);
300     }
301
302     private List<Action> getEgressAction(String interfaceName) {
303         List<Action> actions = new ArrayList<>();
304         try {
305             GetEgressActionsForInterfaceInputBuilder egressAction = new GetEgressActionsForInterfaceInputBuilder()
306                     .setIntfName(interfaceName);
307             OdlInterfaceRpcService intfRpc = odlInterfaceRpcService;
308             if (intfRpc == null) {
309                 LOG.error("Unable to obtain interfaceMgrRpc service, ignoring egress actions for interfaceName {}",
310                         interfaceName);
311                 return actions;
312             }
313             Future<RpcResult<GetEgressActionsForInterfaceOutput>> result = intfRpc
314                     .getEgressActionsForInterface(egressAction.build());
315             RpcResult<GetEgressActionsForInterfaceOutput> rpcResult = result.get();
316             if (!rpcResult.isSuccessful()) {
317                 LOG.warn("RPC Call to Get egress actions for interface {} returned with Errors {}", interfaceName,
318                         rpcResult.getErrors());
319             } else {
320                 actions = rpcResult.getResult().getAction();
321             }
322         } catch (InterruptedException | ExecutionException e) {
323             LOG.error("Exception when egress actions for interface {}", interfaceName, e);
324         }
325         return actions;
326     }
327
328     @Override
329     public ListenableFuture<RpcResult<SendArpResponseOutput>> sendArpResponse(SendArpResponseInput input) {
330         LOG.trace("sendArpResponse rpc invoked");
331         BigInteger dpnId;
332         byte[] payload;
333         byte[] srcMac;
334
335         try {
336             String interfaceName = input.getInterface();
337             GetPortFromInterfaceOutput portResult = getPortFromInterface(interfaceName);
338             checkNotNull(portResult);
339             dpnId = portResult.getDpid();
340             Long portid = portResult.getPortno();
341             NodeConnectorRef ref = MDSALUtil.getNodeConnRef(dpnId, portid.toString());
342             checkArgument(null != dpnId && !BigInteger.ZERO.equals(dpnId),
343                 ArpConstants.DPN_NOT_FOUND_ERROR, interfaceName);
344             checkNotNull(ref, ArpConstants.NODE_CONNECTOR_NOT_FOUND_ERROR, interfaceName);
345
346             LOG.trace("sendArpRequest received dpnId {} out interface {}", dpnId, interfaceName);
347
348             byte[] srcIpBytes = getIpAddressBytes(input.getSrcIpaddress());
349             byte[] dstIpBytes = getIpAddressBytes(input.getDstIpaddress());
350             if (input.getSrcMacaddress() == null) {
351                 srcMac = portResult.getPhyAddress().getBytes("UTF-8");
352             } else {
353                 String macAddr = input.getSrcMacaddress().getValue();
354                 srcMac = HexEncode.bytesFromHexString(macAddr);
355             }
356             byte[] dstMac = NWUtil.parseMacAddress(input.getDstMacaddress().getValue());
357             checkNotNull(srcIpBytes, ArpConstants.FAILED_TO_GET_SRC_IP_FOR_INTERFACE, interfaceName);
358             payload = ArpPacketUtil.getPayload(ArpConstants.ARP_RESPONSE_OP, srcMac, srcIpBytes, dstMac, dstIpBytes);
359
360             List<Action> actions = getEgressAction(interfaceName);
361             sendPacketOutWithActions(dpnId, payload, ref, actions);
362             LOG.debug("Sent ARP response for IP {}, from source MAC {} to target MAC {} and target IP {} via dpnId {}",
363                     input.getSrcIpaddress().getIpv4Address().getValue(), HexEncode.bytesToHexStringFormat(srcMac),
364                     HexEncode.bytesToHexStringFormat(dstMac), input.getDstIpaddress().getIpv4Address().getValue(),
365                     dpnId);
366         } catch (UnknownHostException | PacketException | InterruptedException | UnsupportedEncodingException
367                 | ExecutionException e) {
368             LOG.error("failed to send arp response for {}: ", input.getSrcIpaddress(), e);
369             return RpcResultBuilder.<SendArpResponseOutput>failed()
370                     .withError(ErrorType.APPLICATION, e.getMessage(), e).buildFuture();
371         }
372         RpcResultBuilder<SendArpResponseOutput> rpcResultBuilder = RpcResultBuilder.success();
373         return Futures.immediateFuture(rpcResultBuilder.build());
374     }
375
376     @Override
377     public void onPacketReceived(PacketReceived packetReceived) {
378         Class<? extends PacketInReason> pktInReason = packetReceived.getPacketInReason();
379         LOG.trace("Packet Received {}", packetReceived);
380
381         if (pktInReason == SendToController.class) {
382             try {
383                 BigInteger dpnId = extractDpnId(packetReceived);
384                 int tableId = packetReceived.getTableId().getValue();
385
386                 byte[] data = packetReceived.getPayload();
387                 Ethernet ethernet = new Ethernet();
388
389                 ethernet.deserialize(data, 0, data.length * NetUtils.NUM_BITS_IN_A_BYTE);
390                 if (ethernet.getEtherType() != ArpConstants.ETH_TYPE_ARP) {
391                     return;
392                 }
393
394                 Packet pkt = ethernet.getPayload();
395                 ARP arp = (ARP) pkt;
396                 InetAddress srcInetAddr = InetAddress.getByAddress(arp.getSenderProtocolAddress());
397                 InetAddress dstInetAddr = InetAddress.getByAddress(arp.getTargetProtocolAddress());
398                 byte[] srcMac = ethernet.getSourceMACAddress();
399                 byte[] dstMac = ethernet.getDestinationMACAddress();
400
401                 Metadata metadata = packetReceived.getMatch().getMetadata();
402
403                 String interfaceName = getInterfaceName(metadata);
404
405                 checkAndFireMacChangedNotification(interfaceName, srcInetAddr, srcMac);
406                 macsDB.put(interfaceName + "-" + srcInetAddr.getHostAddress(), NWUtil.toStringMacAddress(srcMac));
407                 if (arp.getOpCode() == ArpConstants.ARP_REQUEST_OP) {
408                     fireArpReqRecvdNotification(interfaceName, srcInetAddr, srcMac, dstInetAddr, dpnId, tableId,
409                             metadata.getMetadata());
410                 } else {
411                     fireArpRespRecvdNotification(interfaceName, srcInetAddr, srcMac, dpnId, tableId,
412                                                  metadata.getMetadata(), dstInetAddr, dstMac);
413                 }
414                 if (macAddrs.get(srcInetAddr.getHostAddress()) != null) {
415                     threadPool.execute(new MacResponderTask(arp));
416                 }
417             } catch (PacketException | UnknownHostException | InterruptedException | ExecutionException e) {
418                 LOG.trace("Failed to decode packet", e);
419             }
420         }
421     }
422
423     private GetPortFromInterfaceOutput getPortFromInterface(String interfaceName)
424             throws InterruptedException, ExecutionException {
425         GetPortFromInterfaceInputBuilder getPortFromInterfaceInputBuilder = new GetPortFromInterfaceInputBuilder();
426         getPortFromInterfaceInputBuilder.setIntfName(interfaceName);
427
428         Future<RpcResult<GetPortFromInterfaceOutput>> portFromInterface = odlInterfaceRpcService
429                 .getPortFromInterface(getPortFromInterfaceInputBuilder.build());
430         GetPortFromInterfaceOutput result = portFromInterface.get().getResult();
431         LOG.trace("getPortFromInterface rpc result is {} ", result);
432         if (result != null) {
433             LOG.trace("getPortFromInterface rpc result is {} {} ", result.getDpid(), result.getPortno());
434         }
435         return result;
436     }
437
438     private String getInterfaceName(Metadata metadata)
439             throws InterruptedException, ExecutionException {
440         LOG.debug("metadata received is {} ", metadata);
441
442         GetInterfaceFromIfIndexInputBuilder ifIndexInputBuilder = new GetInterfaceFromIfIndexInputBuilder();
443         BigInteger lportTag = MetaDataUtil.getLportFromMetadata(metadata.getMetadata());
444
445         ifIndexInputBuilder.setIfIndex(lportTag.intValue());
446         GetInterfaceFromIfIndexInput input = ifIndexInputBuilder.build();
447
448         Future<RpcResult<GetInterfaceFromIfIndexOutput>> interfaceFromIfIndex = odlInterfaceRpcService
449                 .getInterfaceFromIfIndex(input);
450         GetInterfaceFromIfIndexOutput interfaceFromIfIndexOutput = interfaceFromIfIndex.get().getResult();
451         return interfaceFromIfIndexOutput.getInterfaceName();
452     }
453
454     private class MacResponderTask implements Runnable {
455         final ARP arp;
456
457         MacResponderTask(ARP arp) {
458             this.arp = arp;
459         }
460
461         @Override
462         public void run() {
463             InetAddress srcAddr;
464             GetMacOutputBuilder outputBuilder;
465             String srcMac;
466             try {
467                 srcAddr = InetAddress.getByAddress(arp.getSenderProtocolAddress());
468                 srcMac = NWUtil.toStringMacAddress(arp.getSenderHardwareAddress());
469                 SettableFuture<RpcResult<GetMacOutput>> future = macAddrs.remove(srcAddr.getHostAddress());
470                 if (future == null) {
471                     LOG.trace("There are no pending mac requests.");
472                     return;
473                 }
474                 outputBuilder = new GetMacOutputBuilder().setMacaddress(new PhysAddress(srcMac));
475                 future.set(RpcResultBuilder.success(outputBuilder.build()).build());
476                 if (LOG.isTraceEnabled()) {
477                     LOG.trace("sent the mac response for ip {}", srcAddr.getHostAddress());
478                 }
479             } catch (UnknownHostException e) {
480                 LOG.error("failed to send mac response", e);
481             }
482         }
483     }
484
485     private void fireArpRespRecvdNotification(String interfaceName, InetAddress srcInetAddr, byte[] srcMacAddressBytes,
486             BigInteger dpnId, int tableId, BigInteger metadata, InetAddress dstInetAddr, byte[] dstMacAddressBytes)
487                     throws InterruptedException {
488         arpRespRecvd.mark();
489
490         IpAddress srcIp = IetfInetUtil.INSTANCE.ipAddressFor(srcInetAddr);
491         IpAddress dstIp = IetfInetUtil.INSTANCE.ipAddressFor(dstInetAddr);
492         String srcMacAddress = NWUtil.toStringMacAddress(srcMacAddressBytes);
493         PhysAddress srcMac = new PhysAddress(srcMacAddress);
494         String dstMacAddress = NWUtil.toStringMacAddress(dstMacAddressBytes);
495         PhysAddress dstMac = new PhysAddress(dstMacAddress);
496         ArpResponseReceivedBuilder builder = new ArpResponseReceivedBuilder();
497         builder.setInterface(interfaceName);
498         builder.setSrcIpaddress(srcIp);
499         builder.setDpnId(dpnId);
500         builder.setOfTableId((long) tableId);
501         builder.setSrcMac(srcMac);
502         builder.setMetadata(metadata);
503         builder.setDstIpaddress(dstIp);
504         builder.setDstMac(dstMac);
505         ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(builder.build());
506         if (offerNotification != null && offerNotification.equals(NotificationPublishService.REJECTED)) {
507             arpRespRecvdNotificationRejected.mark();
508
509         } else {
510             arpRespRecvdNotification.mark();
511         }
512     }
513
514     private void fireArpReqRecvdNotification(String interfaceName, InetAddress srcInetAddr, byte[] srcMac,
515             InetAddress dstInetAddr, BigInteger dpnId, int tableId, BigInteger metadata) throws InterruptedException {
516         arpReqRecvd.mark();
517         String macAddress = NWUtil.toStringMacAddress(srcMac);
518         ArpRequestReceivedBuilder builder = new ArpRequestReceivedBuilder();
519         builder.setInterface(interfaceName);
520         builder.setDpnId(dpnId);
521         builder.setOfTableId((long) tableId);
522         builder.setSrcIpaddress(IetfInetUtil.INSTANCE.ipAddressFor(srcInetAddr));
523         builder.setDstIpaddress(IetfInetUtil.INSTANCE.ipAddressFor(dstInetAddr));
524         builder.setSrcMac(new PhysAddress(macAddress));
525         builder.setMetadata(metadata);
526         ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(builder.build());
527         if (offerNotification != null && offerNotification.equals(NotificationPublishService.REJECTED)) {
528             arpReqRecvdNotificationRejected.mark();
529         } else {
530             arpReqRecvdNotification.mark();
531         }
532     }
533
534     private void checkAndFireMacChangedNotification(String interfaceName, InetAddress inetAddr, byte[] macAddressBytes)
535             throws InterruptedException {
536
537         IpAddress ip = IetfInetUtil.INSTANCE.ipAddressFor(inetAddr);
538         String macAddress = NWUtil.toStringMacAddress(macAddressBytes);
539         PhysAddress mac = new PhysAddress(macAddress);
540
541         if (!macAddress.equals(macsDB.get(interfaceName + "-" + inetAddr.getHostAddress()))) {
542             if (LOG.isTraceEnabled()) {
543                 LOG.trace("mac address changed for {}", inetAddr);
544             }
545             MacChangedBuilder builder = new MacChangedBuilder();
546             builder.setInterface(interfaceName);
547             builder.setIpaddress(ip);
548             builder.setMacaddress(mac);
549             notificationPublishService.putNotification(builder.build());
550         }
551     }
552
553     private BigInteger extractDpnId(PacketReceived packetReceived) {
554         NodeKey nodeKey = packetReceived.getIngress().getValue().firstKeyOf(Node.class);
555         String nodeKeyString = nodeKey.getId().getValue();
556
557         if (!nodeKeyString.startsWith(OPENFLOW_PFX)) {
558             LOG.warn("Could not extract DPN for packet-in, doesn't start with 'openflow:' {}", packetReceived);
559             return null;
560         }
561
562         return new BigInteger(nodeKeyString.substring(OPENFLOW_PFX.length()));
563     }
564 }