2 * Copyright (c) 2015 NEC Corporation
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this
7 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.vtn.manager.internal.packet;
12 import java.util.Iterator;
13 import java.util.TimerTask;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16 import java.util.concurrent.CopyOnWriteArrayList;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.atomic.AtomicReference;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import org.opendaylight.vtn.manager.internal.VTNManagerProvider;
24 import org.opendaylight.vtn.manager.internal.inventory.VTNInventoryListener;
25 import org.opendaylight.vtn.manager.internal.inventory.VtnNodeEvent;
26 import org.opendaylight.vtn.manager.internal.inventory.VtnPortEvent;
27 import org.opendaylight.vtn.manager.internal.util.SalNode;
28 import org.opendaylight.vtn.manager.internal.util.SalNotificationListener;
29 import org.opendaylight.vtn.manager.internal.util.SalPort;
30 import org.opendaylight.vtn.manager.internal.util.concurrent.FutureErrorCallback;
32 import org.opendaylight.controller.sal.binding.api.NotificationService;
34 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder;
42 import org.opendaylight.controller.sal.packet.Packet;
43 import org.opendaylight.controller.sal.utils.EtherTypes;
46 * Provider of internal packet services.
48 public final class VTNPacketService extends SalNotificationListener
49 implements PacketProcessingListener, VTNInventoryListener {
53 private static final Logger LOG =
54 LoggerFactory.getLogger(VTNPacketService.class);
57 * VTN Manager provider service.
59 private final VTNManagerProvider vtnProvider;
62 * MD-SAL packet processing service.
64 private final AtomicReference<PacketProcessingService> packetService =
65 new AtomicReference<PacketProcessingService>();
68 * A list of VTN packet listeners.
70 private final CopyOnWriteArrayList<VTNPacketListener> listeners =
71 new CopyOnWriteArrayList<VTNPacketListener>();
74 * Keep nodes which are not in service yet.
77 * If a node is contained in this map, any packet from the node is
78 * ignored, and no packet is sent to the node.
81 private ConcurrentMap<SalNode, TimerTask> disabledNodes =
82 new ConcurrentHashMap<SalNode, TimerTask>();
85 * Construct a new instance.
87 * @param provider A VTN Manager provider service.
88 * @param nsv A {@link NotificationService} service instance.
90 public VTNPacketService(VTNManagerProvider provider,
91 NotificationService nsv) {
92 vtnProvider = provider;
95 // Get MD-SAL packet processing service.
96 PacketProcessingService pps = provider.
97 getRpcService(PacketProcessingService.class);
98 packetService.set(pps);
100 // Register MD-SAL notification listener.
101 registerListener(nsv);
102 } catch (Exception e) {
103 String msg = "Failed to initialize VTN packet service.";
106 throw new IllegalStateException(msg, e);
111 * Add the given VTN packet listener.
113 * @param l A VTN packet listener.
115 public void addListener(VTNPacketListener l) {
116 listeners.addIfAbsent(l);
120 * Transmit the given packet.
122 * @param egress A {@link SalPort} instance which specifies the egress
124 * @param packet A {@link Packet} instance to be transmitted.
126 public void transmit(SalPort egress, Packet packet) {
127 PacketProcessingService pps = packetService.get();
134 payload = packet.serialize();
135 } catch (Exception e) {
136 LOG.error("Failed to transmit packet via " + egress, e);
140 TransmitPacketInputBuilder builder = new TransmitPacketInputBuilder();
141 builder.setNode(egress.getNodeRef()).
142 setEgress(egress.getNodeConnectorRef()).
145 Future<RpcResult<Void>> f = pps.transmitPacket(builder.build());
146 FutureErrorCallback<RpcResult<Void>> cb = new FutureErrorCallback<>(
147 LOG, "Failed to transmit packet.");
148 vtnProvider.setCallback(f, cb);
152 * Add the given node to {@link #disabledNodes}.
155 * Neighbor node discovery may not be completed on a newly detected node.
156 * If a PACKET_OUT is sent to a port in a newly detected node, it may
157 * cause broadcast packet storm because the controller can not
158 * determine internal port in a node yet.
161 * That is why we should disable any packet service on a newly detected
165 * @param snode A newly detected node.
167 private void addDisabledNode(final SalNode snode) {
168 int edgeWait = vtnProvider.getVTNConfig().getNodeEdgeWait();
173 if (packetService.get() == null) {
174 // Packet service is already closed.
178 // Create a timer task to remove the node from disabledNodes.
179 TimerTask task = new TimerTask() {
182 if (disabledNodes.remove(snode) != null) {
183 LOG.info("{}: Start packet service", snode);
188 if (disabledNodes.putIfAbsent(snode, task) == null) {
189 vtnProvider.getTimer().schedule(task, edgeWait);
196 * Close the internal packet service.
199 public void close() {
202 packetService.set(null);
204 for (Iterator<TimerTask> it = disabledNodes.values().iterator();
206 TimerTask task = it.next();
212 // PacketProcessingListener
215 * Invoked when a PACKET_IN message has been notified.
217 * @param notification A PACKET_IN notification.
220 public void onPacketReceived(PacketReceived notification) {
221 if (notification == null) {
222 LOG.debug("Ignore null notification.");
226 NodeConnectorRef nref = notification.getIngress();
227 SalPort ingress = SalPort.create(nref);
228 if (ingress == null) {
229 LOG.trace("Ignore packet received on unsupported port: {}", nref);
233 SalNode snode = ingress.getSalNode();
234 if (disabledNodes.containsKey(snode)) {
235 LOG.trace("Ignore packet from disabled node: {}", snode);
239 PacketInEvent ev = null;
240 for (VTNPacketListener l: listeners) {
243 ev = new PacketInEvent(l, notification, ingress);
244 } catch (IllegalArgumentException e) {
245 LOG.debug("Ignore received packet: {}", e.getMessage());
247 } catch (Exception e) {
248 LOG.error("Failed to create PacketInEvent: " +
253 if (ev.getEthernet().getEtherType() ==
254 EtherTypes.LLDP.shortValue()) {
255 // Ignore LLDP packet.
259 ev = new PacketInEvent(l, ev);
262 vtnProvider.post(ev);
266 // SalNotificationListener
272 protected Logger getLogger() {
276 // VTNInventoryListener
282 public void notifyVtnNode(VtnNodeEvent ev) {
283 switch (ev.getUpdateType()) {
285 addDisabledNode(ev.getSalNode());
289 TimerTask task = disabledNodes.get(ev.getSalNode());
304 public void notifyVtnPort(VtnPortEvent ev) {