--- /dev/null
+/*
+ * Copyright (c) 2017 Lumina Networks, Inc. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.lldpspeaker;
+
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeviceOwnershipStatusService implements EntityOwnershipListener {
+ private static final Logger LOG = LoggerFactory.getLogger(DeviceOwnershipStatusService.class);
+ private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
+ private static final Pattern NODE_ID_PATTERN = Pattern.compile("^openflow:\\d+");
+
+ private final EntityOwnershipService eos;
+ private final ConcurrentMap<String, EntityOwnershipState> ownershipStateCache = new ConcurrentHashMap<>();
+
+ public DeviceOwnershipStatusService(final EntityOwnershipService entityOwnershipService) {
+ this.eos = entityOwnershipService;
+ registerEntityOwnershipListener();
+ }
+
+ public boolean isEntityOwned(final String nodeId) {
+ EntityOwnershipState state = ownershipStateCache.get(nodeId);
+ if (state == null) {
+ java.util.Optional<EntityOwnershipState> status = getCurrentOwnershipStatus(nodeId);
+ if (status.isPresent()) {
+ state = status.get();
+ ownershipStateCache.put(nodeId, state);
+ } else {
+ LOG.warn("Fetching ownership status failed for node {}", nodeId);
+ }
+ }
+ return state != null && state.equals(EntityOwnershipState.IS_OWNER);
+ }
+
+ public List<String> getOwnedNodes() {
+ List<String> nodes = new ArrayList<>();
+ ownershipStateCache.forEach((node, change) -> {
+ if (isEntityOwned(node)) {
+ nodes.add(node);
+ }
+ });
+ return nodes;
+ }
+
+ @Override
+ public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
+ final String entityName = ownershipChange.getEntity().getIdentifier().firstKeyOf(Entity.class).getName();
+ if (entityName != null && isOpenFlowEntity(entityName)) {
+ LOG.info("Entity ownership change received for node : {} : {}", entityName, ownershipChange);
+ if (!ownershipChange.getState().isOwner() && !ownershipChange.getState().hasOwner()
+ && !ownershipChange.inJeopardy()) {
+ LOG.debug("Entity for node {} is unregistered.", entityName);
+ ownershipStateCache.remove(entityName);
+ } else if (!ownershipChange.getState().isOwner() && ownershipChange.getState().hasOwner()) {
+ ownershipStateCache.put(entityName, EntityOwnershipState.OWNED_BY_OTHER);
+ } else if (ownershipChange.getState().isOwner()) {
+ ownershipStateCache.put(entityName, EntityOwnershipState.IS_OWNER);
+ }
+ }
+ }
+
+ private java.util.Optional<EntityOwnershipState> getCurrentOwnershipStatus(final String nodeId) {
+ org.opendaylight.mdsal.eos.binding.api.Entity entity = createNodeEntity(nodeId);
+ Optional<EntityOwnershipState> ownershipStatus = eos.getOwnershipState(entity);
+
+ if (ownershipStatus.isPresent()) {
+ LOG.debug("Fetched ownership status for node {} is {}", nodeId, ownershipStatus.get());
+ return java.util.Optional.of(ownershipStatus.get());
+ }
+ return java.util.Optional.empty();
+ }
+
+ private org.opendaylight.mdsal.eos.binding.api.Entity createNodeEntity(final String nodeId) {
+ return new org.opendaylight.mdsal.eos.binding.api.Entity(SERVICE_ENTITY_TYPE, nodeId);
+ }
+
+ private void registerEntityOwnershipListener() {
+ this.eos.registerListener(SERVICE_ENTITY_TYPE, this);
+ }
+
+ private boolean isOpenFlowEntity(String entity) {
+ return NODE_ID_PATTERN.matcher(entity).matches();
+ }
+}
package org.opendaylight.openflowplugin.applications.lldpspeaker;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder;
* Objects of this class send LLDP frames over all flow-capable ports that can
* be discovered through inventory.
*/
-public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, Runnable {
+public class LLDPSpeaker implements NodeConnectorEventsObserver, Runnable, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(LLDPSpeaker.class);
- private static final long LLDP_FLOOD_PERIOD = 5;
- private long currentFloodPeriod = LLDP_FLOOD_PERIOD;
+ private static final long LLDP_FLOOD_PERIOD = 5;
+ private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
+ .setNameFormat("lldp-speaker-%d").setDaemon(true).build();
private final PacketProcessingService packetProcessingService;
private final ScheduledExecutorService scheduledExecutorService;
- private final Map<InstanceIdentifier<NodeConnector>, TransmitPacketInput> nodeConnectorMap = new
- ConcurrentHashMap<>();
- private ScheduledFuture<?> scheduledSpeakerTask;
+ private final DeviceOwnershipStatusService deviceOwnershipStatusService;
+ private final Map<InstanceIdentifier<NodeConnector>, TransmitPacketInput> nodeConnectorMap =
+ new ConcurrentHashMap<>();
private final MacAddress addressDestionation;
+ private long currentFloodPeriod = LLDP_FLOOD_PERIOD;
+ private ScheduledFuture<?> scheduledSpeakerTask;
private volatile OperStatus operationalStatus = OperStatus.RUN;
- public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation) {
- this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(), addressDestionation);
+ public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation,
+ final EntityOwnershipService entityOwnershipService) {
+ this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), addressDestionation,
+ entityOwnershipService);
+ }
+
+ public LLDPSpeaker(final PacketProcessingService packetProcessingService,
+ final ScheduledExecutorService scheduledExecutorService,
+ final MacAddress addressDestionation,
+ final EntityOwnershipService entityOwnershipService) {
+ this.addressDestionation = addressDestionation;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.deviceOwnershipStatusService = new DeviceOwnershipStatusService(entityOwnershipService);
+ scheduledSpeakerTask = this.scheduledExecutorService
+ .scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD,LLDP_FLOOD_PERIOD, TimeUnit.SECONDS);
+ this.packetProcessingService = packetProcessingService;
+ LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD);
}
public void setOperationalStatus(final OperStatus operationalStatus) {
- LOG.info("Setting operational status to {}", operationalStatus);
+ LOG.info("LLDP speaker operational status set to {}", operationalStatus);
this.operationalStatus = operationalStatus;
if (operationalStatus.equals(OperStatus.STANDBY)) {
nodeConnectorMap.clear();
public void setLldpFloodInterval(long time) {
this.currentFloodPeriod = time;
scheduledSpeakerTask.cancel(false);
- scheduledSpeakerTask = this.scheduledExecutorService.scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS);
+ scheduledSpeakerTask = this.scheduledExecutorService
+ .scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS);
LOG.info("LLDPSpeaker restarted, it will send LLDP frames each {} seconds", time);
}
return currentFloodPeriod;
}
- public LLDPSpeaker(final PacketProcessingService packetProcessingService,
- final ScheduledExecutorService scheduledExecutorService, final MacAddress addressDestionation) {
- this.addressDestionation = addressDestionation;
- this.scheduledExecutorService = scheduledExecutorService;
- scheduledSpeakerTask = this.scheduledExecutorService.scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD,
- LLDP_FLOOD_PERIOD, TimeUnit.SECONDS);
- this.packetProcessingService = packetProcessingService;
- LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD);
- }
-
/**
* Closes this resource, relinquishing any underlying resources.
*/
@Override
public void close() {
nodeConnectorMap.clear();
- scheduledExecutorService.shutdown();
- scheduledSpeakerTask.cancel(true);
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdown();
+ }
+ if (scheduledSpeakerTask != null) {
+ scheduledSpeakerTask.cancel(true);
+ }
LOG.trace("LLDPSpeaker stopped sending LLDP frames.");
}
@Override
public void run() {
if (OperStatus.RUN.equals(operationalStatus)) {
- LOG.debug("Sending LLDP frames to {} ports...", nodeConnectorMap.keySet().size());
- for (InstanceIdentifier<NodeConnector> nodeConnectorInstanceId : nodeConnectorMap.keySet()) {
- NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId();
- LOG.trace("Sending LLDP through port {}", nodeConnectorId.getValue());
- packetProcessingService.transmitPacket(nodeConnectorMap.get(nodeConnectorInstanceId));
- }
+ LOG.debug("Sending LLDP frames to nodes {}", Arrays.toString(deviceOwnershipStatusService
+ .getOwnedNodes().toArray()));
+ LOG.debug("Sending LLDP frames to total {} ports", getOwnedPorts());
+ nodeConnectorMap.keySet().forEach(ncIID -> {
+ NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(ncIID).getId();
+ NodeId nodeId = ncIID.firstKeyOf(Node.class, NodeKey.class).getId();
+ if (deviceOwnershipStatusService.isEntityOwned(nodeId.getValue())) {
+ LOG.debug("Node is owned by this controller, sending LLDP packet through port {}",
+ nodeConnectorId.getValue());
+ packetProcessingService.transmitPacket(nodeConnectorMap.get(ncIID));
+ } else {
+ LOG.trace("Node {} is not owned by this controller, so skip sending LLDP packet on port {}",
+ nodeId.getValue(), nodeConnectorId.getValue());
+ }
+ });
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void nodeConnectorAdded(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId,
final FlowCapableNodeConnector flowConnector) {
// frames to
// port, so first we check if we actually need to perform any action
if (nodeConnectorMap.containsKey(nodeConnectorInstanceId)) {
- LOG.trace("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing",
+ LOG.debug("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing",
nodeConnectorId.getValue());
return;
}
// No need to send LLDP frames on local ports
if (outputPortNo == null) {
- LOG.trace("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue());
+ LOG.debug("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue());
return;
}
.setNode(new NodeRef(nodeInstanceId)).setPayload(LLDPUtil.buildLldpFrame(nodeId,
nodeConnectorId, srcMacAddress, outputPortNo, addressDestionation)).build();
- // Save packet to node connector id -> packet map to transmit it every 5
- // seconds
+ // Save packet to node connector id -> packet map to transmit it periodically on the configured interval.
nodeConnectorMap.put(nodeConnectorInstanceId, packet);
- LOG.trace("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue());
+ LOG.debug("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue());
// Transmit packet for first time immediately
packetProcessingService.transmitPacket(packet);
@Override
public void nodeConnectorRemoved(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId) {
+ Preconditions.checkNotNull(nodeConnectorInstanceId);
+
nodeConnectorMap.remove(nodeConnectorInstanceId);
NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId();
- LOG.trace("Port {} removed from LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue());
+ LOG.trace("Port removed from node-connector map : {}", nodeConnectorId.getValue());
}
+ private int getOwnedPorts() {
+ AtomicInteger ownedPorts = new AtomicInteger();
+ nodeConnectorMap.keySet().forEach(ncIID -> {
+ NodeId nodeId = ncIID.firstKeyOf(Node.class, NodeKey.class).getId();
+ if (deviceOwnershipStatusService.isEntityOwned(nodeId.getValue())) {
+ ownedPorts.incrementAndGet();
+ }
+ });
+ return ownedPorts.get();
+ }
}
*/
public final class LLDPUtil {
private static final Logger LOG = LoggerFactory.getLogger(LLDPUtil.class);
+
private static final String OF_URI_PREFIX = "openflow:";
private LLDPUtil() {
customSecTlv.setType(LLDPTLV.TLVType.Custom.getValue()).setLength((short) customSecValue.length)
.setValue(customSecValue);
discoveryPkt.addCustomTLV(customSecTlv);
- } catch (NoSuchAlgorithmException e1) {
- LOG.info("LLDP extra authenticator creation failed: {}", e1.getMessage());
- LOG.debug("Reason why LLDP extra authenticator creation failed: ", e1);
+ } catch (NoSuchAlgorithmException e) {
+ LOG.warn("LLDP extra authenticator creation failed.", e);
}
try {
return ethPkt.serialize();
} catch (PacketException e) {
- LOG.warn("Error creating LLDP packet: {}", e.getMessage());
- LOG.debug("Error creating LLDP packet.. ", e);
+ LOG.warn("Error creating LLDP packet.", e);
}
return null;
}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
* NodeConnectorInventoryEventTranslator is listening for changes in inventory operational DOM tree
* and update LLDPSpeaker and topology.
*/
-public class NodeConnectorInventoryEventTranslator<T extends DataObject> implements DataTreeChangeListener<T>,
- AutoCloseable {
+public class NodeConnectorInventoryEventTranslator<T extends DataObject>
+ implements ClusteredDataTreeChangeListener<T>, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorInventoryEventTranslator.class);
private static final InstanceIdentifier<State> II_TO_STATE = InstanceIdentifier.builder(Nodes.class)
.child(Node.class).child(NodeConnector.class).augmentation(FlowCapableNodeConnector.class)
private static final long STARTUP_LOOP_TICK = 500L;
private static final int STARTUP_LOOP_MAX_RETRIES = 8;
- private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorInventoryEventTranslator.class);
private final ListenerRegistration<DataTreeChangeListener> listenerOnPortRegistration;
private final ListenerRegistration<DataTreeChangeListener> listenerOnPortStateRegistration;
II_TO_STATE);
final SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
try {
- listenerOnPortRegistration = looper
- .loopUntilNoException(new Callable<ListenerRegistration<DataTreeChangeListener>>() {
- @Override
- public ListenerRegistration<DataTreeChangeListener> call() throws Exception {
- return dataBroker.registerDataTreeChangeListener(dtiToNodeConnector,
- NodeConnectorInventoryEventTranslator
- .this);
- }
- });
- listenerOnPortStateRegistration = looper
- .loopUntilNoException(new Callable<ListenerRegistration<DataTreeChangeListener>>() {
- @Override
- public ListenerRegistration<DataTreeChangeListener> call() throws Exception {
- return dataBroker.registerDataTreeChangeListener(dtiToNodeConnectorState,
- NodeConnectorInventoryEventTranslator
- .this);
- }
- });
+ listenerOnPortRegistration = looper.loopUntilNoException(() ->
+ dataBroker.registerDataTreeChangeListener(dtiToNodeConnector,
+ NodeConnectorInventoryEventTranslator.this));
+ listenerOnPortStateRegistration = looper.loopUntilNoException(() ->
+ dataBroker.registerDataTreeChangeListener(dtiToNodeConnectorState,
+ NodeConnectorInventoryEventTranslator.this));
} catch (Exception e) {
LOG.error("DataTreeChangeListeners registration failed: {}", e);
throw new IllegalStateException("NodeConnectorInventoryEventTranslator startup failed!", e);
observer.nodeConnectorRemoved(nodeConnectorInstanceId);
}
}
-
}
odl:use-default-for-reference-types="true">
<reference id="dataBroker" interface="org.opendaylight.controller.md.sal.binding.api.DataBroker"/>
+ <reference id="entityOwnershipService" interface="org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService"/>
<odl:clustered-app-config id="lldpSpeakerConfig"
binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.config.rev160512.LldpSpeakerConfig">
<argument>
<bean factory-ref="lldpSpeakerConfig" factory-method="getAddressDestination"/>
</argument>
+ <argument ref="entityOwnershipService"/>
</bean>
<bean id="nodeConnectorEventTranslator" class="org.opendaylight.openflowplugin.applications.lldpspeaker.NodeConnectorInventoryEventTranslator"
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import com.google.common.base.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortNumberUni;
private ScheduledExecutorService scheduledExecutorService;
@Mock
private ScheduledFuture scheduledSpeakerTask;
+ @Mock
+ private EntityOwnershipService entityOwnershipService;
private final MacAddress destinationMACAddress = null;
private LLDPSpeaker lldpSpeaker;
any(Runnable.class), anyLong(), anyLong(),
any(TimeUnit.class))).thenReturn(scheduledSpeakerTask);
lldpSpeaker = new LLDPSpeaker(packetProcessingService,
- scheduledExecutorService, destinationMACAddress);
+ scheduledExecutorService, destinationMACAddress, entityOwnershipService);
+ when(entityOwnershipService.getOwnershipState(any())).thenReturn(Optional.of(EntityOwnershipState.IS_OWNER));
lldpSpeaker.setOperationalStatus(OperStatus.RUN);
}
/**
- * Test that speaker does nothing when in standby mode.
+ * Test that speaker does nothing when in {@link OperStatus.STANDBY} mode.
*/
@Test
public void testStandBy() {
// packetProcessingService
lldpSpeaker.nodeConnectorAdded(ID, FLOW_CAPABLE_NODE_CONNECTOR);
+
+ when(entityOwnershipService.getOwnershipState(any()))
+ .thenReturn(Optional.of(EntityOwnershipState.OWNED_BY_OTHER));
// Execute one iteration of periodic task - LLDP packet should be
- // transmitted second time
+ // not transmit second packet because it doesn't own the device.
lldpSpeaker.run();
// Check packet transmission
- verify(packetProcessingService, times(2)).transmitPacket(PACKET_INPUT);
+ verify(packetProcessingService, times(1)).transmitPacket(PACKET_INPUT);
verifyNoMoreInteractions(packetProcessingService);
}
verifyNoMoreInteractions(packetProcessingService);
}
- /**
- * Test that lldpSpeaker cancels periodic LLDP flood task and stops.
- */
- @Test
- public void testCleanup() {
- lldpSpeaker.close();
- verify(scheduledSpeakerTask, times(1)).cancel(true);
- verify(scheduledExecutorService, times(1)).shutdown();
- }
-
/**
* Test that checks if LLDPSpeaker working fine with local ports.
*/
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-binding-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-eos-dom-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.openflowplugin.libraries</groupId>
<artifactId>liblldp</artifactId>
package org.opendaylight.openflowplugin.applications.topology.lldp;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
import org.opendaylight.openflowplugin.applications.topology.lldp.utils.LLDPDiscoveryUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscoveredBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LLDPDiscoveryListener implements PacketProcessingListener {
+ private static final Logger LOG = LoggerFactory.getLogger(LLDPDiscoveryListener.class);
+
private final LLDPLinkAger lldpLinkAger;
private final NotificationProviderService notificationService;
+ private final EntityOwnershipService eos;
+
- public LLDPDiscoveryListener(NotificationProviderService notificationService, LLDPLinkAger lldpLinkAger) {
+ public LLDPDiscoveryListener(final NotificationProviderService notificationService, final LLDPLinkAger lldpLinkAger,
+ final EntityOwnershipService entityOwnershipService) {
this.notificationService = notificationService;
this.lldpLinkAger = lldpLinkAger;
+ this.eos = entityOwnershipService;
}
@Override
public void onPacketReceived(PacketReceived lldp) {
NodeConnectorRef src = LLDPDiscoveryUtils.lldpToNodeConnectorRef(lldp.getPayload(), true);
if (src != null) {
- LinkDiscoveredBuilder ldb = new LinkDiscoveredBuilder();
- ldb.setDestination(lldp.getIngress());
- ldb.setSource(new NodeConnectorRef(src));
- LinkDiscovered ld = ldb.build();
+ NodeKey nodeKey = src.getValue().firstKeyOf(Node.class);
+ LOG.debug("LLDP packet received for node {}", nodeKey);
+ if (nodeKey != null) {
+ LinkDiscoveredBuilder ldb = new LinkDiscoveredBuilder();
+ ldb.setDestination(lldp.getIngress());
+ ldb.setSource(new NodeConnectorRef(src));
+ LinkDiscovered ld = ldb.build();
- notificationService.publish(ld);
- lldpLinkAger.put(ld);
+ lldpLinkAger.put(ld);
+ if (LLDPDiscoveryUtils.isEntityOwned(this.eos, nodeKey.getId().getValue())) {
+ LOG.debug("Publish add event for link {}", ld);
+ notificationService.publish(ld);
+ } else {
+ LOG.trace("Skip publishing the add event for link because controller is non-owner of the " +
+ "node {}. Link : {}", nodeKey.getId().getValue(), ld);
+ }
+ } else {
+ LOG.debug("LLDP packet ignored. Unable to extract node-key from source node-connector reference.");
+ }
}
}
-}
+}
\ No newline at end of file
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.openflowplugin.applications.topology.lldp.utils.LLDPDiscoveryUtils;
import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationListener;
import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
public class LLDPLinkAger implements ConfigurationListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(LLDPLinkAger.class);
private final Timer timer;
private final NotificationProviderService notificationService;
private final AutoCloseable configurationServiceRegistration;
+ private final EntityOwnershipService eos;
/**
* default ctor - start timer.
*/
public LLDPLinkAger(final TopologyLldpDiscoveryConfig topologyLldpDiscoveryConfig,
- final NotificationProviderService notificationService,
- final ConfigurationService configurationService) {
+ final NotificationProviderService notificationService,
+ final ConfigurationService configurationService, final EntityOwnershipService entityOwnershipService) {
this.linkExpirationTime = topologyLldpDiscoveryConfig.getTopologyLldpExpirationInterval().getValue();
this.notificationService = notificationService;
this.configurationServiceRegistration = configurationService.registerListener(this);
+ this.eos = entityOwnershipService;
linkToDate = new ConcurrentHashMap<>();
timer = new Timer();
timer.schedule(new LLDPAgingTask(), 0, topologyLldpDiscoveryConfig.getTopologyLldpInterval().getValue());
@Override
public void run() {
- for (Entry<LinkDiscovered,Date> entry : linkToDate.entrySet()) {
+ for (Entry<LinkDiscovered, Date> entry : linkToDate.entrySet()) {
LinkDiscovered link = entry.getKey();
Date expires = entry.getValue();
Date now = new Date();
if (now.after(expires)) {
if (notificationService != null) {
LinkRemovedBuilder lrb = new LinkRemovedBuilder(link);
- notificationService.publish(lrb.build());
+
+ NodeKey nodeKey = link.getDestination().getValue().firstKeyOf(Node.class);
+ LOG.info("No update received for link {} from last {} milliseconds. Removing link from cache.",
+ link, linkExpirationTime);
linkToDate.remove(link);
+ if (nodeKey != null && LLDPDiscoveryUtils.isEntityOwned(eos, nodeKey.getId().getValue())) {
+ LOG.info("Publish Link Remove event for the link {}", link);
+ notificationService.publish(lrb.build());
+ } else {
+ LOG.trace("Skip publishing Link Remove event for the link {} because link destination "
+ + "node is not owned by the controller", link);
+ }
}
}
}
-
}
-
}
@VisibleForTesting
*/
package org.opendaylight.openflowplugin.applications.topology.lldp.utils;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import java.util.Arrays;
import java.util.Objects;
import org.apache.commons.lang3.ArrayUtils;
+import org.opendaylight.mdsal.eos.binding.api.Entity;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
import org.opendaylight.openflowplugin.applications.topology.lldp.LLDPActivator;
import org.opendaylight.openflowplugin.libraries.liblldp.BitBufferHelper;
import org.opendaylight.openflowplugin.libraries.liblldp.CustomTLVKey;
public static final short ETHERNET_TYPE_LLDP = (short) 0x88cc;
private static final short ETHERNET_TYPE_OFFSET = 12;
private static final short ETHERNET_VLAN_OFFSET = ETHERNET_TYPE_OFFSET + 4;
+ private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
private LLDPDiscoveryUtils() {
}
return ethernetType == ETHERNET_TYPE_LLDP;
}
+
+ public static boolean isEntityOwned(final EntityOwnershipService eos, final String nodeId) {
+ Preconditions.checkNotNull(eos, "Entity ownership service must not be null");
+
+ EntityOwnershipState state = null;
+ java.util.Optional<EntityOwnershipState> status = getCurrentOwnershipStatus(eos, nodeId);
+ if (status.isPresent()) {
+ state = status.get();
+ } else {
+ LOG.error("Fetching ownership status failed for node {}", nodeId);
+ }
+ return state != null && state.equals(EntityOwnershipState.IS_OWNER);
+ }
+
+ private static java.util.Optional<EntityOwnershipState> getCurrentOwnershipStatus(final EntityOwnershipService eos,
+ final String nodeId) {
+ Entity entity = createNodeEntity(nodeId);
+ Optional<EntityOwnershipState> ownershipStatus = eos.getOwnershipState(entity);
+
+ if (ownershipStatus.isPresent()) {
+ LOG.debug("Fetched ownership status for node {} is {}", nodeId, ownershipStatus.get());
+ return java.util.Optional.of(ownershipStatus.get());
+ }
+ return java.util.Optional.empty();
+ }
+
+ private static Entity createNodeEntity(final String nodeId) {
+ return new Entity(SERVICE_ENTITY_TYPE, nodeId);
+ }
}
<reference id="notificationService" interface="org.opendaylight.controller.sal.binding.api.NotificationProviderService"/>
<reference id="configurationService" interface="org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService"/>
+ <reference id="entityOwnershipService" interface="org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService"/>
<odl:clustered-app-config id="topologyLLDPConfig"
binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfig">
<argument ref="topologyLLDPConfig"/>
<argument ref="notificationService"/>
<argument ref="configurationService"/>
+ <argument ref="entityOwnershipService"/>
</bean>
<bean id="lldpDiscoveryListener" class="org.opendaylight.openflowplugin.applications.topology.lldp.LLDPDiscoveryListener">
<argument ref="notificationService"/>
<argument ref="lldpLinkAger"/>
+ <argument ref="entityOwnershipService"/>
</bean>
<bean id="LLDPActivator" class="org.opendaylight.openflowplugin.applications.topology.lldp.LLDPActivator"
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
+import com.google.common.base.Optional;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.mdsal.eos.binding.api.Entity;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.NonZeroUint32Type;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfig;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfigBuilder;
private LinkDiscovered link;
@Mock
private NotificationProviderService notificationService;
+ @Mock
+ private EntityOwnershipService eos;
+ @Mock
+ private LinkRemoved linkRemoved;
@Before
public void setUp() throws Exception {
- lldpLinkAger = new LLDPLinkAger(getConfig(), notificationService, getConfigurationService());
+ lldpLinkAger = new LLDPLinkAger(getConfig(), notificationService, getConfigurationService(), eos);
+ Mockito.when(link.getDestination()).thenReturn(new NodeConnectorRef(
+ InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId("openflow:1")))));
+ Mockito.when(eos.getOwnershipState(Mockito.any(Entity.class))).thenReturn(Optional.of(EntityOwnershipState.IS_OWNER));
}
@Test