X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Flldp-speaker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Flldpspeaker%2FLLDPSpeaker.java;h=aa653bf649e591908eb0100adf6eb572003c5ad8;hb=refs%2Fchanges%2F36%2F76236%2F27;hp=e0684923749c5f41db173fba71adb261629a12fb;hpb=9688b3b10ee4113753705f0080b29cd30e61a85d;p=openflowplugin.git diff --git a/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java b/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java index e068492374..aa653bf649 100644 --- a/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java +++ b/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java @@ -8,12 +8,21 @@ package org.opendaylight.openflowplugin.applications.lldpspeaker; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.opendaylight.infrautils.utils.concurrent.JdkFutures; +import org.opendaylight.openflowplugin.applications.deviceownershipservice.DeviceOwnershipService; +import org.opendaylight.openflowplugin.libraries.liblldp.PacketException; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; @@ -25,8 +34,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.OperStatus; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,25 +45,43 @@ import org.slf4j.LoggerFactory; * Objects of this class send LLDP frames over all flow-capable ports that can * be discovered through inventory. */ -public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, Runnable { +public class LLDPSpeaker implements NodeConnectorEventsObserver, Runnable, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(LLDPSpeaker.class); - private static final long LLDP_FLOOD_PERIOD = 5; - private long currentFloodPeriod = LLDP_FLOOD_PERIOD; + private static final long LLDP_FLOOD_PERIOD = 5; + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("lldp-speaker-%d").setDaemon(true).build(); private final PacketProcessingService packetProcessingService; private final ScheduledExecutorService scheduledExecutorService; - private final Map, TransmitPacketInput> nodeConnectorMap = new - ConcurrentHashMap<>(); - private ScheduledFuture scheduledSpeakerTask; + private final DeviceOwnershipService deviceOwnershipService; + private final Map, TransmitPacketInput> nodeConnectorMap = + new ConcurrentHashMap<>(); private final MacAddress addressDestionation; + private long currentFloodPeriod = LLDP_FLOOD_PERIOD; + private ScheduledFuture scheduledSpeakerTask; private volatile OperStatus operationalStatus = OperStatus.RUN; - public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation) { - this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(), addressDestionation); + public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation, + final DeviceOwnershipService deviceOwnershipService) { + this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), addressDestionation, + deviceOwnershipService); + } + + public LLDPSpeaker(final PacketProcessingService packetProcessingService, + final ScheduledExecutorService scheduledExecutorService, + final MacAddress addressDestionation, + final DeviceOwnershipService deviceOwnershipStatusService) { + this.addressDestionation = addressDestionation; + this.scheduledExecutorService = scheduledExecutorService; + this.deviceOwnershipService = deviceOwnershipStatusService; + scheduledSpeakerTask = this.scheduledExecutorService + .scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD,LLDP_FLOOD_PERIOD, TimeUnit.SECONDS); + this.packetProcessingService = packetProcessingService; + LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD); } public void setOperationalStatus(final OperStatus operationalStatus) { - LOG.info("Setting operational status to {}", operationalStatus); + LOG.info("LLDP speaker operational status set to {}", operationalStatus); this.operationalStatus = operationalStatus; if (operationalStatus.equals(OperStatus.STANDBY)) { nodeConnectorMap.clear(); @@ -66,7 +95,8 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, public void setLldpFloodInterval(long time) { this.currentFloodPeriod = time; scheduledSpeakerTask.cancel(false); - scheduledSpeakerTask = this.scheduledExecutorService.scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS); + scheduledSpeakerTask = this.scheduledExecutorService + .scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS); LOG.info("LLDPSpeaker restarted, it will send LLDP frames each {} seconds", time); } @@ -74,24 +104,18 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, return currentFloodPeriod; } - public LLDPSpeaker(final PacketProcessingService packetProcessingService, - final ScheduledExecutorService scheduledExecutorService, final MacAddress addressDestionation) { - this.addressDestionation = addressDestionation; - this.scheduledExecutorService = scheduledExecutorService; - scheduledSpeakerTask = this.scheduledExecutorService.scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD, - LLDP_FLOOD_PERIOD, TimeUnit.SECONDS); - this.packetProcessingService = packetProcessingService; - LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD); - } - /** * Closes this resource, relinquishing any underlying resources. */ @Override public void close() { nodeConnectorMap.clear(); - scheduledExecutorService.shutdown(); - scheduledSpeakerTask.cancel(true); + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + if (scheduledSpeakerTask != null) { + scheduledSpeakerTask.cancel(true); + } LOG.trace("LLDPSpeaker stopped sending LLDP frames."); } @@ -101,12 +125,19 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, @Override public void run() { if (OperStatus.RUN.equals(operationalStatus)) { - LOG.debug("Sending LLDP frames to {} ports...", nodeConnectorMap.keySet().size()); - for (InstanceIdentifier nodeConnectorInstanceId : nodeConnectorMap.keySet()) { - NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId(); - LOG.trace("Sending LLDP through port {}", nodeConnectorId.getValue()); - packetProcessingService.transmitPacket(nodeConnectorMap.get(nodeConnectorInstanceId)); - } + LOG.debug("Sending LLDP frames to total {} ports", getOwnedPorts()); + nodeConnectorMap.keySet().forEach(ncIID -> { + NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(ncIID).getId(); + NodeId nodeId = ncIID.firstKeyOf(Node.class).getId(); + if (deviceOwnershipService.isEntityOwned(nodeId.getValue())) { + LOG.debug("Node is owned by this controller, sending LLDP packet through port {}", + nodeConnectorId.getValue()); + packetProcessingService.transmitPacket(nodeConnectorMap.get(ncIID)); + } else { + LOG.debug("Node {} is not owned by this controller, so skip sending LLDP packet on port {}", + nodeId.getValue(), nodeConnectorId.getValue()); + } + }); } } @@ -119,43 +150,65 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, // frames to // port, so first we check if we actually need to perform any action if (nodeConnectorMap.containsKey(nodeConnectorInstanceId)) { - LOG.trace("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing", + LOG.debug("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing", nodeConnectorId.getValue()); return; } - // Prepare to build LLDP payload InstanceIdentifier nodeInstanceId = nodeConnectorInstanceId.firstIdentifierOf(Node.class); NodeId nodeId = InstanceIdentifier.keyOf(nodeInstanceId).getId(); + if (!deviceOwnershipService.isEntityOwned(nodeId.getValue())) { + LOG.debug("Node {} is not owned by this controller, so skip sending LLDP packet on port {}", + nodeId.getValue(), nodeConnectorId.getValue()); + return; + } MacAddress srcMacAddress = flowConnector.getHardwareAddress(); Long outputPortNo = flowConnector.getPortNumber().getUint32(); // No need to send LLDP frames on local ports if (outputPortNo == null) { - LOG.trace("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue()); + LOG.debug("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue()); return; } // Generate packet with destination switch and port - TransmitPacketInput packet = new TransmitPacketInputBuilder() - .setEgress(new NodeConnectorRef(nodeConnectorInstanceId)) - .setNode(new NodeRef(nodeInstanceId)).setPayload(LLDPUtil.buildLldpFrame(nodeId, - nodeConnectorId, srcMacAddress, outputPortNo, addressDestionation)).build(); + TransmitPacketInput packet; + try { + packet = new TransmitPacketInputBuilder() + .setEgress(new NodeConnectorRef(nodeConnectorInstanceId)) + .setNode(new NodeRef(nodeInstanceId)).setPayload(LLDPUtil.buildLldpFrame(nodeId, + nodeConnectorId, srcMacAddress, outputPortNo, addressDestionation)).build(); + } catch (NoSuchAlgorithmException | PacketException e) { + LOG.error("Error building LLDP frame", e); + return; + } - // Save packet to node connector id -> packet map to transmit it every 5 - // seconds + // Save packet to node connector id -> packet map to transmit it periodically on the configured interval. nodeConnectorMap.put(nodeConnectorInstanceId, packet); - LOG.trace("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue()); + LOG.debug("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue()); // Transmit packet for first time immediately - packetProcessingService.transmitPacket(packet); + final Future> resultFuture = packetProcessingService.transmitPacket(packet); + JdkFutures.addErrorLogging(resultFuture, LOG, "transmitPacket"); } @Override public void nodeConnectorRemoved(final InstanceIdentifier nodeConnectorInstanceId) { + Preconditions.checkNotNull(nodeConnectorInstanceId); + nodeConnectorMap.remove(nodeConnectorInstanceId); NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId(); - LOG.trace("Port {} removed from LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue()); + LOG.trace("Port removed from node-connector map : {}", nodeConnectorId.getValue()); } + private int getOwnedPorts() { + AtomicInteger ownedPorts = new AtomicInteger(); + nodeConnectorMap.keySet().forEach(ncIID -> { + NodeId nodeId = ncIID.firstKeyOf(Node.class).getId(); + if (deviceOwnershipService.isEntityOwned(nodeId.getValue())) { + ownedPorts.incrementAndGet(); + } + }); + return ownedPorts.get(); + } }