Use ByteBuf.readRetainedSlice()
[openflowplugin.git] / applications / lldp-speaker / src / main / java / org / opendaylight / openflowplugin / applications / lldpspeaker / LLDPSpeaker.java
1 /*
2  * Copyright (c) 2014 Pacnet and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.lldpspeaker;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.infrautils.utils.concurrent.LoggingFutures.addErrorLogging;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import org.opendaylight.mdsal.binding.api.RpcProviderService;
25 import org.opendaylight.mdsal.binding.api.RpcService;
26 import org.opendaylight.openflowplugin.applications.deviceownershipservice.DeviceOwnershipService;
27 import org.opendaylight.openflowplugin.libraries.liblldp.PacketException;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacket;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.config.rev160512.LldpSpeakerConfig;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.ChangeOperationalStatus;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.ChangeOperationalStatusInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.ChangeOperationalStatusOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetLldpFloodInterval;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetLldpFloodIntervalInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetLldpFloodIntervalOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetLldpFloodIntervalOutputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetOperationalStatus;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetOperationalStatusInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetOperationalStatusOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.GetOperationalStatusOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.OperStatus;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.SetLldpFloodInterval;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.SetLldpFloodIntervalInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.SetLldpFloodIntervalOutput;
55 import org.opendaylight.yangtools.concepts.Registration;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
59 import org.opendaylight.yangtools.yang.common.Uint32;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Objects of this class send LLDP frames over all flow-capable ports that can be discovered through inventory.
65  */
66 public final class LLDPSpeaker implements NodeConnectorEventsObserver, Runnable, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(LLDPSpeaker.class);
    // Produces the daemon worker threads, named "lldp-speaker-<n>", for the executor created by the
    // public constructor.
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
        .setNameFormat("lldp-speaker-%d")
        .setDaemon(true)
        .build();

    // Default flood period in seconds; used for the initial schedule and as the initial currentFloodPeriod.
    private static final long LLDP_FLOOD_PERIOD = 5;

    // Pre-built transmit-packet requests keyed by node connector path; run() re-sends them on every cycle.
    private final ConcurrentMap<InstanceIdentifier<NodeConnector>, TransmitPacketInput> nodeConnectorMap =
        new ConcurrentHashMap<>();
    // Executor running the periodic flood task; shut down by close().
    private final ScheduledExecutorService scheduledExecutorService;
    // Queried with a node id before each transmission; frames are sent only for nodes it reports as owned.
    private final DeviceOwnershipService deviceOwnershipService;
    // Destination MAC address handed to LLDPUtil.buildLldpFrame() when a frame is built.
    private final MacAddress addressDestination;
    // Packet transmission RPC, looked up from the RpcService in the constructor.
    private final TransmitPacket transmitPacket;
    // Registration of the RPC implementations provided by this class; closed by close().
    private final Registration registration;

    // Period, in seconds, of the currently scheduled task; accessed only from synchronized methods.
    private long currentFloodPeriod = LLDP_FLOOD_PERIOD;
    // Handle of the currently scheduled periodic task; replaced by setLldpFloodInterval().
    private ScheduledFuture<?> scheduledSpeakerTask;

    // run() sends frames only while this is RUN; volatile because run() reads it without holding a lock.
    private volatile OperStatus operationalStatus = OperStatus.RUN;
87
88     public LLDPSpeaker(final DeviceOwnershipService deviceOwnershipService, final RpcService rpcService,
89             final RpcProviderService rpcProviderService, final LldpSpeakerConfig config) {
90         this(Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), deviceOwnershipService, rpcService,
91             rpcProviderService, config.getAddressDestination());
92     }
93
94     @VisibleForTesting
95     LLDPSpeaker(final ScheduledExecutorService scheduledExecutorService,
96             final DeviceOwnershipService deviceOwnershipService, final RpcService rpcService,
97             final RpcProviderService rpcProviderService, final MacAddress addressDestination) {
98         this.scheduledExecutorService = requireNonNull(scheduledExecutorService);
99         this.deviceOwnershipService = requireNonNull(deviceOwnershipService);
100         this.addressDestination = addressDestination;
101         transmitPacket = rpcService.getRpc(TransmitPacket.class);
102
103         scheduledSpeakerTask = scheduledExecutorService.scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD, LLDP_FLOOD_PERIOD,
104             TimeUnit.SECONDS);
105         registration = rpcProviderService.registerRpcImplementations(
106             (GetLldpFloodInterval) this::getLldpFloodInterval,
107             (GetOperationalStatus) this::getOperationalStatus,
108             (SetLldpFloodInterval) this::setLldpFloodInterval,
109             (ChangeOperationalStatus) this::changeOperationalStatus);
110         LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD);
111     }
112
113     /**
114      * Closes this resource, relinquishing any underlying resources.
115      */
116     @Override
117     public synchronized void close() {
118         registration.close();
119         scheduledSpeakerTask.cancel(true);
120         scheduledExecutorService.shutdown();
121         nodeConnectorMap.clear();
122         LOG.info("LLDPSpeaker stopped sending LLDP frames.");
123     }
124
125     private synchronized ListenableFuture<RpcResult<GetLldpFloodIntervalOutput>> getLldpFloodInterval(
126             final GetLldpFloodIntervalInput intput) {
127         return RpcResultBuilder.<GetLldpFloodIntervalOutput>success()
128             .withResult(new GetLldpFloodIntervalOutputBuilder().setInterval(currentFloodPeriod).build())
129             .buildFuture();
130     }
131
132     private ListenableFuture<RpcResult<ChangeOperationalStatusOutput>> changeOperationalStatus(
133             final ChangeOperationalStatusInput input) {
134         changeOperationalStatus(input.requireOperationalStatus());
135         return RpcResultBuilder.<ChangeOperationalStatusOutput>success().buildFuture();
136     }
137
138     synchronized void changeOperationalStatus(final OperStatus newStatus) {
139         LOG.info("LLDP speaker operational status set to {}", newStatus);
140         operationalStatus = newStatus;
141         if (newStatus.equals(OperStatus.STANDBY)) {
142             nodeConnectorMap.clear();
143         }
144     }
145
146     private ListenableFuture<RpcResult<GetOperationalStatusOutput>> getOperationalStatus(
147             final GetOperationalStatusInput input) {
148         return RpcResultBuilder.<GetOperationalStatusOutput>success()
149             .withResult(new GetOperationalStatusOutputBuilder()
150             .setOperationalStatus(operationalStatus)
151             .build())
152             .buildFuture();
153     }
154
155     private synchronized ListenableFuture<RpcResult<SetLldpFloodIntervalOutput>> setLldpFloodInterval(
156             final SetLldpFloodIntervalInput input) {
157         final long time = input.requireInterval();
158         currentFloodPeriod = time;
159         scheduledSpeakerTask.cancel(false);
160         scheduledSpeakerTask = scheduledExecutorService.scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS);
161         LOG.info("LLDPSpeaker restarted, it will send LLDP frames each {} seconds", time);
162         return RpcResultBuilder.<SetLldpFloodIntervalOutput>success().buildFuture();
163     }
164
165     /**
166      * Send LLDPDU frames to all known openflow switch ports.
167      */
168     @Override
169     public void run() {
170         if (OperStatus.RUN.equals(operationalStatus)) {
171             LOG.debug("Sending LLDP frames to total {} ports", getOwnedPorts());
172             nodeConnectorMap.keySet().forEach(ncIID -> {
173                 final var nodeConnectorId = InstanceIdentifier.keyOf(ncIID).getId();
174                 final var nodeId = ncIID.firstKeyOf(Node.class).getId();
175                 if (deviceOwnershipService.isEntityOwned(nodeId.getValue())) {
176                     LOG.debug("Node is owned by this controller, sending LLDP packet through port {}",
177                             nodeConnectorId.getValue());
178                     addErrorLogging(transmitPacket.invoke(nodeConnectorMap.get(ncIID)), LOG,
179                             "transmitPacket() failed");
180                 } else {
181                     LOG.debug("Node {} is not owned by this controller, so skip sending LLDP packet on port {}",
182                             nodeId.getValue(), nodeConnectorId.getValue());
183                 }
184             });
185         }
186     }
187
188     @Override
189     public void nodeConnectorAdded(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId,
190             final FlowCapableNodeConnector flowConnector) {
191         final var nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId();
192
193         // nodeConnectorAdded can be called even if we already sending LLDP
194         // frames to
195         // port, so first we check if we actually need to perform any action
196         if (nodeConnectorMap.containsKey(nodeConnectorInstanceId)) {
197             LOG.debug("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing",
198                     nodeConnectorId.getValue());
199             return;
200         }
201         // Prepare to build LLDP payload
202         InstanceIdentifier<Node> nodeInstanceId = nodeConnectorInstanceId.firstIdentifierOf(Node.class);
203         NodeId nodeId = InstanceIdentifier.keyOf(nodeInstanceId).getId();
204         if (!deviceOwnershipService.isEntityOwned(nodeId.getValue())) {
205             LOG.debug("Node {} is not owned by this controller, so skip sending LLDP packet on port {}",
206                     nodeId.getValue(), nodeConnectorId.getValue());
207             return;
208         }
209         MacAddress srcMacAddress = flowConnector.getHardwareAddress();
210         Uint32 outputPortNo = flowConnector.getPortNumber().getUint32();
211
212         // No need to send LLDP frames on local ports
213         if (outputPortNo == null) {
214             LOG.debug("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue());
215             return;
216         }
217
218         // Generate packet with destination switch and port
219         TransmitPacketInput packet;
220         try {
221             packet = new TransmitPacketInputBuilder()
222                 .setEgress(new NodeConnectorRef(nodeConnectorInstanceId))
223                 .setNode(new NodeRef(nodeInstanceId))
224                 .setPayload(
225                     LLDPUtil.buildLldpFrame(nodeId, nodeConnectorId, srcMacAddress, outputPortNo, addressDestination))
226                 .build();
227         } catch (PacketException e) {
228             LOG.error("Error building LLDP frame", e);
229             return;
230         }
231
232         // Save packet to node connector id -> packet map to transmit it periodically on the configured interval.
233         nodeConnectorMap.put(nodeConnectorInstanceId, packet);
234         LOG.debug("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue());
235
236         // Transmit packet for first time immediately
237         addErrorLogging(transmitPacket.invoke(packet), LOG, "transmitPacket");
238     }
239
240     @Override
241     public void nodeConnectorRemoved(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId) {
242         nodeConnectorMap.remove(requireNonNull(nodeConnectorInstanceId));
243         NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId();
244         LOG.trace("Port removed from node-connector map : {}", nodeConnectorId.getValue());
245     }
246
247     private int getOwnedPorts() {
248         AtomicInteger ownedPorts = new AtomicInteger();
249         nodeConnectorMap.keySet().forEach(ncIID -> {
250             NodeId nodeId = ncIID.firstKeyOf(Node.class).getId();
251             if (deviceOwnershipService.isEntityOwned(nodeId.getValue())) {
252                 ownedPorts.incrementAndGet();
253             }
254         });
255         return ownedPorts.get();
256     }
257 }