2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
11 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
13 import java.util.Collections;
14 import java.util.HashSet;
16 import java.util.Objects;
17 import java.util.Optional;
19 import java.util.function.BiPredicate;
20 import java.util.function.Predicate;
21 import javax.annotation.PreDestroy;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
26 import org.opendaylight.genius.mdsalutil.MDSALUtil;
27 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
28 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
29 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
30 import org.opendaylight.infrautils.utils.concurrent.Executors;
31 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
32 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
33 import org.opendaylight.mdsal.binding.api.DataBroker;
34 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
35 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
36 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
37 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
38 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
39 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
40 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
41 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
42 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
43 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
44 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
45 import org.opendaylight.netvirt.elan.l2gw.utils.StaleVlanBindingsCleaner;
46 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
47 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
48 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
49 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
50 import org.opendaylight.serviceutils.srm.RecoverableListener;
51 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
52 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIpsKey;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
69 * Listener to handle physical switch updates.
72 public class HwvtepPhysicalSwitchListener
73 extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
74 implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation>, RecoverableListener {
76 /** The Constant LOG. */
77 private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
79 private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
80 (l2GatewayDevice, globalIid) -> l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
81 || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
82 globalIid.firstKeyOf(Node.class).getNodeId().getValue());
84 private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
85 phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.nonnullTunnelIps().values());
87 private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
89 private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
90 (phySwitchAfter, existingDevice) -> TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
92 existingDevice.getTunnelIp(), phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
94 /** The data broker. */
95 private final DataBroker dataBroker;
96 private final ManagedNewTransactionRunner txRunner;
98 /** The itm rpc service. */
99 private final ItmRpcService itmRpcService;
101 private final ElanClusterUtils elanClusterUtils;
103 private final HwvtepNodeHACache hwvtepNodeHACache;
105 private final L2gwServiceProvider l2gwServiceProvider;
107 private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent;
109 private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
110 (l2GwDevice) -> l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
112 private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild;
113 private final HAOpClusteredListener haOpClusteredListener;
115 private final L2GatewayCache l2GatewayCache;
117 private final StaleVlanBindingsCleaner staleVlanBindingsCleaner;
120 * Instantiates a new hwvtep physical switch listener.
123 public HwvtepPhysicalSwitchListener(final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
124 final ServiceRecoveryRegistry serviceRecoveryRegistry,
125 final DataBroker dataBroker, ItmRpcService itmRpcService,
126 ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
127 HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
128 StaleVlanBindingsCleaner staleVlanBindingsCleaner,
129 HwvtepNodeHACache hwvtepNodeHACache) {
130 super(dataBroker, DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
131 InstanceIdentifier.create(NetworkTopology.class)
132 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
133 .augmentation(PhysicalSwitchAugmentation.class)),
134 Executors.newListeningSingleThreadExecutor("HwvtepPhysicalSwitchListener", LOG),
136 this.dataBroker = dataBroker;
137 this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
138 this.itmRpcService = itmRpcService;
139 this.elanClusterUtils = elanClusterUtils;
140 this.l2gwServiceProvider = l2gwServiceProvider;
141 this.staleVlanBindingsCleaner = staleVlanBindingsCleaner;
142 this.haOpClusteredListener = haListener;
143 this.l2GatewayCache = l2GatewayCache;
144 this.hwvtepNodeHACache = hwvtepNodeHACache;
146 childConnectedAfterParent = (l2GwDevice, globalIid) -> {
147 return !hwvtepNodeHACache.isHAParentNode(globalIid)
148 && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
149 && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
150 .getNodeId().getValue());
153 parentConnectedAfterChild = (l2GwDevice, globalIid) -> {
154 InstanceIdentifier<Node> existingIid = globalIid;
155 if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
156 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
158 return hwvtepNodeHACache.isHAParentNode(globalIid)
159 && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
160 && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
161 .getNodeId().getValue())
162 && Objects.equals(globalIid, hwvtepNodeHACache.getParent(existingIid));
165 serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
174 public void registerListener() {
176 LOG.info("Registering HwvtepPhysicalSwitchListener");
179 public void deregisterListener() {
181 LOG.info("Deregistering HwvtepPhysicalSwitchListener");
186 public void close() {
188 Executors.shutdownAndAwaitTermination(getExecutorService());
192 protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
193 PhysicalSwitchAugmentation phySwitchDeleted) {
194 NodeId nodeId = getNodeId(identifier);
195 String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
196 LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
198 L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
199 if (l2GwDevice != null) {
200 if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
201 l2GatewayCache.remove(psName);
202 LOG.debug("{} details removed from L2Gateway Cache", psName);
203 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
204 HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
206 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
209 l2GwDevice.setConnected(false);
210 //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
212 LOG.error("Unable to find L2 Gateway details for {}", psName);
217 * Upon update checks if the tunnels Ip was null earlier and it got newly added.
218 * In that case simply call add.
219 * If not then check if Tunnel Ip has been updated from an old value to new value.
220 * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
221 * IP then call added ().
223 * @param identifier iid
224 * @param phySwitchBefore ps Node before update
225 * @param phySwitchAfter ps Node after update
228 protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
229 PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
230 NodeId nodeId = getNodeId(identifier);
231 LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
232 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
233 String psName = getPsName(identifier);
234 if (psName == null) {
235 LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
238 L2GatewayDevice existingDevice = l2GatewayCache.get(psName);
239 LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
240 InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
242 if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
243 if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
244 added(identifier, phySwitchAfter);
247 if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
248 && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
250 final String hwvtepId = existingDevice.getHwvtepNodeId();
251 elanClusterUtils.runOnlyInOwnerNode(existingDevice.getDeviceName(),
252 "handling Physical Switch add create itm tunnels ",
254 LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
255 L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
256 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
257 Thread.sleep(10000L);//TODO remove these sleeps
258 LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
259 ElanL2GatewayUtils.createItmTunnels(dataBroker, itmRpcService, hwvtepId, psName,
260 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
261 return Collections.emptyList();
265 Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
266 } catch (InterruptedException e) {
267 LOG.error("Interrupted ");
269 existingDevice.setTunnelIps(new HashSet<>());
270 added(identifier, phySwitchAfter);
276 protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
277 final PhysicalSwitchAugmentation phySwitchAdded) {
278 String globalNodeId = getManagedByNodeId(identifier);
279 final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
280 NodeId nodeId = getNodeId(identifier);
281 if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
282 LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
285 final String psName = getPsName(identifier);
286 LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
288 haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
289 LOG.trace("Running job for node {} ", globalNodeIid);
290 if (!node.isPresent()) {
291 LOG.error("Global node is absent {}", globalNodeId);
294 HwvtepHAUtil.addToCacheIfHAChildNode(globalNodeIid, node.get(), hwvtepNodeHACache);
295 if (hwvtepNodeHACache.isHAEnabledDevice(globalNodeIid)) {
296 LOG.trace("Ha enabled device {}", globalNodeIid);
299 LOG.trace("Updating cache for node {}", globalNodeIid);
300 L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
301 if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
302 LOG.trace("Device {} {} is already Connected by {}",
303 psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
306 InstanceIdentifier<Node> existingIid = globalNodeIid;
307 if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
308 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
310 if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
311 && alreadyHasL2Gwids.test(l2GwDevice)) {
312 LOG.error("Child node {} having l2gw configured became ha node "
313 + " removing the l2device {} from all elan cache and provision parent node {}",
314 existingIid, psName, globalNodeIid);
315 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
318 l2GwDevice = l2GatewayCache.addOrGet(psName);
319 l2GwDevice.setConnected(true);
320 l2GwDevice.setHwvtepNodeId(globalNodeId);
322 Map<TunnelIpsKey, TunnelIps> tunnelIps = phySwitchAdded.nonnullTunnelIps();
323 if (tunnelIps != null) {
324 for (TunnelIps tunnelIp : tunnelIps.values()) {
325 IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
326 l2GwDevice.addTunnelIp(tunnelIpAddr);
330 handleAdd(l2GwDevice);
331 elanClusterUtils.runOnlyInOwnerNode("Update config tunnels IP ",
332 () -> updateConfigTunnelIp(identifier, phySwitchAdded));
342 private void handleAdd(L2GatewayDevice l2GwDevice) {
343 final String psName = l2GwDevice.getDeviceName();
344 final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
345 Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
346 for (final IpAddress tunnelIpAddr : tunnelIps) {
347 if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
348 LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
349 l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
350 l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
352 LOG.info("l2gw.provision.skip {}:{}", hwvtepNodeId, psName);
355 elanClusterUtils.runOnlyInOwnerNode("Stale entry cleanup", () -> {
356 InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(
357 new NodeId(hwvtepNodeId));
358 InstanceIdentifier<Node> psIid = HwvtepSouthboundUtils.createInstanceIdentifier(
359 HwvtepSouthboundUtils.createManagedNodeId(new NodeId(hwvtepNodeId), psName));
360 staleVlanBindingsCleaner.scheduleStaleCleanup(psName, globalNodeIid, psIid);
370 * @return the node id
372 private static NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
373 return identifier.firstKeyOf(Node.class).getNodeId();
376 private static String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
377 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
378 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
379 return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
385 private static InstanceIdentifier<Node> getManagedByNodeIid(
386 InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
387 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
388 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
389 psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
390 return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
396 private static String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
397 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
398 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
399 return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
405 private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
406 PhysicalSwitchAugmentation phySwitchAdded) {
407 if (phySwitchAdded.getTunnelIps() != null) {
408 LoggingFutures.addErrorLogging(
409 txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
410 Optional<PhysicalSwitchAugmentation> existingSwitch = tx.read(identifier).get();
411 PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
412 if (existingSwitch.isPresent()) {
413 psBuilder = new PhysicalSwitchAugmentationBuilder(existingSwitch.get());
415 psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
416 tx.mergeParentStructurePut(identifier, psBuilder.build());
417 LOG.trace("Updating config tunnel ips {}", identifier);
418 }), LOG, "Failed to update the config tunnel ips {}", identifier);