2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
11 import java.util.Collections;
12 import java.util.HashSet;
13 import java.util.Objects;
15 import java.util.concurrent.ExecutionException;
16 import java.util.function.BiPredicate;
17 import java.util.function.Predicate;
18 import javax.annotation.PostConstruct;
19 import javax.inject.Inject;
20 import javax.inject.Singleton;
21 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
26 import org.opendaylight.genius.mdsalutil.MDSALUtil;
27 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
28 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
29 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
30 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
31 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
32 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
33 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
34 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
35 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
36 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
37 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
38 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
39 import org.opendaylight.netvirt.neutronvpn.api.l2gw.utils.L2GatewayCacheUtils;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
54 * Listener to handle physical switch updates.
57 public class HwvtepPhysicalSwitchListener
58 extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
59 implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation> {
61 /** The Constant LOG. */
62 private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
64 private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
65 (l2GatewayDevice, globalIid) -> {
66 return l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
67 || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
68 globalIid.firstKeyOf(Node.class).getNodeId().getValue());
71 private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
72 phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
74 private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
76 private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
77 (phySwitchAfter, existingDevice) -> {
78 return TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
80 existingDevice.getTunnelIp(), phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
83 /** The data broker. */
84 private final DataBroker dataBroker;
86 /** The itm rpc service. */
87 private final ItmRpcService itmRpcService;
89 private final ElanClusterUtils elanClusterUtils;
91 private final HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
93 private final L2gwServiceProvider l2gwServiceProvider;
95 private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent =
96 (l2GwDevice, globalIid) -> {
97 return !hwvtepHACache.isHAParentNode(globalIid)
98 && l2GwDevice != null;
99 // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
100 // thus will always return false. I don't know what the intention is here so commented out for now.
101 //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid);
104 private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
106 return l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
109 private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild =
110 (l2GwDevice, globalIid) -> {
111 InstanceIdentifier<Node> existingIid = globalIid;
112 if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
113 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
115 return hwvtepHACache.isHAParentNode(globalIid)
116 && l2GwDevice != null
117 // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
118 // thus will always return false. I don't know what the intention is here so commented out for now.
119 //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid)
120 && Objects.equals(globalIid, hwvtepHACache.getParent(existingIid));
124 private final HAOpClusteredListener haOpClusteredListener;
127 * Instantiates a new hwvtep physical switch listener.
130 public HwvtepPhysicalSwitchListener(final DataBroker dataBroker, ItmRpcService itmRpcService,
131 ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
132 HAOpClusteredListener haListener) {
133 super(PhysicalSwitchAugmentation.class, HwvtepPhysicalSwitchListener.class);
134 this.dataBroker = dataBroker;
135 this.itmRpcService = itmRpcService;
136 this.elanClusterUtils = elanClusterUtils;
137 this.l2gwServiceProvider = l2gwServiceProvider;
138 this.haOpClusteredListener = haListener;
144 registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
148 protected InstanceIdentifier<PhysicalSwitchAugmentation> getWildCardPath() {
149 return InstanceIdentifier.create(NetworkTopology.class)
150 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
151 .augmentation(PhysicalSwitchAugmentation.class);
155 protected HwvtepPhysicalSwitchListener getDataTreeChangeListener() {
156 return HwvtepPhysicalSwitchListener.this;
160 protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
161 PhysicalSwitchAugmentation phySwitchDeleted) {
162 NodeId nodeId = getNodeId(identifier);
163 String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
164 LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
166 L2GatewayDevice l2GwDevice = L2GatewayCacheUtils.getL2DeviceFromCache(psName);
167 if (l2GwDevice != null) {
168 if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
169 L2GatewayCacheUtils.removeL2DeviceFromCache(psName);
170 LOG.debug("{} details removed from L2Gateway Cache", psName);
171 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
172 HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
174 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
177 l2GwDevice.setConnected(false);
178 //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
180 LOG.error("Unable to find L2 Gateway details for {}", psName);
185 * Upon update checks if the tunnels Ip was null earlier and it got newly added.
186 * In that case simply call add.
187 * If not then check if Tunnel Ip has been updated from an old value to new value.
188 * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
189 * IP then call added ().
191 * @param identifier iid
192 * @param phySwitchBefore ps Node before update
193 * @param phySwitchAfter ps Node after update
196 protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
197 PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
198 NodeId nodeId = getNodeId(identifier);
199 LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
200 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
201 String psName = getPsName(identifier);
202 if (psName == null) {
203 LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
206 L2GatewayDevice existingDevice = L2GatewayCacheUtils.getL2DeviceFromCache(psName);
207 LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
208 InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
210 if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
211 if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
212 added(identifier, phySwitchAfter);
215 if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
216 && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
218 final String hwvtepId = existingDevice.getHwvtepNodeId();
219 elanClusterUtils.runOnlyInOwnerNode(existingDevice.getDeviceName(),
220 "handling Physical Switch add create itm tunnels ",
222 LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
223 L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
224 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
225 Thread.sleep(10000L);//TODO remove these sleeps
226 LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
227 ElanL2GatewayUtils.createItmTunnels(itmRpcService, hwvtepId, psName,
228 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
229 return Collections.emptyList();
233 Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
234 } catch (InterruptedException e) {
235 LOG.error("Interrupted ");
237 existingDevice.setTunnelIps(new HashSet<>());
238 added(identifier, phySwitchAfter);
244 protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
245 final PhysicalSwitchAugmentation phySwitchAdded) {
246 String globalNodeId = getManagedByNodeId(identifier);
247 final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
248 NodeId nodeId = getNodeId(identifier);
249 if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
250 LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
253 final String psName = getPsName(identifier);
254 LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
256 haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
257 LOG.trace("Running job for node {} ", globalNodeIid);
258 if (!node.isPresent()) {
259 LOG.error("Global node is absent {}", globalNodeId);
262 HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeIid, node.get());
263 if (hwvtepHACache.isHAEnabledDevice(globalNodeIid)) {
264 LOG.trace("Ha enabled device {}", globalNodeIid);
267 LOG.trace("Updating cache for node {}", globalNodeIid);
268 L2GatewayDevice l2GwDevice = L2GatewayCacheUtils.getL2DeviceFromCache(psName);
269 if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
270 LOG.trace("Device {} {} is already Connected by ",
271 psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
274 InstanceIdentifier<Node> existingIid = globalNodeIid;
275 if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
276 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
278 if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
279 && alreadyHasL2Gwids.test(l2GwDevice)) {
280 LOG.error("Child node {} having l2gw configured became ha node "
281 + " removing the l2device {} from all elan cache and provision parent node {}",
282 existingIid, psName, globalNodeIid);
283 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
285 l2GwDevice = L2GatewayCacheUtils.updateL2GatewayCache(
286 psName, globalNodeId, phySwitchAdded.getTunnelIps());
287 handleAdd(l2GwDevice);
292 boolean updateHACacheIfHANode(DataBroker broker, InstanceIdentifier<Node> globalNodeId)
293 throws ExecutionException, InterruptedException {
294 ReadWriteTransaction transaction = broker.newReadWriteTransaction();
295 Node node = transaction.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().get();
296 HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node);
297 return hwvtepHACache.isHAEnabledDevice(globalNodeId);
306 private void handleAdd(L2GatewayDevice l2GwDevice) {
307 final String psName = l2GwDevice.getDeviceName();
308 final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
309 Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
310 for (final IpAddress tunnelIpAddr : tunnelIps) {
311 if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
312 LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
313 l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
314 l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
316 LOG.info("l2gw.provision.skip {}", hwvtepNodeId, psName);
327 * @return the node id
329 private NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
330 return identifier.firstKeyOf(Node.class).getNodeId();
333 private String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
334 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
335 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
336 return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
341 private InstanceIdentifier<Node> getManagedByNodeIid(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
342 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
343 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
344 psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
345 return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
350 private String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
351 String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
352 if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
353 return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH