2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
11 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
13 import java.util.Collections;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.Objects;
17 import java.util.Optional;
19 import java.util.function.BiPredicate;
20 import java.util.function.Predicate;
21 import javax.annotation.PreDestroy;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
26 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
27 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
28 import org.opendaylight.genius.mdsalutil.MDSALUtil;
29 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
30 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
31 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
32 import org.opendaylight.infrautils.utils.concurrent.Executors;
33 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
34 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
37 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
38 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
39 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
40 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
41 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
42 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
43 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
44 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
45 import org.opendaylight.netvirt.elan.l2gw.utils.StaleVlanBindingsCleaner;
46 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
47 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
48 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
49 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
50 import org.opendaylight.serviceutils.srm.RecoverableListener;
51 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
52 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
68 * Listener to handle physical switch updates.
71 public class HwvtepPhysicalSwitchListener
72 extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
73 implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation>, RecoverableListener {
75 /** The Constant LOG. */
76 private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
78 private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
79 (l2GatewayDevice, globalIid) -> l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
80 || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
81 globalIid.firstKeyOf(Node.class).getNodeId().getValue());
83 private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
84 phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
86 private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
88 private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
89 (phySwitchAfter, existingDevice) -> TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
91 existingDevice.getTunnelIp(), phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
93 /** The data broker. */
94 private final DataBroker dataBroker;
95 private final ManagedNewTransactionRunner txRunner;
97 /** The itm rpc service. */
98 private final ItmRpcService itmRpcService;
100 private final ElanClusterUtils elanClusterUtils;
102 private final HwvtepNodeHACache hwvtepNodeHACache;
104 private final L2gwServiceProvider l2gwServiceProvider;
106 private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent;
108 private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
109 (l2GwDevice) -> l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
111 private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild;
112 private final HAOpClusteredListener haOpClusteredListener;
114 private final L2GatewayCache l2GatewayCache;
116 private final StaleVlanBindingsCleaner staleVlanBindingsCleaner;
119 * Instantiates a new hwvtep physical switch listener.
122 public HwvtepPhysicalSwitchListener(final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
123 final ServiceRecoveryRegistry serviceRecoveryRegistry,
124 final DataBroker dataBroker, ItmRpcService itmRpcService,
125 ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
126 HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
127 StaleVlanBindingsCleaner staleVlanBindingsCleaner,
128 HwvtepNodeHACache hwvtepNodeHACache) {
129 super(dataBroker, DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
130 InstanceIdentifier.create(NetworkTopology.class)
131 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
132 .augmentation(PhysicalSwitchAugmentation.class)),
133 Executors.newListeningSingleThreadExecutor("HwvtepPhysicalSwitchListener", LOG),
135 this.dataBroker = dataBroker;
136 this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
137 this.itmRpcService = itmRpcService;
138 this.elanClusterUtils = elanClusterUtils;
139 this.l2gwServiceProvider = l2gwServiceProvider;
140 this.staleVlanBindingsCleaner = staleVlanBindingsCleaner;
141 this.haOpClusteredListener = haListener;
142 this.l2GatewayCache = l2GatewayCache;
143 this.hwvtepNodeHACache = hwvtepNodeHACache;
145 childConnectedAfterParent = (l2GwDevice, globalIid) -> {
146 return !hwvtepNodeHACache.isHAParentNode(globalIid)
147 && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
148 && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
149 .getNodeId().getValue());
152 parentConnectedAfterChild = (l2GwDevice, globalIid) -> {
153 InstanceIdentifier<Node> existingIid = globalIid;
154 if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
155 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
157 return hwvtepNodeHACache.isHAParentNode(globalIid)
158 && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
159 && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
160 .getNodeId().getValue())
161 && Objects.equals(globalIid, hwvtepNodeHACache.getParent(existingIid));
164 serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
173 public void registerListener() {
175 LOG.info("Registering HwvtepPhysicalSwitchListener");
178 public void deregisterListener() {
180 LOG.info("Deregistering HwvtepPhysicalSwitchListener");
185 public void close() {
187 Executors.shutdownAndAwaitTermination(getExecutorService());
191 protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
192 PhysicalSwitchAugmentation phySwitchDeleted) {
193 NodeId nodeId = getNodeId(identifier);
194 String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
195 LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
197 L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
198 if (l2GwDevice != null) {
199 if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
200 l2GatewayCache.remove(psName);
201 LOG.debug("{} details removed from L2Gateway Cache", psName);
202 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
203 HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
205 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
208 l2GwDevice.setConnected(false);
209 //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
211 LOG.error("Unable to find L2 Gateway details for {}", psName);
216 * Upon update checks if the tunnels Ip was null earlier and it got newly added.
217 * In that case simply call add.
218 * If not then check if Tunnel Ip has been updated from an old value to new value.
219 * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
220 * IP then call added ().
222 * @param identifier iid
223 * @param phySwitchBefore ps Node before update
224 * @param phySwitchAfter ps Node after update
227 protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
228 PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
229 NodeId nodeId = getNodeId(identifier);
230 LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
231 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
232 String psName = getPsName(identifier);
233 if (psName == null) {
234 LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
237 L2GatewayDevice existingDevice = l2GatewayCache.get(psName);
238 LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
239 InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
241 if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
242 if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
243 added(identifier, phySwitchAfter);
246 if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
247 && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
249 final String hwvtepId = existingDevice.getHwvtepNodeId();
250 elanClusterUtils.runOnlyInOwnerNode(existingDevice.getDeviceName(),
251 "handling Physical Switch add create itm tunnels ",
253 LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
254 L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
255 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
256 Thread.sleep(10000L);//TODO remove these sleeps
257 LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
258 ElanL2GatewayUtils.createItmTunnels(dataBroker, itmRpcService, hwvtepId, psName,
259 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
260 return Collections.emptyList();
264 Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
265 } catch (InterruptedException e) {
266 LOG.error("Interrupted ");
268 existingDevice.setTunnelIps(new HashSet<>());
269 added(identifier, phySwitchAfter);
275 protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
276 final PhysicalSwitchAugmentation phySwitchAdded) {
277 String globalNodeId = getManagedByNodeId(identifier);
278 final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
279 NodeId nodeId = getNodeId(identifier);
280 if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
281 LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
284 final String psName = getPsName(identifier);
285 LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
287 haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
288 LOG.trace("Running job for node {} ", globalNodeIid);
289 if (!node.isPresent()) {
290 LOG.error("Global node is absent {}", globalNodeId);
293 HwvtepHAUtil.addToCacheIfHAChildNode(globalNodeIid, node.get(), hwvtepNodeHACache);
294 if (hwvtepNodeHACache.isHAEnabledDevice(globalNodeIid)) {
295 LOG.trace("Ha enabled device {}", globalNodeIid);
298 LOG.trace("Updating cache for node {}", globalNodeIid);
299 L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
300 if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
301 LOG.trace("Device {} {} is already Connected by {}",
302 psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
305 InstanceIdentifier<Node> existingIid = globalNodeIid;
306 if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
307 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
309 if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
310 && alreadyHasL2Gwids.test(l2GwDevice)) {
311 LOG.error("Child node {} having l2gw configured became ha node "
312 + " removing the l2device {} from all elan cache and provision parent node {}",
313 existingIid, psName, globalNodeIid);
314 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
317 l2GwDevice = l2GatewayCache.addOrGet(psName);
318 l2GwDevice.setConnected(true);
319 l2GwDevice.setHwvtepNodeId(globalNodeId);
321 List<TunnelIps> tunnelIps = phySwitchAdded.getTunnelIps();
322 if (tunnelIps != null) {
323 for (TunnelIps tunnelIp : tunnelIps) {
324 IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
325 l2GwDevice.addTunnelIp(tunnelIpAddr);
329 handleAdd(l2GwDevice);
330 elanClusterUtils.runOnlyInOwnerNode("Update config tunnels IP ",
331 () -> updateConfigTunnelIp(identifier, phySwitchAdded));
341 private void handleAdd(L2GatewayDevice l2GwDevice) {
342 final String psName = l2GwDevice.getDeviceName();
343 final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
344 Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
345 for (final IpAddress tunnelIpAddr : tunnelIps) {
346 if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
347 LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
348 l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
349 l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
351 LOG.info("l2gw.provision.skip {}:{}", hwvtepNodeId, psName);
354 elanClusterUtils.runOnlyInOwnerNode("Stale entry cleanup", () -> {
355 InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(
356 new NodeId(hwvtepNodeId));
357 InstanceIdentifier<Node> psIid = HwvtepSouthboundUtils.createInstanceIdentifier(
358 HwvtepSouthboundUtils.createManagedNodeId(new NodeId(hwvtepNodeId), psName));
359 staleVlanBindingsCleaner.scheduleStaleCleanup(psName, globalNodeIid, psIid);
369 * @return the node id
371 private static NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
372 return identifier.firstKeyOf(Node.class).getNodeId();
375 private static String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
376 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
377 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
378 return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
384 private static InstanceIdentifier<Node> getManagedByNodeIid(
385 InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
386 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
387 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
388 psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
389 return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
395 private static String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
396 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
397 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
398 return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
404 private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
405 PhysicalSwitchAugmentation phySwitchAdded) {
406 if (phySwitchAdded.getTunnelIps() != null) {
407 LoggingFutures.addErrorLogging(
408 txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
409 Optional<PhysicalSwitchAugmentation> existingSwitch = tx.read(identifier).get();
410 PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
411 if (existingSwitch.isPresent()) {
412 psBuilder = new PhysicalSwitchAugmentationBuilder(existingSwitch.get());
414 psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
415 tx.put(identifier, psBuilder.build(), true);
416 LOG.trace("Updating config tunnel ips {}", identifier);
417 }), LOG, "Failed to update the config tunnel ips {}", identifier);