Bug 8998 - stale l2gw connection cleanup
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / HwvtepPhysicalSwitchListener.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
10
11 import java.util.Collections;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Objects;
15 import java.util.Set;
16 import java.util.concurrent.ExecutionException;
17 import java.util.function.BiPredicate;
18 import java.util.function.Predicate;
19 import javax.annotation.PostConstruct;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22
23 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
28 import org.opendaylight.genius.mdsalutil.MDSALUtil;
29 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
30 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
31 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
32 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
33 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
34 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
35 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
36 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
37 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
38 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
39 import org.opendaylight.netvirt.elan.l2gw.utils.StaleVlanBindingsCleaner;
40 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
41 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
42 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
43 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
44 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Listener to handle physical switch updates.
61  */
62 @Singleton
63 public class HwvtepPhysicalSwitchListener
64         extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
65         implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation> {
66
67     /** The Constant LOG. */
68     private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
69
70     private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
71         (l2GatewayDevice, globalIid) -> {
72             return l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
73                     || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
74                             globalIid.firstKeyOf(Node.class).getNodeId().getValue());
75         };
76
77     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
78         phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
79
80     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
81
82     private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
83         (phySwitchAfter, existingDevice) -> {
84             return TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
85                     && !Objects.equals(
86                             existingDevice.getTunnelIp(),  phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
87         };
88
89     /** The data broker. */
90     private final DataBroker dataBroker;
91
92     /** The itm rpc service. */
93     private final ItmRpcService itmRpcService;
94
95     private final ElanClusterUtils elanClusterUtils;
96
97     private final HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
98
99     private final L2gwServiceProvider l2gwServiceProvider;
100
101     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent =
102         (l2GwDevice, globalIid) -> {
103             return !hwvtepHACache.isHAParentNode(globalIid)
104                     && l2GwDevice != null;
105                     // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
106                     // thus will always return false. I don't know what the intention is here so commented out for now.
107                     //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid);
108         };
109
110     private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
111         (l2GwDevice) -> {
112             return l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
113         };
114
115     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild =
116         (l2GwDevice, globalIid) -> {
117             InstanceIdentifier<Node> existingIid = globalIid;
118             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
119                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
120             }
121             return hwvtepHACache.isHAParentNode(globalIid)
122                     && l2GwDevice != null
123                     // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
124                     // thus will always return false. I don't know what the intention is here so commented out for now.
125                     //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid)
126                     && Objects.equals(globalIid, hwvtepHACache.getParent(existingIid));
127         };
128
129
130     private final HAOpClusteredListener haOpClusteredListener;
131
132     private final L2GatewayCache l2GatewayCache;
133
134     private final StaleVlanBindingsCleaner staleVlanBindingsCleaner;
135
136     /**
137      * Instantiates a new hwvtep physical switch listener.
138      * @param dataBroker DataBroker
139      * @param itmRpcService ItmRpcService
140      * @param elanClusterUtils ElanClusterUtils
141      * @param l2gwServiceProvider L2gwServiceProvider
142      * @param haListener HAOpClusteredListener
143      * @param l2GatewayCache L2GatewayCache
144      * @param staleVlanBindingsCleaner StaleVlanBindingsCleaner
145      */
146     @Inject
147     public HwvtepPhysicalSwitchListener(final DataBroker dataBroker, ItmRpcService itmRpcService,
148             ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
149             HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
150             StaleVlanBindingsCleaner staleVlanBindingsCleaner) {
151         super(PhysicalSwitchAugmentation.class, HwvtepPhysicalSwitchListener.class);
152         this.dataBroker = dataBroker;
153         this.itmRpcService = itmRpcService;
154         this.elanClusterUtils = elanClusterUtils;
155         this.l2gwServiceProvider = l2gwServiceProvider;
156         this.staleVlanBindingsCleaner = staleVlanBindingsCleaner;
157         this.haOpClusteredListener = haListener;
158         this.l2GatewayCache = l2GatewayCache;
159     }
160
161     @Override
162     @PostConstruct
163     public void init() {
164         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
165     }
166
167     @Override
168     protected InstanceIdentifier<PhysicalSwitchAugmentation> getWildCardPath() {
169         return InstanceIdentifier.create(NetworkTopology.class)
170                 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
171                 .augmentation(PhysicalSwitchAugmentation.class);
172     }
173
174     @Override
175     protected HwvtepPhysicalSwitchListener getDataTreeChangeListener() {
176         return HwvtepPhysicalSwitchListener.this;
177     }
178
179     @Override
180     protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
181             PhysicalSwitchAugmentation phySwitchDeleted) {
182         NodeId nodeId = getNodeId(identifier);
183         String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
184         LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
185
186         L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
187         if (l2GwDevice != null) {
188             if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
189                 l2GatewayCache.remove(psName);
190                 LOG.debug("{} details removed from L2Gateway Cache", psName);
191                 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
192                         HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
193             } else {
194                 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
195             }
196
197             l2GwDevice.setConnected(false);
198             //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
199         } else {
200             LOG.error("Unable to find L2 Gateway details for {}", psName);
201         }
202     }
203
204     /**
205      * Upon update checks if the tunnels Ip was null earlier and it got newly added.
206      * In that case simply call add.
207      * If not then check if Tunnel Ip has been updated from an old value to new value.
208      * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
209      * IP then call added ().
210      *
211      * @param identifier iid
212      * @param phySwitchBefore ps Node before update
213      * @param phySwitchAfter ps Node after update
214      */
215     @Override
216     protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
217             PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
218         NodeId nodeId = getNodeId(identifier);
219         LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
220                 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
221         String psName = getPsName(identifier);
222         if (psName == null) {
223             LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
224             return;
225         }
226         L2GatewayDevice existingDevice = l2GatewayCache.get(psName);
227         LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
228         InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
229
230         if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
231             if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
232                 added(identifier, phySwitchAfter);
233             }
234         } else {
235             if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
236                     && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
237
238                 final String hwvtepId = existingDevice.getHwvtepNodeId();
239                 elanClusterUtils.runOnlyInOwnerNode(existingDevice.getDeviceName(),
240                     "handling Physical Switch add create itm tunnels ",
241                     () -> {
242                         LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
243                         L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
244                                 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
245                         Thread.sleep(10000L);//TODO remove these sleeps
246                         LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
247                         ElanL2GatewayUtils.createItmTunnels(itmRpcService, hwvtepId, psName,
248                                 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
249                         return Collections.emptyList();
250                     }
251                 );
252                 try {
253                     Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
254                 } catch (InterruptedException e) {
255                     LOG.error("Interrupted ");
256                 }
257                 existingDevice.setTunnelIps(new HashSet<>());
258                 added(identifier, phySwitchAfter);
259             }
260         }
261     }
262
263     @Override
264     protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
265                          final PhysicalSwitchAugmentation phySwitchAdded) {
266         String globalNodeId = getManagedByNodeId(identifier);
267         final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
268         NodeId nodeId = getNodeId(identifier);
269         if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
270             LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
271             return;
272         }
273         final String psName = getPsName(identifier);
274         LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
275
276         haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
277             LOG.trace("Running job for node {} ", globalNodeIid);
278             if (!node.isPresent()) {
279                 LOG.error("Global node is absent {}", globalNodeId);
280                 return;
281             }
282             HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeIid, node.get());
283             if (hwvtepHACache.isHAEnabledDevice(globalNodeIid)) {
284                 LOG.trace("Ha enabled device {}", globalNodeIid);
285                 return;
286             }
287             LOG.trace("Updating cache for node {}", globalNodeIid);
288             L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
289             if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
290                 LOG.trace("Device {} {} is already Connected by ",
291                         psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
292                 return;
293             }
294             InstanceIdentifier<Node> existingIid = globalNodeIid;
295             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
296                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
297             }
298             if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
299                     && alreadyHasL2Gwids.test(l2GwDevice)) {
300                 LOG.error("Child node {} having l2gw configured became ha node "
301                                 + " removing the l2device {} from all elan cache and provision parent node {}",
302                           existingIid, psName, globalNodeIid);
303                 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
304             }
305
306             l2GwDevice = l2GatewayCache.addOrGet(psName);
307             l2GwDevice.setConnected(true);
308             l2GwDevice.setHwvtepNodeId(globalNodeId);
309
310             List<TunnelIps> tunnelIps = phySwitchAdded.getTunnelIps();
311             if (tunnelIps != null) {
312                 for (TunnelIps tunnelIp : tunnelIps) {
313                     IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
314                     l2GwDevice.addTunnelIp(tunnelIpAddr);
315                 }
316             }
317
318             handleAdd(l2GwDevice);
319             elanClusterUtils.runOnlyInOwnerNode("Update config tunnels IP ", () -> {
320                 updateConfigTunnelIp(identifier, phySwitchAdded);
321             });
322             return;
323         });
324     }
325
326     boolean updateHACacheIfHANode(DataBroker broker, InstanceIdentifier<Node> globalNodeId)
327             throws ExecutionException, InterruptedException {
328         ReadWriteTransaction transaction = broker.newReadWriteTransaction();
329         Node node = transaction.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().get();
330         HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node);
331         return hwvtepHACache.isHAEnabledDevice(globalNodeId);
332     }
333
334     /**
335      * Handle add.
336      *
337      * @param l2GwDevice
338      *            the l2 gw device
339      */
340     private void handleAdd(L2GatewayDevice l2GwDevice) {
341         final String psName = l2GwDevice.getDeviceName();
342         final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
343         Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
344         for (final IpAddress tunnelIpAddr : tunnelIps) {
345             if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
346                 LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
347                         l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
348                 l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
349             } else {
350                 LOG.info("l2gw.provision.skip {}", hwvtepNodeId, psName);
351             }
352         }
353         elanClusterUtils.runOnlyInOwnerNode("Stale entry cleanup", () -> {
354             InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(
355                     new NodeId(hwvtepNodeId));
356             InstanceIdentifier<Node> psIid = HwvtepSouthboundUtils.createInstanceIdentifier(
357                     HwvtepSouthboundUtils.createManagedNodeId(new NodeId(hwvtepNodeId), psName));
358             staleVlanBindingsCleaner.scheduleStaleCleanup(psName, globalNodeIid, psIid);
359         });
360     }
361
362
363     /**
364      * Gets the node id.
365      *
366      * @param identifier
367      *            the identifier
368      * @return the node id
369      */
370     private NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
371         return identifier.firstKeyOf(Node.class).getNodeId();
372     }
373
374     private String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
375         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
376         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
377             return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
378         }
379         return psNodeId;
380     }
381
382     private InstanceIdentifier<Node> getManagedByNodeIid(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
383         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
384         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
385             psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
386             return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
387         }
388         return null;
389     }
390
391     private String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
392         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
393         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
394             return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
395                     .length());
396         }
397         return null;
398     }
399
400     private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
401                                       PhysicalSwitchAugmentation phySwitchAdded) {
402         if (phySwitchAdded.getTunnelIps() != null) {
403             ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
404             PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
405             psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
406             tx.merge(LogicalDatastoreType.CONFIGURATION, identifier, psBuilder.build());
407             LOG.trace("Updating config tunnel ips {}", identifier);
408             ListenableFutures.addErrorLogging(tx.submit(), LOG,
409                     "Failed to update config tunnel ip for iid {}", identifier);
410         }
411     }
412 }