X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Flldp-speaker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Flldpspeaker%2FLLDPSpeaker.java;h=d27b137d3d40e5a2cb8cc644d0a08b2e2eccbf90;hb=f913888c89b58874b1576d6b9d426effbc3b12ee;hp=5c0f71fe923e4f6a72faa032998a6952da9a4b19;hpb=75e6e27ac8626955508a2cf3f0f0c7638c53fe46;p=openflowplugin.git diff --git a/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java b/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java index 5c0f71fe92..d27b137d3d 100644 --- a/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java +++ b/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java @@ -8,12 +8,22 @@ package org.opendaylight.openflowplugin.applications.lldpspeaker; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.opendaylight.infrautils.utils.concurrent.JdkFutures; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; +import org.opendaylight.openflowplugin.libraries.liblldp.PacketException; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; @@ -22,11 +32,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.rev141023.OperStatus; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,24 +47,43 @@ import org.slf4j.LoggerFactory; * Objects of this class send LLDP frames over all flow-capable ports that can * be discovered through inventory. */ -public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, Runnable { +public class LLDPSpeaker implements NodeConnectorEventsObserver, Runnable, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(LLDPSpeaker.class); - private static final long LLDP_FLOOD_PERIOD = 5; + private static final long LLDP_FLOOD_PERIOD = 5; + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("lldp-speaker-%d").setDaemon(true).build(); private final PacketProcessingService packetProcessingService; private final ScheduledExecutorService scheduledExecutorService; + private final DeviceOwnershipStatusService deviceOwnershipStatusService; private final Map, TransmitPacketInput> nodeConnectorMap = new ConcurrentHashMap<>(); - private final ScheduledFuture scheduledSpeakerTask; private final MacAddress addressDestionation; + private long currentFloodPeriod = LLDP_FLOOD_PERIOD; + private ScheduledFuture scheduledSpeakerTask; private volatile OperStatus operationalStatus = OperStatus.RUN; - public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation) { - this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(), addressDestionation); + public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation, + final EntityOwnershipService entityOwnershipService) { + this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), addressDestionation, + entityOwnershipService); + } + + public LLDPSpeaker(final PacketProcessingService packetProcessingService, + final ScheduledExecutorService scheduledExecutorService, + final MacAddress addressDestionation, + final EntityOwnershipService entityOwnershipService) { + this.addressDestionation = addressDestionation; + this.scheduledExecutorService = scheduledExecutorService; + this.deviceOwnershipStatusService = new DeviceOwnershipStatusService(entityOwnershipService); + scheduledSpeakerTask = this.scheduledExecutorService + .scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD,LLDP_FLOOD_PERIOD, TimeUnit.SECONDS); + this.packetProcessingService = packetProcessingService; + LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD); } public void setOperationalStatus(final OperStatus operationalStatus) { - LOG.info("Setting operational status to {}", operationalStatus); + LOG.info("LLDP speaker operational status set to {}", operationalStatus); this.operationalStatus = operationalStatus; if (operationalStatus.equals(OperStatus.STANDBY)) { nodeConnectorMap.clear(); @@ -62,15 +94,16 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, return operationalStatus; } - public LLDPSpeaker(final PacketProcessingService packetProcessingService, - final ScheduledExecutorService scheduledExecutorService, - final MacAddress addressDestionation) { - this.addressDestionation = addressDestionation; - this.scheduledExecutorService = scheduledExecutorService; + public void setLldpFloodInterval(long time) { + this.currentFloodPeriod = time; + scheduledSpeakerTask.cancel(false); scheduledSpeakerTask = this.scheduledExecutorService - .scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD,LLDP_FLOOD_PERIOD, TimeUnit.SECONDS); - this.packetProcessingService = packetProcessingService; - LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD); + .scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS); + LOG.info("LLDPSpeaker restarted, it will send LLDP frames each {} seconds", time); + } + + public long getLldpFloodInterval() { + return currentFloodPeriod; } /** @@ -79,8 +112,12 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, @Override public void close() { nodeConnectorMap.clear(); - scheduledExecutorService.shutdown(); - scheduledSpeakerTask.cancel(true); + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + if (scheduledSpeakerTask != null) { + scheduledSpeakerTask.cancel(true); + } LOG.trace("LLDPSpeaker stopped sending LLDP frames."); } @@ -90,18 +127,24 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, @Override public void run() { if (OperStatus.RUN.equals(operationalStatus)) { - LOG.debug("Sending LLDP frames to {} ports...", nodeConnectorMap.keySet().size()); - for (InstanceIdentifier nodeConnectorInstanceId : nodeConnectorMap.keySet()) { - NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId(); - LOG.trace("Sending LLDP through port {}", nodeConnectorId.getValue()); - packetProcessingService.transmitPacket(nodeConnectorMap.get(nodeConnectorInstanceId)); - } + LOG.debug("Sending LLDP frames to nodes {}", Arrays.toString(deviceOwnershipStatusService + .getOwnedNodes().toArray())); + LOG.debug("Sending LLDP frames to total {} ports", getOwnedPorts()); + nodeConnectorMap.keySet().forEach(ncIID -> { + NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(ncIID).getId(); + NodeId nodeId = ncIID.firstKeyOf(Node.class, NodeKey.class).getId(); + if (deviceOwnershipStatusService.isEntityOwned(nodeId.getValue())) { + LOG.debug("Node is owned by this controller, sending LLDP packet through port {}", + nodeConnectorId.getValue()); + packetProcessingService.transmitPacket(nodeConnectorMap.get(ncIID)); + } else { + LOG.trace("Node {} is not owned by this controller, so skip sending LLDP packet on port {}", + nodeId.getValue(), nodeConnectorId.getValue()); + } + }); } } - /** - * {@inheritDoc} - */ @Override public void nodeConnectorAdded(final InstanceIdentifier nodeConnectorInstanceId, final FlowCapableNodeConnector flowConnector) { @@ -111,8 +154,7 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, // frames to // port, so first we check if we actually need to perform any action if (nodeConnectorMap.containsKey(nodeConnectorInstanceId)) { - LOG.trace( - "Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing", + LOG.debug("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing", nodeConnectorId.getValue()); return; } @@ -125,35 +167,48 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, // No need to send LLDP frames on local ports if (outputPortNo == null) { - LOG.trace("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue()); + LOG.debug("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue()); return; } // Generate packet with destination switch and port - TransmitPacketInput packet = new TransmitPacketInputBuilder() - .setEgress(new NodeConnectorRef(nodeConnectorInstanceId)) - .setNode(new NodeRef(nodeInstanceId)) - .setPayload(LLDPUtil - .buildLldpFrame(nodeId, nodeConnectorId, srcMacAddress, outputPortNo, addressDestionation)) - .build(); - - // Save packet to node connector id -> packet map to transmit it every 5 - // seconds + TransmitPacketInput packet; + try { + packet = new TransmitPacketInputBuilder() + .setEgress(new NodeConnectorRef(nodeConnectorInstanceId)) + .setNode(new NodeRef(nodeInstanceId)).setPayload(LLDPUtil.buildLldpFrame(nodeId, + nodeConnectorId, srcMacAddress, outputPortNo, addressDestionation)).build(); + } catch (NoSuchAlgorithmException | PacketException e) { + LOG.error("Error building LLDP frame", e); + return; + } + + // Save packet to node connector id -> packet map to transmit it periodically on the configured interval. nodeConnectorMap.put(nodeConnectorInstanceId, packet); - LOG.trace("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue()); + LOG.debug("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue()); // Transmit packet for first time immediately - packetProcessingService.transmitPacket(packet); + final Future> resultFuture = packetProcessingService.transmitPacket(packet); + JdkFutures.addErrorLogging(resultFuture, LOG, "transmitPacket"); } - /** - * {@inheritDoc} - */ @Override public void nodeConnectorRemoved(final InstanceIdentifier nodeConnectorInstanceId) { + Preconditions.checkNotNull(nodeConnectorInstanceId); + nodeConnectorMap.remove(nodeConnectorInstanceId); NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId(); - LOG.trace("Port {} removed from LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue()); + LOG.trace("Port removed from node-connector map : {}", nodeConnectorId.getValue()); } + private int getOwnedPorts() { + AtomicInteger ownedPorts = new AtomicInteger(); + nodeConnectorMap.keySet().forEach(ncIID -> { + NodeId nodeId = ncIID.firstKeyOf(Node.class, NodeKey.class).getId(); + if (deviceOwnershipStatusService.isEntityOwned(nodeId.getValue())) { + ownedPorts.incrementAndGet(); + } + }); + return ownedPorts.get(); + } }