Merge "OPNFLWPLUG-952: All links disappear from the topology"
authorAnil Vishnoi <vishnoianil@gmail.com>
Mon, 26 Feb 2018 21:58:59 +0000 (21:58 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 26 Feb 2018 21:58:59 +0000 (21:58 +0000)
12 files changed:
applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/DeviceOwnershipStatusService.java [new file with mode: 0644]
applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeaker.java
applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPUtil.java
applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/NodeConnectorInventoryEventTranslator.java
applications/lldp-speaker/src/main/resources/org/opendaylight/blueprint/lldp-speaker.xml
applications/lldp-speaker/src/test/java/org/opendaylight/openflowplugin/applications/lldpspeaker/LLDPSpeakerTest.java
applications/topology-lldp-discovery/pom.xml
applications/topology-lldp-discovery/src/main/java/org/opendaylight/openflowplugin/applications/topology/lldp/LLDPDiscoveryListener.java
applications/topology-lldp-discovery/src/main/java/org/opendaylight/openflowplugin/applications/topology/lldp/LLDPLinkAger.java
applications/topology-lldp-discovery/src/main/java/org/opendaylight/openflowplugin/applications/topology/lldp/utils/LLDPDiscoveryUtils.java
applications/topology-lldp-discovery/src/main/resources/org/opendaylight/blueprint/topology-lldp-discovery.xml
applications/topology-lldp-discovery/src/test/java/org/opendaylight/openflowplugin/applications/topology/lldp/LLDPLinkAgerTest.java

diff --git a/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/DeviceOwnershipStatusService.java b/applications/lldp-speaker/src/main/java/org/opendaylight/openflowplugin/applications/lldpspeaker/DeviceOwnershipStatusService.java
new file mode 100644 (file)
index 0000000..1b2b71b
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2017 Lumina Networks, Inc.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.lldpspeaker;
+
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeviceOwnershipStatusService implements EntityOwnershipListener {
+    private static final Logger LOG = LoggerFactory.getLogger(DeviceOwnershipStatusService.class);
+    private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
+    private static final Pattern NODE_ID_PATTERN = Pattern.compile("^openflow:\\d+");
+
+    private final EntityOwnershipService eos;
+    private final ConcurrentMap<String, EntityOwnershipState> ownershipStateCache = new ConcurrentHashMap<>();
+
+    public DeviceOwnershipStatusService(final EntityOwnershipService entityOwnershipService) {
+        this.eos = entityOwnershipService;
+        registerEntityOwnershipListener();
+    }
+
+    public boolean isEntityOwned(final String nodeId) {
+        EntityOwnershipState state = ownershipStateCache.get(nodeId);
+        if (state == null) {
+            java.util.Optional<EntityOwnershipState> status = getCurrentOwnershipStatus(nodeId);
+            if (status.isPresent()) {
+                state = status.get();
+                ownershipStateCache.put(nodeId, state);
+            } else {
+                LOG.warn("Fetching ownership status failed for node {}", nodeId);
+            }
+        }
+        return state != null && state.equals(EntityOwnershipState.IS_OWNER);
+    }
+
+    public List<String> getOwnedNodes() {
+        List<String> nodes = new ArrayList<>();
+        ownershipStateCache.forEach((node, change) -> {
+            if (isEntityOwned(node)) {
+                nodes.add(node);
+            }
+        });
+        return nodes;
+    }
+
+    @Override
+    public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
+        final String entityName = ownershipChange.getEntity().getIdentifier().firstKeyOf(Entity.class).getName();
+        if (entityName != null && isOpenFlowEntity(entityName)) {
+            LOG.info("Entity ownership change received for node : {} : {}", entityName, ownershipChange);
+            if (!ownershipChange.getState().isOwner() && !ownershipChange.getState().hasOwner()
+                    && !ownershipChange.inJeopardy()) {
+                LOG.debug("Entity for node {} is unregistered.", entityName);
+                ownershipStateCache.remove(entityName);
+            } else if (!ownershipChange.getState().isOwner() && ownershipChange.getState().hasOwner()) {
+                ownershipStateCache.put(entityName, EntityOwnershipState.OWNED_BY_OTHER);
+            } else if (ownershipChange.getState().isOwner()) {
+                ownershipStateCache.put(entityName, EntityOwnershipState.IS_OWNER);
+            }
+        }
+    }
+
+    private java.util.Optional<EntityOwnershipState> getCurrentOwnershipStatus(final String nodeId) {
+        org.opendaylight.mdsal.eos.binding.api.Entity entity = createNodeEntity(nodeId);
+        Optional<EntityOwnershipState> ownershipStatus = eos.getOwnershipState(entity);
+
+        if (ownershipStatus.isPresent()) {
+            LOG.debug("Fetched ownership status for node {} is {}", nodeId, ownershipStatus.get());
+            return java.util.Optional.of(ownershipStatus.get());
+        }
+        return java.util.Optional.empty();
+    }
+
+    private org.opendaylight.mdsal.eos.binding.api.Entity createNodeEntity(final String nodeId) {
+        return new org.opendaylight.mdsal.eos.binding.api.Entity(SERVICE_ENTITY_TYPE, nodeId);
+    }
+
+    private void registerEntityOwnershipListener() {
+        this.eos.registerListener(SERVICE_ENTITY_TYPE, this);
+    }
+
+    private boolean isOpenFlowEntity(String entity) {
+        return NODE_ID_PATTERN.matcher(entity).matches();
+    }
+}
index e0684923749c5f41db173fba71adb261629a12fb..ddc7481714830d993ef3861edb92515017cd913c 100644 (file)
@@ -8,12 +8,18 @@
 
 package org.opendaylight.openflowplugin.applications.lldpspeaker;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
@@ -22,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInputBuilder;
@@ -34,25 +41,43 @@ import org.slf4j.LoggerFactory;
  * Objects of this class send LLDP frames over all flow-capable ports that can
  * be discovered through inventory.
  */
-public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver, Runnable {
+public class LLDPSpeaker implements NodeConnectorEventsObserver, Runnable, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(LLDPSpeaker.class);
-    private static final long LLDP_FLOOD_PERIOD = 5;
-    private long currentFloodPeriod = LLDP_FLOOD_PERIOD;
 
+    private static final long LLDP_FLOOD_PERIOD = 5;
+    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
+            .setNameFormat("lldp-speaker-%d").setDaemon(true).build();
     private final PacketProcessingService packetProcessingService;
     private final ScheduledExecutorService scheduledExecutorService;
-    private final Map<InstanceIdentifier<NodeConnector>, TransmitPacketInput> nodeConnectorMap = new
-            ConcurrentHashMap<>();
-    private ScheduledFuture<?> scheduledSpeakerTask;
+    private final DeviceOwnershipStatusService deviceOwnershipStatusService;
+    private final Map<InstanceIdentifier<NodeConnector>, TransmitPacketInput> nodeConnectorMap =
+            new ConcurrentHashMap<>();
     private final MacAddress addressDestionation;
+    private long currentFloodPeriod = LLDP_FLOOD_PERIOD;
+    private ScheduledFuture<?> scheduledSpeakerTask;
     private volatile OperStatus operationalStatus = OperStatus.RUN;
 
-    public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation) {
-        this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(), addressDestionation);
+    public LLDPSpeaker(final PacketProcessingService packetProcessingService, final MacAddress addressDestionation,
+                       final EntityOwnershipService entityOwnershipService) {
+        this(packetProcessingService, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), addressDestionation,
+                entityOwnershipService);
+    }
+
+    public LLDPSpeaker(final PacketProcessingService packetProcessingService,
+                       final ScheduledExecutorService scheduledExecutorService,
+                       final MacAddress addressDestionation,
+                       final EntityOwnershipService entityOwnershipService) {
+        this.addressDestionation = addressDestionation;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.deviceOwnershipStatusService = new DeviceOwnershipStatusService(entityOwnershipService);
+        scheduledSpeakerTask = this.scheduledExecutorService
+                .scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD,LLDP_FLOOD_PERIOD, TimeUnit.SECONDS);
+        this.packetProcessingService = packetProcessingService;
+        LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD);
     }
 
     public void setOperationalStatus(final OperStatus operationalStatus) {
-        LOG.info("Setting operational status to {}", operationalStatus);
+        LOG.info("LLDP speaker operational status set to {}", operationalStatus);
         this.operationalStatus = operationalStatus;
         if (operationalStatus.equals(OperStatus.STANDBY)) {
             nodeConnectorMap.clear();
@@ -66,7 +91,8 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver,
     public void setLldpFloodInterval(long time) {
         this.currentFloodPeriod = time;
         scheduledSpeakerTask.cancel(false);
-        scheduledSpeakerTask = this.scheduledExecutorService.scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS);
+        scheduledSpeakerTask = this.scheduledExecutorService
+                     .scheduleAtFixedRate(this, time, time, TimeUnit.SECONDS);
         LOG.info("LLDPSpeaker restarted, it will send LLDP frames each {} seconds", time);
     }
 
@@ -74,24 +100,18 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver,
         return currentFloodPeriod;
     }
 
-    public LLDPSpeaker(final PacketProcessingService packetProcessingService,
-                       final ScheduledExecutorService scheduledExecutorService, final MacAddress addressDestionation) {
-        this.addressDestionation = addressDestionation;
-        this.scheduledExecutorService = scheduledExecutorService;
-        scheduledSpeakerTask = this.scheduledExecutorService.scheduleAtFixedRate(this, LLDP_FLOOD_PERIOD,
-                LLDP_FLOOD_PERIOD, TimeUnit.SECONDS);
-        this.packetProcessingService = packetProcessingService;
-        LOG.info("LLDPSpeaker started, it will send LLDP frames each {} seconds", LLDP_FLOOD_PERIOD);
-    }
-
     /**
      * Closes this resource, relinquishing any underlying resources.
      */
     @Override
     public void close() {
         nodeConnectorMap.clear();
-        scheduledExecutorService.shutdown();
-        scheduledSpeakerTask.cancel(true);
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdown();
+        }
+        if (scheduledSpeakerTask != null) {
+            scheduledSpeakerTask.cancel(true);
+        }
         LOG.trace("LLDPSpeaker stopped sending LLDP frames.");
     }
 
@@ -101,15 +121,27 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver,
     @Override
     public void run() {
         if (OperStatus.RUN.equals(operationalStatus)) {
-            LOG.debug("Sending LLDP frames to {} ports...", nodeConnectorMap.keySet().size());
-            for (InstanceIdentifier<NodeConnector> nodeConnectorInstanceId : nodeConnectorMap.keySet()) {
-                NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId();
-                LOG.trace("Sending LLDP through port {}", nodeConnectorId.getValue());
-                packetProcessingService.transmitPacket(nodeConnectorMap.get(nodeConnectorInstanceId));
-            }
+            LOG.debug("Sending LLDP frames to nodes {}", Arrays.toString(deviceOwnershipStatusService
+                    .getOwnedNodes().toArray()));
+            LOG.debug("Sending LLDP frames to total {} ports", getOwnedPorts());
+            nodeConnectorMap.keySet().forEach(ncIID -> {
+                NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(ncIID).getId();
+                NodeId nodeId = ncIID.firstKeyOf(Node.class, NodeKey.class).getId();
+                if (deviceOwnershipStatusService.isEntityOwned(nodeId.getValue())) {
+                    LOG.debug("Node is owned by this controller, sending LLDP packet through port {}",
+                            nodeConnectorId.getValue());
+                    packetProcessingService.transmitPacket(nodeConnectorMap.get(ncIID));
+                } else {
+                    LOG.trace("Node {} is not owned by this controller, so skip sending LLDP packet on port {}",
+                            nodeId.getValue(), nodeConnectorId.getValue());
+                }
+            });
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void nodeConnectorAdded(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId,
                                    final FlowCapableNodeConnector flowConnector) {
@@ -119,7 +151,7 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver,
         // frames to
         // port, so first we check if we actually need to perform any action
         if (nodeConnectorMap.containsKey(nodeConnectorInstanceId)) {
-            LOG.trace("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing",
+            LOG.debug("Port {} already in LLDPSpeaker.nodeConnectorMap, no need for additional processing",
                     nodeConnectorId.getValue());
             return;
         }
@@ -132,7 +164,7 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver,
 
         // No need to send LLDP frames on local ports
         if (outputPortNo == null) {
-            LOG.trace("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue());
+            LOG.debug("Port {} is local, not sending LLDP frames through it", nodeConnectorId.getValue());
             return;
         }
 
@@ -142,10 +174,9 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver,
                 .setNode(new NodeRef(nodeInstanceId)).setPayload(LLDPUtil.buildLldpFrame(nodeId,
                         nodeConnectorId, srcMacAddress, outputPortNo, addressDestionation)).build();
 
-        // Save packet to node connector id -> packet map to transmit it every 5
-        // seconds
+        // Save packet to node connector id -> packet map to transmit it periodically on the configured interval.
         nodeConnectorMap.put(nodeConnectorInstanceId, packet);
-        LOG.trace("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue());
+        LOG.debug("Port {} added to LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue());
 
         // Transmit packet for first time immediately
         packetProcessingService.transmitPacket(packet);
@@ -153,9 +184,21 @@ public class LLDPSpeaker implements AutoCloseable, NodeConnectorEventsObserver,
 
     @Override
     public void nodeConnectorRemoved(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId) {
+        Preconditions.checkNotNull(nodeConnectorInstanceId);
+
         nodeConnectorMap.remove(nodeConnectorInstanceId);
         NodeConnectorId nodeConnectorId = InstanceIdentifier.keyOf(nodeConnectorInstanceId).getId();
-        LOG.trace("Port {} removed from LLDPSpeaker.nodeConnectorMap", nodeConnectorId.getValue());
+        LOG.trace("Port removed from node-connector map : {}", nodeConnectorId.getValue());
     }
 
+    private int getOwnedPorts() {
+        AtomicInteger ownedPorts = new AtomicInteger();
+        nodeConnectorMap.keySet().forEach(ncIID -> {
+            NodeId nodeId = ncIID.firstKeyOf(Node.class, NodeKey.class).getId();
+            if (deviceOwnershipStatusService.isEntityOwned(nodeId.getValue())) {
+                ownedPorts.incrementAndGet();
+            }
+        });
+        return ownedPorts.get();
+    }
 }
index c992017973fd8bc17b2eb2515125bc5ba534b696..0bb7cea7e9ca83f9a4325e10104fc98e864e21e0 100644 (file)
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
  */
 public final class LLDPUtil {
     private static final Logger LOG = LoggerFactory.getLogger(LLDPUtil.class);
+
     private static final String OF_URI_PREFIX = "openflow:";
 
     private LLDPUtil() {
@@ -88,9 +89,8 @@ public final class LLDPUtil {
             customSecTlv.setType(LLDPTLV.TLVType.Custom.getValue()).setLength((short) customSecValue.length)
                     .setValue(customSecValue);
             discoveryPkt.addCustomTLV(customSecTlv);
-        } catch (NoSuchAlgorithmException e1) {
-            LOG.info("LLDP extra authenticator creation failed: {}", e1.getMessage());
-            LOG.debug("Reason why LLDP extra authenticator creation failed: ", e1);
+        } catch (NoSuchAlgorithmException e) {
+            LOG.warn("LLDP extra authenticator creation failed.", e);
         }
 
 
@@ -107,8 +107,7 @@ public final class LLDPUtil {
         try {
             return ethPkt.serialize();
         } catch (PacketException e) {
-            LOG.warn("Error creating LLDP packet: {}", e.getMessage());
-            LOG.debug("Error creating LLDP packet.. ", e);
+            LOG.warn("Error creating LLDP packet.", e);
         }
         return null;
     }
index 09d09ec2cfbfe9e8dd92f1b733e0f2a587fec319..3e2e67c9a775959a37494594ee6491bb8ca786b1 100644 (file)
@@ -14,8 +14,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
@@ -40,8 +40,9 @@ import org.slf4j.LoggerFactory;
  * NodeConnectorInventoryEventTranslator is listening for changes in inventory operational DOM tree
  * and update LLDPSpeaker and topology.
  */
-public class NodeConnectorInventoryEventTranslator<T extends DataObject> implements DataTreeChangeListener<T>,
-        AutoCloseable {
+public class NodeConnectorInventoryEventTranslator<T extends DataObject>
+        implements ClusteredDataTreeChangeListener<T>, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorInventoryEventTranslator.class);
 
     private static final InstanceIdentifier<State> II_TO_STATE = InstanceIdentifier.builder(Nodes.class)
             .child(Node.class).child(NodeConnector.class).augmentation(FlowCapableNodeConnector.class)
@@ -53,7 +54,6 @@ public class NodeConnectorInventoryEventTranslator<T extends DataObject> impleme
 
     private static final long STARTUP_LOOP_TICK = 500L;
     private static final int STARTUP_LOOP_MAX_RETRIES = 8;
-    private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorInventoryEventTranslator.class);
 
     private final ListenerRegistration<DataTreeChangeListener> listenerOnPortRegistration;
     private final ListenerRegistration<DataTreeChangeListener> listenerOnPortStateRegistration;
@@ -70,24 +70,12 @@ public class NodeConnectorInventoryEventTranslator<T extends DataObject> impleme
                                                                                      II_TO_STATE);
         final SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
         try {
-            listenerOnPortRegistration = looper
-                    .loopUntilNoException(new Callable<ListenerRegistration<DataTreeChangeListener>>() {
-                        @Override
-                        public ListenerRegistration<DataTreeChangeListener> call() throws Exception {
-                            return dataBroker.registerDataTreeChangeListener(dtiToNodeConnector,
-                                                                             NodeConnectorInventoryEventTranslator
-                                                                                     .this);
-                        }
-                    });
-            listenerOnPortStateRegistration = looper
-                    .loopUntilNoException(new Callable<ListenerRegistration<DataTreeChangeListener>>() {
-                        @Override
-                        public ListenerRegistration<DataTreeChangeListener> call() throws Exception {
-                            return dataBroker.registerDataTreeChangeListener(dtiToNodeConnectorState,
-                                                                             NodeConnectorInventoryEventTranslator
-                                                                                     .this);
-                        }
-                    });
+            listenerOnPortRegistration = looper.loopUntilNoException(() ->
+                    dataBroker.registerDataTreeChangeListener(dtiToNodeConnector,
+                            NodeConnectorInventoryEventTranslator.this));
+            listenerOnPortStateRegistration = looper.loopUntilNoException(() ->
+                    dataBroker.registerDataTreeChangeListener(dtiToNodeConnectorState,
+                            NodeConnectorInventoryEventTranslator.this));
         } catch (Exception e) {
             LOG.error("DataTreeChangeListeners registration failed: {}", e);
             throw new IllegalStateException("NodeConnectorInventoryEventTranslator startup failed!", e);
@@ -197,5 +185,4 @@ public class NodeConnectorInventoryEventTranslator<T extends DataObject> impleme
             observer.nodeConnectorRemoved(nodeConnectorInstanceId);
         }
     }
-
 }
index d07cc2d1eeee4de99a88a18100f8370384f6f8be..4290e3dcd00166ab7c6feae4ae3f14729c4f66f0 100644 (file)
@@ -4,6 +4,7 @@
            odl:use-default-for-reference-types="true">
 
   <reference id="dataBroker" interface="org.opendaylight.controller.md.sal.binding.api.DataBroker"/>
+  <reference id="entityOwnershipService" interface="org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService"/>
 
   <odl:clustered-app-config id="lldpSpeakerConfig"
       binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.applications.lldp.speaker.config.rev160512.LldpSpeakerConfig">
@@ -23,6 +24,7 @@
     <argument>
       <bean factory-ref="lldpSpeakerConfig" factory-method="getAddressDestination"/>
     </argument>
+    <argument ref="entityOwnershipService"/>
   </bean>
 
   <bean id="nodeConnectorEventTranslator" class="org.opendaylight.openflowplugin.applications.lldpspeaker.NodeConnectorInventoryEventTranslator"
index 0fffeacb4ae3cd9646fdcb588d05409240485977..804f82985c069f3548c7b37d578e73a8506cc178 100644 (file)
@@ -16,6 +16,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -24,6 +25,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortNumberUni;
@@ -62,6 +65,8 @@ public class LLDPSpeakerTest {
     private ScheduledExecutorService scheduledExecutorService;
     @Mock
     private ScheduledFuture scheduledSpeakerTask;
+    @Mock
+    private EntityOwnershipService entityOwnershipService;
 
     private final MacAddress destinationMACAddress = null;
     private LLDPSpeaker lldpSpeaker;
@@ -73,12 +78,13 @@ public class LLDPSpeakerTest {
                         any(Runnable.class), anyLong(), anyLong(),
                         any(TimeUnit.class))).thenReturn(scheduledSpeakerTask);
         lldpSpeaker = new LLDPSpeaker(packetProcessingService,
-                scheduledExecutorService, destinationMACAddress);
+                scheduledExecutorService, destinationMACAddress, entityOwnershipService);
+        when(entityOwnershipService.getOwnershipState(any())).thenReturn(Optional.of(EntityOwnershipState.IS_OWNER));
         lldpSpeaker.setOperationalStatus(OperStatus.RUN);
     }
 
     /**
-     * Test that speaker does nothing when in standby mode.
+     * Test that speaker does nothing when in {@link OperStatus.STANDBY} mode.
      */
     @Test
     public void testStandBy() {
@@ -106,12 +112,15 @@ public class LLDPSpeakerTest {
         // packetProcessingService
         lldpSpeaker.nodeConnectorAdded(ID, FLOW_CAPABLE_NODE_CONNECTOR);
 
+
+        when(entityOwnershipService.getOwnershipState(any()))
+                .thenReturn(Optional.of(EntityOwnershipState.OWNED_BY_OTHER));
         // Execute one iteration of periodic task - LLDP packet should be
-        // transmitted second time
+        // not transmit second packet because it doesn't own the device.
         lldpSpeaker.run();
 
         // Check packet transmission
-        verify(packetProcessingService, times(2)).transmitPacket(PACKET_INPUT);
+        verify(packetProcessingService, times(1)).transmitPacket(PACKET_INPUT);
         verifyNoMoreInteractions(packetProcessingService);
     }
 
@@ -153,16 +162,6 @@ public class LLDPSpeakerTest {
         verifyNoMoreInteractions(packetProcessingService);
     }
 
-    /**
-     * Test that lldpSpeaker cancels periodic LLDP flood task and stops.
-     */
-    @Test
-    public void testCleanup() {
-        lldpSpeaker.close();
-        verify(scheduledSpeakerTask, times(1)).cancel(true);
-        verify(scheduledExecutorService, times(1)).shutdown();
-    }
-
     /**
      * Test that checks if LLDPSpeaker working fine with local ports.
      */
index 1b46e0d45a1b3d2810d1eaf8d154356b0bdcb324..7f82490d99a8a4a39f5e14978055ef5e644b0490 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-binding-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.mdsal</groupId>
+      <artifactId>mdsal-eos-dom-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.openflowplugin.libraries</groupId>
       <artifactId>liblldp</artifactId>
index 0205c4761e97f0dd656f124dc41639115e329f3c..f8132f87d3de7e60cd8fd95c46ad9d7c51ba93ec 100644 (file)
@@ -8,33 +8,56 @@
 package org.opendaylight.openflowplugin.applications.topology.lldp;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
 import org.opendaylight.openflowplugin.applications.topology.lldp.utils.LLDPDiscoveryUtils;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscoveredBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LLDPDiscoveryListener implements PacketProcessingListener {
+    private static final Logger LOG = LoggerFactory.getLogger(LLDPDiscoveryListener.class);
+
     private final LLDPLinkAger lldpLinkAger;
     private final NotificationProviderService notificationService;
+    private final EntityOwnershipService eos;
+
 
-    public LLDPDiscoveryListener(NotificationProviderService notificationService, LLDPLinkAger lldpLinkAger) {
+    public LLDPDiscoveryListener(final NotificationProviderService notificationService, final LLDPLinkAger lldpLinkAger,
+            final EntityOwnershipService entityOwnershipService) {
         this.notificationService = notificationService;
         this.lldpLinkAger = lldpLinkAger;
+        this.eos = entityOwnershipService;
     }
 
     @Override
     public void onPacketReceived(PacketReceived lldp) {
         NodeConnectorRef src = LLDPDiscoveryUtils.lldpToNodeConnectorRef(lldp.getPayload(), true);
         if (src != null) {
-            LinkDiscoveredBuilder ldb = new LinkDiscoveredBuilder();
-            ldb.setDestination(lldp.getIngress());
-            ldb.setSource(new NodeConnectorRef(src));
-            LinkDiscovered ld = ldb.build();
+            NodeKey nodeKey = src.getValue().firstKeyOf(Node.class);
+            LOG.debug("LLDP packet received for node {}", nodeKey);
+            if (nodeKey != null) {
+                LinkDiscoveredBuilder ldb = new LinkDiscoveredBuilder();
+                ldb.setDestination(lldp.getIngress());
+                ldb.setSource(new NodeConnectorRef(src));
+                LinkDiscovered ld = ldb.build();
 
-            notificationService.publish(ld);
-            lldpLinkAger.put(ld);
+                lldpLinkAger.put(ld);
+                if (LLDPDiscoveryUtils.isEntityOwned(this.eos, nodeKey.getId().getValue())) {
+                    LOG.debug("Publish add event for link {}", ld);
+                    notificationService.publish(ld);
+                } else {
+                    LOG.trace("Skip publishing the add event for link because controller is non-owner of the " +
+                            "node {}. Link : {}", nodeKey.getId().getValue(), ld);
+                }
+            } else {
+                LOG.debug("LLDP packet ignored. Unable to extract node-key from source node-connector reference.");
+            }
         }
     }
-}
+}
\ No newline at end of file
index 75f0f26825be959808d8d08670e163efb34f08a6..5c3cb23060a3791fdd09f875c60e8d2dacb9ecb2 100644 (file)
@@ -17,6 +17,8 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.openflowplugin.applications.topology.lldp.utils.LLDPDiscoveryUtils;
 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationListener;
 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
@@ -24,6 +26,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 
 public class LLDPLinkAger implements ConfigurationListener, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(LLDPLinkAger.class);
@@ -32,16 +36,18 @@ public class LLDPLinkAger implements ConfigurationListener, AutoCloseable {
     private final Timer timer;
     private final NotificationProviderService notificationService;
     private final AutoCloseable configurationServiceRegistration;
+    private final EntityOwnershipService eos;
 
     /**
      * default ctor - start timer.
      */
     public LLDPLinkAger(final TopologyLldpDiscoveryConfig topologyLldpDiscoveryConfig,
-                        final NotificationProviderService notificationService,
-                        final ConfigurationService configurationService) {
+            final NotificationProviderService notificationService,
+            final ConfigurationService configurationService, final EntityOwnershipService entityOwnershipService) {
         this.linkExpirationTime = topologyLldpDiscoveryConfig.getTopologyLldpExpirationInterval().getValue();
         this.notificationService = notificationService;
         this.configurationServiceRegistration = configurationService.registerListener(this);
+        this.eos = entityOwnershipService;
         linkToDate = new ConcurrentHashMap<>();
         timer = new Timer();
         timer.schedule(new LLDPAgingTask(), 0, topologyLldpDiscoveryConfig.getTopologyLldpInterval().getValue());
@@ -64,21 +70,29 @@ public class LLDPLinkAger implements ConfigurationListener, AutoCloseable {
 
         @Override
         public void run() {
-            for (Entry<LinkDiscovered,Date> entry : linkToDate.entrySet()) {
+            for (Entry<LinkDiscovered, Date> entry : linkToDate.entrySet()) {
                 LinkDiscovered link = entry.getKey();
                 Date expires = entry.getValue();
                 Date now = new Date();
                 if (now.after(expires)) {
                     if (notificationService != null) {
                         LinkRemovedBuilder lrb = new LinkRemovedBuilder(link);
-                        notificationService.publish(lrb.build());
+
+                        NodeKey nodeKey = link.getDestination().getValue().firstKeyOf(Node.class);
+                        LOG.info("No update received for link {} from last {} milliseconds. Removing link from cache.",
+                                link, linkExpirationTime);
                         linkToDate.remove(link);
+                        if (nodeKey != null && LLDPDiscoveryUtils.isEntityOwned(eos, nodeKey.getId().getValue())) {
+                            LOG.info("Publish Link Remove event for the link {}", link);
+                            notificationService.publish(lrb.build());
+                        } else {
+                            LOG.trace("Skip publishing Link Remove event for the link {} because link destination "
+                                    + "node is not owned by the controller", link);
+                        }
                     }
                 }
             }
-
         }
-
     }
 
     @VisibleForTesting
index b90698a25c2d8841d798dbbd8b17e72fd4a66e2b..5d39586995d1ed02a801f03193db1a8bcffc55e1 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.openflowplugin.applications.topology.lldp.utils;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
@@ -18,6 +20,9 @@ import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Objects;
 import org.apache.commons.lang3.ArrayUtils;
+import org.opendaylight.mdsal.eos.binding.api.Entity;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
 import org.opendaylight.openflowplugin.applications.topology.lldp.LLDPActivator;
 import org.opendaylight.openflowplugin.libraries.liblldp.BitBufferHelper;
 import org.opendaylight.openflowplugin.libraries.liblldp.CustomTLVKey;
@@ -46,6 +51,7 @@ public final class LLDPDiscoveryUtils {
     public static final short ETHERNET_TYPE_LLDP = (short) 0x88cc;
     private static final short ETHERNET_TYPE_OFFSET = 12;
     private static final short ETHERNET_VLAN_OFFSET = ETHERNET_TYPE_OFFSET + 4;
+    private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
 
     private LLDPDiscoveryUtils() {
     }
@@ -189,4 +195,33 @@ public final class LLDPDiscoveryUtils {
 
         return ethernetType == ETHERNET_TYPE_LLDP;
     }
+
+    public static boolean isEntityOwned(final EntityOwnershipService eos, final String nodeId) {
+        Preconditions.checkNotNull(eos, "Entity ownership service must not be null");
+
+        EntityOwnershipState state = null;
+        java.util.Optional<EntityOwnershipState> status = getCurrentOwnershipStatus(eos, nodeId);
+        if (status.isPresent()) {
+            state = status.get();
+        } else {
+            LOG.error("Fetching ownership status failed for node {}", nodeId);
+        }
+        return state != null && state.equals(EntityOwnershipState.IS_OWNER);
+    }
+
+    private static java.util.Optional<EntityOwnershipState> getCurrentOwnershipStatus(final EntityOwnershipService eos,
+            final String nodeId) {
+        Entity entity = createNodeEntity(nodeId);
+        Optional<EntityOwnershipState> ownershipStatus = eos.getOwnershipState(entity);
+
+        if (ownershipStatus.isPresent()) {
+            LOG.debug("Fetched ownership status for node {} is {}", nodeId, ownershipStatus.get());
+            return java.util.Optional.of(ownershipStatus.get());
+        }
+        return java.util.Optional.empty();
+    }
+
+    private static Entity createNodeEntity(final String nodeId) {
+        return new Entity(SERVICE_ENTITY_TYPE, nodeId);
+    }
 }
index 37749b9490165ce8c6fb136d6e586baa36cddd42..2d7ca120fbbaf87fe61ad6823fc113a147b59bb5 100644 (file)
@@ -6,6 +6,7 @@
 
   <reference id="notificationService" interface="org.opendaylight.controller.sal.binding.api.NotificationProviderService"/>
   <reference id="configurationService" interface="org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService"/>
+  <reference id="entityOwnershipService" interface="org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService"/>
 
   <odl:clustered-app-config id="topologyLLDPConfig"
       binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfig">
     <argument ref="topologyLLDPConfig"/>
     <argument ref="notificationService"/>
     <argument ref="configurationService"/>
+    <argument ref="entityOwnershipService"/>
   </bean>
 
   <bean id="lldpDiscoveryListener" class="org.opendaylight.openflowplugin.applications.topology.lldp.LLDPDiscoveryListener">
     <argument ref="notificationService"/>
     <argument ref="lldpLinkAger"/>
+    <argument ref="entityOwnershipService"/>
   </bean>
 
   <bean id="LLDPActivator" class="org.opendaylight.openflowplugin.applications.topology.lldp.LLDPActivator"
index cb4c489e710c7c91881d043c78cf982580bfc81a..387db0a7aa46cc6408a2a0c0ddd92fccde016b5b 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.base.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -20,9 +21,18 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.mdsal.eos.binding.api.Entity;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscovered;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.NonZeroUint32Type;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfig;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.lldp.discovery.config.rev160511.TopologyLldpDiscoveryConfigBuilder;
@@ -51,10 +61,17 @@ public class LLDPLinkAgerTest {
     private LinkDiscovered link;
     @Mock
     private NotificationProviderService notificationService;
+    @Mock
+    private EntityOwnershipService eos;
+    @Mock
+    private LinkRemoved linkRemoved;
 
     @Before
     public void setUp() throws Exception {
-        lldpLinkAger = new LLDPLinkAger(getConfig(), notificationService, getConfigurationService());
+        lldpLinkAger = new LLDPLinkAger(getConfig(), notificationService, getConfigurationService(), eos);
+        Mockito.when(link.getDestination()).thenReturn(new NodeConnectorRef(
+                InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId("openflow:1")))));
+        Mockito.when(eos.getOwnershipState(Mockito.any(Entity.class))).thenReturn(Optional.of(EntityOwnershipState.IS_OWNER));
     }
 
     @Test