Java 8 migration
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / HwvtepPhysicalSwitchListener.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
10
11 import com.google.common.base.Optional;
12 import java.util.Collections;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Objects;
16 import java.util.Set;
17 import java.util.concurrent.ExecutionException;
18 import java.util.function.BiPredicate;
19 import java.util.function.Predicate;
20 import javax.annotation.PostConstruct;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23
24 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
29 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
30 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
31 import org.opendaylight.genius.mdsalutil.MDSALUtil;
32 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
33 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
34 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
35 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
36 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
37 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
38 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
39 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
40 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
41 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
42 import org.opendaylight.netvirt.elan.l2gw.utils.StaleVlanBindingsCleaner;
43 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
44 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
45 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
46 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
47 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
58 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Listener to handle physical switch updates.
64  */
65 @Singleton
66 public class HwvtepPhysicalSwitchListener
67         extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
68         implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation> {
69
70     /** The Constant LOG. */
71     private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
72
73     private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
74         (l2GatewayDevice, globalIid) -> l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
75                 || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
76                         globalIid.firstKeyOf(Node.class).getNodeId().getValue());
77
78     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
79         phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
80
81     private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
82
83     private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
84         (phySwitchAfter, existingDevice) -> TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
85                 && !Objects.equals(
86                         existingDevice.getTunnelIp(),  phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
87
88     /** The data broker. */
89     private final DataBroker dataBroker;
90     private final ManagedNewTransactionRunner txRunner;
91
92     /** The itm rpc service. */
93     private final ItmRpcService itmRpcService;
94
95     private final ElanClusterUtils elanClusterUtils;
96
97     private final HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
98
99     private final L2gwServiceProvider l2gwServiceProvider;
100
101     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent =
102         (l2GwDevice, globalIid) -> {
103             return !hwvtepHACache.isHAParentNode(globalIid)
104                     && l2GwDevice != null;
105                     // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
106                     // thus will always return false. I don't know what the intention is here so commented out for now.
107                     //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid);
108         };
109
110     private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
111         (l2GwDevice) -> l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
112
113     private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild =
114         (l2GwDevice, globalIid) -> {
115             InstanceIdentifier<Node> existingIid = globalIid;
116             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
117                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
118             }
119             return hwvtepHACache.isHAParentNode(globalIid)
120                     && l2GwDevice != null
121                     // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
122                     // thus will always return false. I don't know what the intention is here so commented out for now.
123                     //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid)
124                     && Objects.equals(globalIid, hwvtepHACache.getParent(existingIid));
125         };
126
127
128     private final HAOpClusteredListener haOpClusteredListener;
129
130     private final L2GatewayCache l2GatewayCache;
131
132     private final StaleVlanBindingsCleaner staleVlanBindingsCleaner;
133
134     /**
135      * Instantiates a new hwvtep physical switch listener.
136      * @param dataBroker DataBroker
137      * @param itmRpcService ItmRpcService
138      * @param elanClusterUtils ElanClusterUtils
139      * @param l2gwServiceProvider L2gwServiceProvider
140      * @param haListener HAOpClusteredListener
141      * @param l2GatewayCache L2GatewayCache
142      * @param staleVlanBindingsCleaner StaleVlanBindingsCleaner
143      */
144     @Inject
145     public HwvtepPhysicalSwitchListener(final DataBroker dataBroker, ItmRpcService itmRpcService,
146             ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
147             HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
148             StaleVlanBindingsCleaner staleVlanBindingsCleaner) {
149         super(PhysicalSwitchAugmentation.class, HwvtepPhysicalSwitchListener.class);
150         this.dataBroker = dataBroker;
151         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
152         this.itmRpcService = itmRpcService;
153         this.elanClusterUtils = elanClusterUtils;
154         this.l2gwServiceProvider = l2gwServiceProvider;
155         this.staleVlanBindingsCleaner = staleVlanBindingsCleaner;
156         this.haOpClusteredListener = haListener;
157         this.l2GatewayCache = l2GatewayCache;
158     }
159
160     @Override
161     @PostConstruct
162     public void init() {
163         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
164     }
165
166     @Override
167     protected InstanceIdentifier<PhysicalSwitchAugmentation> getWildCardPath() {
168         return InstanceIdentifier.create(NetworkTopology.class)
169                 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
170                 .augmentation(PhysicalSwitchAugmentation.class);
171     }
172
173     @Override
174     protected HwvtepPhysicalSwitchListener getDataTreeChangeListener() {
175         return HwvtepPhysicalSwitchListener.this;
176     }
177
178     @Override
179     protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
180             PhysicalSwitchAugmentation phySwitchDeleted) {
181         NodeId nodeId = getNodeId(identifier);
182         String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
183         LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
184
185         L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
186         if (l2GwDevice != null) {
187             if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
188                 l2GatewayCache.remove(psName);
189                 LOG.debug("{} details removed from L2Gateway Cache", psName);
190                 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
191                         HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
192             } else {
193                 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
194             }
195
196             l2GwDevice.setConnected(false);
197             //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
198         } else {
199             LOG.error("Unable to find L2 Gateway details for {}", psName);
200         }
201     }
202
203     /**
204      * Upon update checks if the tunnels Ip was null earlier and it got newly added.
205      * In that case simply call add.
206      * If not then check if Tunnel Ip has been updated from an old value to new value.
207      * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
208      * IP then call added ().
209      *
210      * @param identifier iid
211      * @param phySwitchBefore ps Node before update
212      * @param phySwitchAfter ps Node after update
213      */
214     @Override
215     protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
216             PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
217         NodeId nodeId = getNodeId(identifier);
218         LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
219                 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
220         String psName = getPsName(identifier);
221         if (psName == null) {
222             LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
223             return;
224         }
225         L2GatewayDevice existingDevice = l2GatewayCache.get(psName);
226         LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
227         InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
228
229         if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
230             if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
231                 added(identifier, phySwitchAfter);
232             }
233         } else {
234             if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
235                     && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
236
237                 final String hwvtepId = existingDevice.getHwvtepNodeId();
238                 elanClusterUtils.runOnlyInOwnerNode(existingDevice.getDeviceName(),
239                     "handling Physical Switch add create itm tunnels ",
240                     () -> {
241                         LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
242                         L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
243                                 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
244                         Thread.sleep(10000L);//TODO remove these sleeps
245                         LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
246                         ElanL2GatewayUtils.createItmTunnels(dataBroker, itmRpcService, hwvtepId, psName,
247                                 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
248                         return Collections.emptyList();
249                     }
250                 );
251                 try {
252                     Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
253                 } catch (InterruptedException e) {
254                     LOG.error("Interrupted ");
255                 }
256                 existingDevice.setTunnelIps(new HashSet<>());
257                 added(identifier, phySwitchAfter);
258             }
259         }
260     }
261
262     @Override
263     protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
264                          final PhysicalSwitchAugmentation phySwitchAdded) {
265         String globalNodeId = getManagedByNodeId(identifier);
266         final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
267         NodeId nodeId = getNodeId(identifier);
268         if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
269             LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
270             return;
271         }
272         final String psName = getPsName(identifier);
273         LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
274
275         haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
276             LOG.trace("Running job for node {} ", globalNodeIid);
277             if (!node.isPresent()) {
278                 LOG.error("Global node is absent {}", globalNodeId);
279                 return;
280             }
281             HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeIid, node.get());
282             if (hwvtepHACache.isHAEnabledDevice(globalNodeIid)) {
283                 LOG.trace("Ha enabled device {}", globalNodeIid);
284                 return;
285             }
286             LOG.trace("Updating cache for node {}", globalNodeIid);
287             L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
288             if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
289                 LOG.trace("Device {} {} is already Connected by {}",
290                         psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
291                 return;
292             }
293             InstanceIdentifier<Node> existingIid = globalNodeIid;
294             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
295                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
296             }
297             if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
298                     && alreadyHasL2Gwids.test(l2GwDevice)) {
299                 LOG.error("Child node {} having l2gw configured became ha node "
300                                 + " removing the l2device {} from all elan cache and provision parent node {}",
301                           existingIid, psName, globalNodeIid);
302                 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
303             }
304
305             l2GwDevice = l2GatewayCache.addOrGet(psName);
306             l2GwDevice.setConnected(true);
307             l2GwDevice.setHwvtepNodeId(globalNodeId);
308
309             List<TunnelIps> tunnelIps = phySwitchAdded.getTunnelIps();
310             if (tunnelIps != null) {
311                 for (TunnelIps tunnelIp : tunnelIps) {
312                     IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
313                     l2GwDevice.addTunnelIp(tunnelIpAddr);
314                 }
315             }
316
317             handleAdd(l2GwDevice);
318             elanClusterUtils.runOnlyInOwnerNode("Update config tunnels IP ",
319                 () -> updateConfigTunnelIp(identifier, phySwitchAdded));
320         });
321     }
322
323     boolean updateHACacheIfHANode(InstanceIdentifier<Node> globalNodeId)
324             throws ExecutionException, InterruptedException {
325         try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
326             tx.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().toJavaUtil().ifPresent(
327                 node -> HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node));
328         }
329         return hwvtepHACache.isHAEnabledDevice(globalNodeId);
330     }
331
332     /**
333      * Handle add.
334      *
335      * @param l2GwDevice
336      *            the l2 gw device
337      */
338     private void handleAdd(L2GatewayDevice l2GwDevice) {
339         final String psName = l2GwDevice.getDeviceName();
340         final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
341         Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
342         for (final IpAddress tunnelIpAddr : tunnelIps) {
343             if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
344                 LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
345                         l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
346                 l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
347             } else {
348                 LOG.info("l2gw.provision.skip {}:{}", hwvtepNodeId, psName);
349             }
350         }
351         elanClusterUtils.runOnlyInOwnerNode("Stale entry cleanup", () -> {
352             InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(
353                     new NodeId(hwvtepNodeId));
354             InstanceIdentifier<Node> psIid = HwvtepSouthboundUtils.createInstanceIdentifier(
355                     HwvtepSouthboundUtils.createManagedNodeId(new NodeId(hwvtepNodeId), psName));
356             staleVlanBindingsCleaner.scheduleStaleCleanup(psName, globalNodeIid, psIid);
357         });
358     }
359
360
361     /**
362      * Gets the node id.
363      *
364      * @param identifier
365      *            the identifier
366      * @return the node id
367      */
368     private NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
369         return identifier.firstKeyOf(Node.class).getNodeId();
370     }
371
372     private String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
373         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
374         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
375             return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
376         }
377         return psNodeId;
378     }
379
380     private InstanceIdentifier<Node> getManagedByNodeIid(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
381         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
382         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
383             psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
384             return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
385         }
386         return null;
387     }
388
389     private String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
390         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
391         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
392             return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
393                     .length());
394         }
395         return null;
396     }
397
398     private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
399                                       PhysicalSwitchAugmentation phySwitchAdded) {
400         if (phySwitchAdded.getTunnelIps() != null) {
401             ListenableFutures.addErrorLogging(
402                 txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
403                     Optional<PhysicalSwitchAugmentation> existingSwitch = tx.read(
404                             LogicalDatastoreType.CONFIGURATION, identifier).checkedGet();
405                     PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
406                     if (existingSwitch.isPresent()) {
407                         psBuilder = new PhysicalSwitchAugmentationBuilder(existingSwitch.get());
408                     }
409                     psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
410                     tx.put(LogicalDatastoreType.CONFIGURATION, identifier, psBuilder.build(), true);
411                     LOG.trace("Updating config tunnel ips {}", identifier);
412                 }), LOG, "Failed to update the config tunnel ips {}", identifier);
413         }
414     }
415 }