NETVIRT-1630 migrate to md-sal APIs
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / HwvtepPhysicalSwitchListener.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
10
11 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
12
13 import java.util.Collections;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.Objects;
17 import java.util.Optional;
18 import java.util.Set;
19 import java.util.function.BiPredicate;
20 import java.util.function.Predicate;
21 import javax.annotation.PreDestroy;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
26 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
27 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
28 import org.opendaylight.genius.mdsalutil.MDSALUtil;
29 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
30 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
31 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
32 import org.opendaylight.infrautils.utils.concurrent.Executors;
33 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
34 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
37 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
38 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
39 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
40 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
41 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
42 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
43 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
44 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
45 import org.opendaylight.netvirt.elan.l2gw.utils.StaleVlanBindingsCleaner;
46 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
47 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
48 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
49 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
50 import org.opendaylight.serviceutils.srm.RecoverableListener;
51 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
52 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  * Listener to handle physical switch updates.
69  */
70 @Singleton
71 public class HwvtepPhysicalSwitchListener
72         extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
73         implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation>, RecoverableListener {
74
75     /** The Constant LOG. */
76     private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
77
78     private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
79         (l2GatewayDevice, globalIid) -> l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
80                 || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
81                         globalIid.firstKeyOf(Node.class).getNodeId().getValue());
82
83     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
84         phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
85
86     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
87
88     private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
89         (phySwitchAfter, existingDevice) -> TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
90                 && !Objects.equals(
91                         existingDevice.getTunnelIp(),  phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
92
93     /** The data broker. */
94     private final DataBroker dataBroker;
95     private final ManagedNewTransactionRunner txRunner;
96
97     /** The itm rpc service. */
98     private final ItmRpcService itmRpcService;
99
100     private final ElanClusterUtils elanClusterUtils;
101
102     private final HwvtepNodeHACache hwvtepNodeHACache;
103
104     private final L2gwServiceProvider l2gwServiceProvider;
105
106     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent;
107
108     private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
109         (l2GwDevice) -> l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
110
111     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild;
112     private final HAOpClusteredListener haOpClusteredListener;
113
114     private final L2GatewayCache l2GatewayCache;
115
116     private final StaleVlanBindingsCleaner staleVlanBindingsCleaner;
117
118     /**
119      * Instantiates a new hwvtep physical switch listener.
120      */
121     @Inject
122     public HwvtepPhysicalSwitchListener(final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
123                                         final ServiceRecoveryRegistry serviceRecoveryRegistry,
124                                         final DataBroker dataBroker, ItmRpcService itmRpcService,
125                                         ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
126                                         HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
127                                         StaleVlanBindingsCleaner staleVlanBindingsCleaner,
128                                         HwvtepNodeHACache hwvtepNodeHACache) {
129         super(dataBroker,  DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
130                 InstanceIdentifier.create(NetworkTopology.class)
131                 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
132                 .augmentation(PhysicalSwitchAugmentation.class)),
133                 Executors.newListeningSingleThreadExecutor("HwvtepPhysicalSwitchListener", LOG),
134                 hwvtepNodeHACache);
135         this.dataBroker = dataBroker;
136         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
137         this.itmRpcService = itmRpcService;
138         this.elanClusterUtils = elanClusterUtils;
139         this.l2gwServiceProvider = l2gwServiceProvider;
140         this.staleVlanBindingsCleaner = staleVlanBindingsCleaner;
141         this.haOpClusteredListener = haListener;
142         this.l2GatewayCache = l2GatewayCache;
143         this.hwvtepNodeHACache = hwvtepNodeHACache;
144
145         childConnectedAfterParent = (l2GwDevice, globalIid) -> {
146             return !hwvtepNodeHACache.isHAParentNode(globalIid)
147                     && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
148                     && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
149                     .getNodeId().getValue());
150         };
151
152         parentConnectedAfterChild = (l2GwDevice, globalIid) -> {
153             InstanceIdentifier<Node> existingIid = globalIid;
154             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
155                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
156             }
157             return hwvtepNodeHACache.isHAParentNode(globalIid)
158                     && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
159                     && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
160                     .getNodeId().getValue())
161                     && Objects.equals(globalIid, hwvtepNodeHACache.getParent(existingIid));
162         };
163
164         serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
165                 this);
166     }
167
168     public void init() {
169         registerListener();
170     }
171
172     @Override
173     public void registerListener() {
174         super.register();
175         LOG.info("Registering HwvtepPhysicalSwitchListener");
176     }
177
178     public void deregisterListener() {
179         super.close();
180         LOG.info("Deregistering HwvtepPhysicalSwitchListener");
181     }
182
183     @Override
184     @PreDestroy
185     public void close() {
186         super.close();
187         Executors.shutdownAndAwaitTermination(getExecutorService());
188     }
189
190     @Override
191     protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
192             PhysicalSwitchAugmentation phySwitchDeleted) {
193         NodeId nodeId = getNodeId(identifier);
194         String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
195         LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
196
197         L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
198         if (l2GwDevice != null) {
199             if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
200                 l2GatewayCache.remove(psName);
201                 LOG.debug("{} details removed from L2Gateway Cache", psName);
202                 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
203                         HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
204             } else {
205                 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
206             }
207
208             l2GwDevice.setConnected(false);
209             //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
210         } else {
211             LOG.error("Unable to find L2 Gateway details for {}", psName);
212         }
213     }
214
215     /**
216      * Upon update checks if the tunnels Ip was null earlier and it got newly added.
217      * In that case simply call add.
218      * If not then check if Tunnel Ip has been updated from an old value to new value.
219      * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
220      * IP then call added ().
221      *
222      * @param identifier iid
223      * @param phySwitchBefore ps Node before update
224      * @param phySwitchAfter ps Node after update
225      */
226     @Override
227     protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
228             PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
229         NodeId nodeId = getNodeId(identifier);
230         LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
231                 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
232         String psName = getPsName(identifier);
233         if (psName == null) {
234             LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
235             return;
236         }
237         L2GatewayDevice existingDevice = l2GatewayCache.get(psName);
238         LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
239         InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
240
241         if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
242             if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
243                 added(identifier, phySwitchAfter);
244             }
245         } else {
246             if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
247                     && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
248
249                 final String hwvtepId = existingDevice.getHwvtepNodeId();
250                 elanClusterUtils.runOnlyInOwnerNode(existingDevice.getDeviceName(),
251                     "handling Physical Switch add create itm tunnels ",
252                     () -> {
253                         LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
254                         L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
255                                 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
256                         Thread.sleep(10000L);//TODO remove these sleeps
257                         LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
258                         ElanL2GatewayUtils.createItmTunnels(dataBroker, itmRpcService, hwvtepId, psName,
259                                 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
260                         return Collections.emptyList();
261                     }
262                 );
263                 try {
264                     Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
265                 } catch (InterruptedException e) {
266                     LOG.error("Interrupted ");
267                 }
268                 existingDevice.setTunnelIps(new HashSet<>());
269                 added(identifier, phySwitchAfter);
270             }
271         }
272     }
273
274     @Override
275     protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
276                          final PhysicalSwitchAugmentation phySwitchAdded) {
277         String globalNodeId = getManagedByNodeId(identifier);
278         final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
279         NodeId nodeId = getNodeId(identifier);
280         if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
281             LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
282             return;
283         }
284         final String psName = getPsName(identifier);
285         LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
286
287         haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
288             LOG.trace("Running job for node {} ", globalNodeIid);
289             if (!node.isPresent()) {
290                 LOG.error("Global node is absent {}", globalNodeId);
291                 return;
292             }
293             HwvtepHAUtil.addToCacheIfHAChildNode(globalNodeIid, node.get(), hwvtepNodeHACache);
294             if (hwvtepNodeHACache.isHAEnabledDevice(globalNodeIid)) {
295                 LOG.trace("Ha enabled device {}", globalNodeIid);
296                 return;
297             }
298             LOG.trace("Updating cache for node {}", globalNodeIid);
299             L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
300             if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
301                 LOG.trace("Device {} {} is already Connected by {}",
302                         psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
303                 return;
304             }
305             InstanceIdentifier<Node> existingIid = globalNodeIid;
306             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
307                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
308             }
309             if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
310                     && alreadyHasL2Gwids.test(l2GwDevice)) {
311                 LOG.error("Child node {} having l2gw configured became ha node "
312                                 + " removing the l2device {} from all elan cache and provision parent node {}",
313                           existingIid, psName, globalNodeIid);
314                 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
315             }
316
317             l2GwDevice = l2GatewayCache.addOrGet(psName);
318             l2GwDevice.setConnected(true);
319             l2GwDevice.setHwvtepNodeId(globalNodeId);
320
321             List<TunnelIps> tunnelIps = phySwitchAdded.getTunnelIps();
322             if (tunnelIps != null) {
323                 for (TunnelIps tunnelIp : tunnelIps) {
324                     IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
325                     l2GwDevice.addTunnelIp(tunnelIpAddr);
326                 }
327             }
328
329             handleAdd(l2GwDevice);
330             elanClusterUtils.runOnlyInOwnerNode("Update config tunnels IP ",
331                 () -> updateConfigTunnelIp(identifier, phySwitchAdded));
332         });
333     }
334
335     /**
336      * Handle add.
337      *
338      * @param l2GwDevice
339      *            the l2 gw device
340      */
341     private void handleAdd(L2GatewayDevice l2GwDevice) {
342         final String psName = l2GwDevice.getDeviceName();
343         final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
344         Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
345         for (final IpAddress tunnelIpAddr : tunnelIps) {
346             if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
347                 LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
348                         l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
349                 l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
350             } else {
351                 LOG.info("l2gw.provision.skip {}:{}", hwvtepNodeId, psName);
352             }
353         }
354         elanClusterUtils.runOnlyInOwnerNode("Stale entry cleanup", () -> {
355             InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(
356                     new NodeId(hwvtepNodeId));
357             InstanceIdentifier<Node> psIid = HwvtepSouthboundUtils.createInstanceIdentifier(
358                     HwvtepSouthboundUtils.createManagedNodeId(new NodeId(hwvtepNodeId), psName));
359             staleVlanBindingsCleaner.scheduleStaleCleanup(psName, globalNodeIid, psIid);
360         });
361     }
362
363
364     /**
365      * Gets the node id.
366      *
367      * @param identifier
368      *            the identifier
369      * @return the node id
370      */
371     private static NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
372         return identifier.firstKeyOf(Node.class).getNodeId();
373     }
374
375     private static String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
376         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
377         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
378             return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
379         }
380         return psNodeId;
381     }
382
383     @Nullable
384     private static InstanceIdentifier<Node> getManagedByNodeIid(
385             InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
386         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
387         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
388             psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
389             return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
390         }
391         return null;
392     }
393
394     @Nullable
395     private static String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
396         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
397         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
398             return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
399                     .length());
400         }
401         return null;
402     }
403
404     private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
405                                       PhysicalSwitchAugmentation phySwitchAdded) {
406         if (phySwitchAdded.getTunnelIps() != null) {
407             LoggingFutures.addErrorLogging(
408                 txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
409                     Optional<PhysicalSwitchAugmentation> existingSwitch = tx.read(identifier).get();
410                     PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
411                     if (existingSwitch.isPresent()) {
412                         psBuilder = new PhysicalSwitchAugmentationBuilder(existingSwitch.get());
413                     }
414                     psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
415                     tx.put(identifier, psBuilder.build(), true);
416                     LOG.trace("Updating config tunnel ips {}", identifier);
417                 }), LOG, "Failed to update the config tunnel ips {}", identifier);
418         }
419     }
420 }