elanmanager: use transaction manager
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / HwvtepPhysicalSwitchListener.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
10
11 import java.util.Collections;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Objects;
15 import java.util.Set;
16 import java.util.concurrent.ExecutionException;
17 import java.util.function.BiPredicate;
18 import java.util.function.Predicate;
19 import javax.annotation.PostConstruct;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22
23 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
28 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
29 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
30 import org.opendaylight.genius.mdsalutil.MDSALUtil;
31 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
32 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
33 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
34 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
35 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
36 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
37 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
38 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
39 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
40 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
41 import org.opendaylight.netvirt.elan.l2gw.utils.StaleVlanBindingsCleaner;
42 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
43 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
44 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
45 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * Listener to handle physical switch updates.
63  */
64 @Singleton
65 public class HwvtepPhysicalSwitchListener
66         extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
67         implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation> {
68
69     /** The Constant LOG. */
70     private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
71
72     private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
73         (l2GatewayDevice, globalIid) -> {
74             return l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
75                     || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
76                             globalIid.firstKeyOf(Node.class).getNodeId().getValue());
77         };
78
79     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
80         phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
81
82     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
83
84     private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
85         (phySwitchAfter, existingDevice) -> {
86             return TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
87                     && !Objects.equals(
88                             existingDevice.getTunnelIp(),  phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
89         };
90
91     /** The data broker. */
92     private final DataBroker dataBroker;
93     private final ManagedNewTransactionRunner txRunner;
94
95     /** The itm rpc service. */
96     private final ItmRpcService itmRpcService;
97
98     private final ElanClusterUtils elanClusterUtils;
99
100     private final HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
101
102     private final L2gwServiceProvider l2gwServiceProvider;
103
104     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent =
105         (l2GwDevice, globalIid) -> {
106             return !hwvtepHACache.isHAParentNode(globalIid)
107                     && l2GwDevice != null;
108                     // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
109                     // thus will always return false. I don't know what the intention is here so commented out for now.
110                     //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid);
111         };
112
113     private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
114         (l2GwDevice) -> {
115             return l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
116         };
117
118     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild =
119         (l2GwDevice, globalIid) -> {
120             InstanceIdentifier<Node> existingIid = globalIid;
121             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
122                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
123             }
124             return hwvtepHACache.isHAParentNode(globalIid)
125                     && l2GwDevice != null
126                     // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
127                     // thus will always return false. I don't know what the intention is here so commented out for now.
128                     //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid)
129                     && Objects.equals(globalIid, hwvtepHACache.getParent(existingIid));
130         };
131
132
133     private final HAOpClusteredListener haOpClusteredListener;
134
135     private final L2GatewayCache l2GatewayCache;
136
137     private final StaleVlanBindingsCleaner staleVlanBindingsCleaner;
138
139     /**
140      * Instantiates a new hwvtep physical switch listener.
141      * @param dataBroker DataBroker
142      * @param itmRpcService ItmRpcService
143      * @param elanClusterUtils ElanClusterUtils
144      * @param l2gwServiceProvider L2gwServiceProvider
145      * @param haListener HAOpClusteredListener
146      * @param l2GatewayCache L2GatewayCache
147      * @param staleVlanBindingsCleaner StaleVlanBindingsCleaner
148      */
149     @Inject
150     public HwvtepPhysicalSwitchListener(final DataBroker dataBroker, ItmRpcService itmRpcService,
151             ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
152             HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
153             StaleVlanBindingsCleaner staleVlanBindingsCleaner) {
154         super(PhysicalSwitchAugmentation.class, HwvtepPhysicalSwitchListener.class);
155         this.dataBroker = dataBroker;
156         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
157         this.itmRpcService = itmRpcService;
158         this.elanClusterUtils = elanClusterUtils;
159         this.l2gwServiceProvider = l2gwServiceProvider;
160         this.staleVlanBindingsCleaner = staleVlanBindingsCleaner;
161         this.haOpClusteredListener = haListener;
162         this.l2GatewayCache = l2GatewayCache;
163     }
164
165     @Override
166     @PostConstruct
167     public void init() {
168         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
169     }
170
171     @Override
172     protected InstanceIdentifier<PhysicalSwitchAugmentation> getWildCardPath() {
173         return InstanceIdentifier.create(NetworkTopology.class)
174                 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
175                 .augmentation(PhysicalSwitchAugmentation.class);
176     }
177
178     @Override
179     protected HwvtepPhysicalSwitchListener getDataTreeChangeListener() {
180         return HwvtepPhysicalSwitchListener.this;
181     }
182
183     @Override
184     protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
185             PhysicalSwitchAugmentation phySwitchDeleted) {
186         NodeId nodeId = getNodeId(identifier);
187         String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
188         LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
189
190         L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
191         if (l2GwDevice != null) {
192             if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
193                 l2GatewayCache.remove(psName);
194                 LOG.debug("{} details removed from L2Gateway Cache", psName);
195                 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
196                         HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
197             } else {
198                 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
199             }
200
201             l2GwDevice.setConnected(false);
202             //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
203         } else {
204             LOG.error("Unable to find L2 Gateway details for {}", psName);
205         }
206     }
207
208     /**
209      * Upon update checks if the tunnels Ip was null earlier and it got newly added.
210      * In that case simply call add.
211      * If not then check if Tunnel Ip has been updated from an old value to new value.
212      * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
213      * IP then call added ().
214      *
215      * @param identifier iid
216      * @param phySwitchBefore ps Node before update
217      * @param phySwitchAfter ps Node after update
218      */
219     @Override
220     protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
221             PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
222         NodeId nodeId = getNodeId(identifier);
223         LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
224                 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
225         String psName = getPsName(identifier);
226         if (psName == null) {
227             LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
228             return;
229         }
230         L2GatewayDevice existingDevice = l2GatewayCache.get(psName);
231         LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
232         InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
233
234         if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
235             if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
236                 added(identifier, phySwitchAfter);
237             }
238         } else {
239             if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
240                     && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
241
242                 final String hwvtepId = existingDevice.getHwvtepNodeId();
243                 elanClusterUtils.runOnlyInOwnerNode(existingDevice.getDeviceName(),
244                     "handling Physical Switch add create itm tunnels ",
245                     () -> {
246                         LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
247                         L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
248                                 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
249                         Thread.sleep(10000L);//TODO remove these sleeps
250                         LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
251                         ElanL2GatewayUtils.createItmTunnels(itmRpcService, hwvtepId, psName,
252                                 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
253                         return Collections.emptyList();
254                     }
255                 );
256                 try {
257                     Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
258                 } catch (InterruptedException e) {
259                     LOG.error("Interrupted ");
260                 }
261                 existingDevice.setTunnelIps(new HashSet<>());
262                 added(identifier, phySwitchAfter);
263             }
264         }
265     }
266
267     @Override
268     protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
269                          final PhysicalSwitchAugmentation phySwitchAdded) {
270         String globalNodeId = getManagedByNodeId(identifier);
271         final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
272         NodeId nodeId = getNodeId(identifier);
273         if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
274             LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
275             return;
276         }
277         final String psName = getPsName(identifier);
278         LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
279
280         haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
281             LOG.trace("Running job for node {} ", globalNodeIid);
282             if (!node.isPresent()) {
283                 LOG.error("Global node is absent {}", globalNodeId);
284                 return;
285             }
286             HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeIid, node.get());
287             if (hwvtepHACache.isHAEnabledDevice(globalNodeIid)) {
288                 LOG.trace("Ha enabled device {}", globalNodeIid);
289                 return;
290             }
291             LOG.trace("Updating cache for node {}", globalNodeIid);
292             L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
293             if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
294                 LOG.trace("Device {} {} is already Connected by ",
295                         psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
296                 return;
297             }
298             InstanceIdentifier<Node> existingIid = globalNodeIid;
299             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
300                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
301             }
302             if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
303                     && alreadyHasL2Gwids.test(l2GwDevice)) {
304                 LOG.error("Child node {} having l2gw configured became ha node "
305                                 + " removing the l2device {} from all elan cache and provision parent node {}",
306                           existingIid, psName, globalNodeIid);
307                 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
308             }
309
310             l2GwDevice = l2GatewayCache.addOrGet(psName);
311             l2GwDevice.setConnected(true);
312             l2GwDevice.setHwvtepNodeId(globalNodeId);
313
314             List<TunnelIps> tunnelIps = phySwitchAdded.getTunnelIps();
315             if (tunnelIps != null) {
316                 for (TunnelIps tunnelIp : tunnelIps) {
317                     IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
318                     l2GwDevice.addTunnelIp(tunnelIpAddr);
319                 }
320             }
321
322             handleAdd(l2GwDevice);
323             elanClusterUtils.runOnlyInOwnerNode("Update config tunnels IP ", () -> {
324                 updateConfigTunnelIp(identifier, phySwitchAdded);
325             });
326             return;
327         });
328     }
329
330     boolean updateHACacheIfHANode(InstanceIdentifier<Node> globalNodeId)
331             throws ExecutionException, InterruptedException {
332         try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
333             tx.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().toJavaUtil().ifPresent(
334                 node -> HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node));
335         }
336         return hwvtepHACache.isHAEnabledDevice(globalNodeId);
337     }
338
339     /**
340      * Handle add.
341      *
342      * @param l2GwDevice
343      *            the l2 gw device
344      */
345     private void handleAdd(L2GatewayDevice l2GwDevice) {
346         final String psName = l2GwDevice.getDeviceName();
347         final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
348         Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
349         for (final IpAddress tunnelIpAddr : tunnelIps) {
350             if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
351                 LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
352                         l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
353                 l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
354             } else {
355                 LOG.info("l2gw.provision.skip {}", hwvtepNodeId, psName);
356             }
357         }
358         elanClusterUtils.runOnlyInOwnerNode("Stale entry cleanup", () -> {
359             InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(
360                     new NodeId(hwvtepNodeId));
361             InstanceIdentifier<Node> psIid = HwvtepSouthboundUtils.createInstanceIdentifier(
362                     HwvtepSouthboundUtils.createManagedNodeId(new NodeId(hwvtepNodeId), psName));
363             staleVlanBindingsCleaner.scheduleStaleCleanup(psName, globalNodeIid, psIid);
364         });
365     }
366
367
368     /**
369      * Gets the node id.
370      *
371      * @param identifier
372      *            the identifier
373      * @return the node id
374      */
375     private NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
376         return identifier.firstKeyOf(Node.class).getNodeId();
377     }
378
379     private String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
380         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
381         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
382             return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
383         }
384         return psNodeId;
385     }
386
387     private InstanceIdentifier<Node> getManagedByNodeIid(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
388         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
389         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
390             psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
391             return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
392         }
393         return null;
394     }
395
396     private String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
397         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
398         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
399             return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
400                     .length());
401         }
402         return null;
403     }
404
405     private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
406                                       PhysicalSwitchAugmentation phySwitchAdded) {
407         if (phySwitchAdded.getTunnelIps() != null) {
408             ListenableFutures.addErrorLogging(
409                 txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
410                     PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
411                     psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
412                     tx.merge(LogicalDatastoreType.CONFIGURATION, identifier, psBuilder.build());
413                     LOG.trace("Updating config tunnel ips {}", identifier);
414                 }),
415                 LOG, "Failed to update config tunnel ip for iid {}", identifier);
416         }
417     }
418 }