Convert elanmanager-impl to use mdsal EOS APIs
[netvirt.git] / vpnservice / elanmanager / elanmanager-impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / HwvtepPhysicalSwitchListener.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.elan.l2gw.listeners;
10
11 import java.util.Collections;
12 import java.util.HashSet;
13 import java.util.Objects;
14 import java.util.Set;
15 import java.util.concurrent.ExecutionException;
16 import java.util.function.BiPredicate;
17 import java.util.function.Predicate;
18 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
20 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
23 import org.opendaylight.genius.mdsalutil.MDSALUtil;
24 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
25 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
26 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
27 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
28 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
29 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
30 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
31 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
32 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
33 import org.opendaylight.netvirt.elan.l2gw.utils.L2gwServiceProvider;
34 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
35 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
36 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
37 import org.opendaylight.netvirt.neutronvpn.api.l2gw.utils.L2GatewayCacheUtils;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalRef;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Listener to handle physical switch updates.
54  */
55 public class HwvtepPhysicalSwitchListener
56         extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
57         implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation>, AutoCloseable {
58
59     /** The Constant LOG. */
60     private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
61
62     /** The data broker. */
63     private final DataBroker dataBroker;
64
65     /** The itm rpc service. */
66     private final ItmRpcService itmRpcService;
67
68     private final EntityOwnershipUtils entityOwnershipUtils;
69
70     private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
71
72     private final HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
73
74     protected final L2gwServiceProvider l2gwServiceProvider;
75
76     private final L2GatewayUtils l2GatewayUtils;
77
78     static BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
79         (l2GatewayDevice, globalIid) -> {
80             return l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
81                     || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
82                     globalIid.firstKeyOf(Node.class).getNodeId().getValue());
83         };
84
85     BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent =
86         (l2GwDevice, globalIid) -> {
87             return !hwvtepHACache.isHAParentNode(globalIid)
88                     && l2GwDevice != null
89                     && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid);
90         };
91
92     Predicate<L2GatewayDevice> alreadyHasL2Gwids =
93         (l2GwDevice) -> {
94             return l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
95         };
96
97
98     BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild =
99         (l2GwDevice, globalIid) -> {
100             InstanceIdentifier<Node> existingIid = globalIid;
101             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
102                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
103             }
104             return hwvtepHACache.isHAParentNode(globalIid)
105                     && l2GwDevice != null
106                     && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid)
107                     && Objects.equals(globalIid, hwvtepHACache.getParent(existingIid));
108         };
109
110     static Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
111         phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
112
113     static Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE =
114         TUNNEL_IP_AVAILABLE.negate();
115
116     static BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
117         (phySwitchAfter, existingDevice) -> {
118             return TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
119                     && !Objects.equals(
120                     existingDevice.getTunnelIp(),  phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
121         };
122
123
124     HAOpClusteredListener haOpClusteredListener;
125
126     /**
127      * Instantiates a new hwvtep physical switch listener.
128      *
129      * @param dataBroker
130      *            the data broker
131      * @param itmRpcService itm rpc
132      * @param entityOwnershipUtils entity ownership utils
133      * @param l2GatewayConnectionUtils l2gw connection utils
134      * @param l2gwServiceProvider l2gw service Provider
135      * @param l2GatewayUtils utils
136      * @param haListener HA Op node listners
137      */
138     public HwvtepPhysicalSwitchListener(final DataBroker dataBroker, ItmRpcService itmRpcService,
139                                         EntityOwnershipUtils entityOwnershipUtils,
140                                         L2GatewayConnectionUtils l2GatewayConnectionUtils,
141                                         L2gwServiceProvider l2gwServiceProvider,
142                                         L2GatewayUtils l2GatewayUtils, HAOpClusteredListener haListener) {
143         super(PhysicalSwitchAugmentation.class, HwvtepPhysicalSwitchListener.class);
144         this.dataBroker = dataBroker;
145         this.itmRpcService = itmRpcService;
146         this.entityOwnershipUtils = entityOwnershipUtils;
147         this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
148         this.l2gwServiceProvider = l2gwServiceProvider;
149         this.l2GatewayUtils = l2GatewayUtils;
150         this.haOpClusteredListener = haListener;
151     }
152
153     @Override
154     public void init() {
155         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
156     }
157
158     @Override
159     protected InstanceIdentifier<PhysicalSwitchAugmentation> getWildCardPath() {
160         return InstanceIdentifier.create(NetworkTopology.class)
161                 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
162                 .augmentation(PhysicalSwitchAugmentation.class);
163     }
164
165     @Override
166     protected HwvtepPhysicalSwitchListener getDataTreeChangeListener() {
167         return HwvtepPhysicalSwitchListener.this;
168     }
169
170     @Override
171     protected void removed(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
172             PhysicalSwitchAugmentation phySwitchDeleted) {
173         NodeId nodeId = getNodeId(identifier);
174         String psName = phySwitchDeleted.getHwvtepNodeName().getValue();
175         LOG.info("Received physical switch {} removed event for node {}", psName, nodeId.getValue());
176
177         L2GatewayDevice l2GwDevice = L2GatewayCacheUtils.getL2DeviceFromCache(psName);
178         if (l2GwDevice != null) {
179             if (!L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
180                 L2GatewayCacheUtils.removeL2DeviceFromCache(psName);
181                 LOG.debug("{} details removed from L2Gateway Cache", psName);
182                 MDSALUtil.syncDelete(this.dataBroker, LogicalDatastoreType.CONFIGURATION,
183                         HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
184             } else {
185                 LOG.debug("{} details are not removed from L2Gateway Cache as it has L2Gateway reference", psName);
186             }
187
188             l2GwDevice.setConnected(false);
189             //ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(psName);
190         } else {
191             LOG.error("Unable to find L2 Gateway details for {}", psName);
192         }
193     }
194
195     /**
196      * Upon update checks if the tunnels Ip was null earlier and it got newly added.
197      * In that case simply call add.
198      * If not then check if Tunnel Ip has been updated from an old value to new value.
199      * If yes. delete old ITM tunnels of odl Tunnel Ipand add new ITM tunnels with new Tunnel
200      * IP then call added ().
201      *
202      * @param identifier iid
203      * @param phySwitchBefore ps Node before update
204      * @param phySwitchAfter ps Node after update
205      */
206     @Override
207     protected void updated(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
208             PhysicalSwitchAugmentation phySwitchBefore, PhysicalSwitchAugmentation phySwitchAfter) {
209         NodeId nodeId = getNodeId(identifier);
210         LOG.trace("Received PhysicalSwitch Update Event for node {}: PhysicalSwitch Before: {}, "
211                 + "PhysicalSwitch After: {}", nodeId.getValue(), phySwitchBefore, phySwitchAfter);
212         String psName = getPsName(identifier);
213         if (psName == null) {
214             LOG.error("Could not find the physical switch name for node {}", nodeId.getValue());
215             return;
216         }
217         L2GatewayDevice existingDevice = L2GatewayCacheUtils.getL2DeviceFromCache(psName);
218         LOG.info("Received physical switch {} update event for node {}", psName, nodeId.getValue());
219         InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
220
221         if (DEVICE_NOT_CACHED_OR_PARENT_CONNECTED.test(existingDevice, globalNodeIid)) {
222             if (TUNNEL_IP_AVAILABLE.test(phySwitchAfter)) {
223                 added(identifier, phySwitchAfter);
224             }
225         } else {
226             if (!Objects.equals(phySwitchAfter.getTunnelIps(), phySwitchBefore.getTunnelIps())
227                     && TUNNEL_IP_CHANGED.test(phySwitchAfter, existingDevice)) {
228
229                 final String hwvtepId = existingDevice.getHwvtepNodeId();
230                 ElanClusterUtils.runOnlyInOwnerNode(entityOwnershipUtils, existingDevice.getDeviceName(),
231                         "handling Physical Switch add create itm tunnels ", () -> {
232                         LOG.info("Deleting itm tunnels for device {}", existingDevice.getDeviceName());
233                         L2GatewayUtils.deleteItmTunnels(itmRpcService, hwvtepId,
234                                 existingDevice.getDeviceName(), existingDevice.getTunnelIp());
235                         Thread.sleep(10000L);//TODO remove these sleeps
236                         LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
237                         ElanL2GatewayUtils.createItmTunnels(itmRpcService, hwvtepId, psName,
238                                 phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
239                         return Collections.emptyList();
240                     }
241                 );
242                 try {
243                     Thread.sleep(20000L);//TODO remove the sleep by using better itm api to detect finish of prev op
244                 } catch (InterruptedException e) {
245                     LOG.error("Interrupted ");
246                 }
247                 existingDevice.setTunnelIps(new HashSet<>());
248                 added(identifier, phySwitchAfter);
249             }
250         }
251     }
252
253     @Override
254     protected void added(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
255                          final PhysicalSwitchAugmentation phySwitchAdded) {
256         String globalNodeId = getManagedByNodeId(identifier);
257         final InstanceIdentifier<Node> globalNodeIid = getManagedByNodeIid(identifier);
258         final InstanceIdentifier<Node> wildCard = globalNodeIid.firstIdentifierOf(Topology.class).child(Node.class);
259         NodeId nodeId = getNodeId(identifier);
260         if (TUNNEL_IP_NOT_AVAILABLE.test(phySwitchAdded)) {
261             LOG.error("Could not find the /tunnel ips for node {}", nodeId.getValue());
262             return;
263         }
264         final String psName = getPsName(identifier);
265         LOG.trace("Received physical switch {} added event received for node {}", psName, nodeId.getValue());
266
267         haOpClusteredListener.runAfterNodeIsConnected(globalNodeIid, (node) -> {
268             LOG.trace("Running job for node {} ", globalNodeIid);
269             if (!node.isPresent()) {
270                 LOG.error("Global node is absent {}", globalNodeId);
271                 return;
272             }
273             HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeIid, node.get());
274             if (hwvtepHACache.isHAEnabledDevice(globalNodeIid)) {
275                 LOG.trace("Ha enabled device {}", globalNodeIid);
276                 return;
277             }
278             LOG.trace("Updating cache for node {}", globalNodeIid);
279             L2GatewayDevice l2GwDevice = L2GatewayCacheUtils.getL2DeviceFromCache(psName);
280             if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
281                 LOG.trace("Device {} {} is already Connected by ",
282                         psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
283                 return;
284             }
285             InstanceIdentifier<Node> existingIid = globalNodeIid;
286             if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
287                 existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
288             }
289             if (parentConnectedAfterChild.test(l2GwDevice, globalNodeIid)
290                     && alreadyHasL2Gwids.test(l2GwDevice)) {
291                 LOG.error("Child node {} having l2gw configured became ha node "
292                                 + " removing the l2device {} from all elan cache and provision parent node {}",
293                           existingIid, psName, globalNodeIid);
294                 ElanL2GwCacheUtils.removeL2GatewayDeviceFromAllElanCache(l2GwDevice.getHwvtepNodeId());
295             }
296             l2GwDevice = L2GatewayCacheUtils.updateL2GatewayCache(
297                     psName, globalNodeId, phySwitchAdded.getTunnelIps());
298             handleAdd(l2GwDevice);
299             return;
300         });
301     }
302
303     boolean updateHACacheIfHANode(DataBroker broker, InstanceIdentifier<Node> globalNodeId)
304             throws ExecutionException, InterruptedException {
305         ReadWriteTransaction transaction = broker.newReadWriteTransaction();
306         Node node = transaction.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().get();
307         HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node);
308         return hwvtepHACache.isHAEnabledDevice(globalNodeId);
309     }
310
311     /**
312      * Handle add.
313      *
314      * @param l2GwDevice
315      *            the l2 gw device
316      */
317     private void handleAdd(L2GatewayDevice l2GwDevice) {
318         final String psName = l2GwDevice.getDeviceName();
319         final String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
320         Set<IpAddress> tunnelIps = l2GwDevice.getTunnelIps();
321         if (tunnelIps != null) {
322             for (final IpAddress tunnelIpAddr : tunnelIps) {
323                 if (L2GatewayConnectionUtils.isGatewayAssociatedToL2Device(l2GwDevice)) {
324                     LOG.debug("L2Gateway {} associated for {} physical switch; creating ITM tunnels for {}",
325                             l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
326                     l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
327                 } else {
328                     LOG.info("l2gw.provision.skip {}", hwvtepNodeId, psName);
329                 }
330             }
331         }
332     }
333
334
335     /**
336      * Gets the node id.
337      *
338      * @param identifier
339      *            the identifier
340      * @return the node id
341      */
342     private NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
343         return identifier.firstKeyOf(Node.class).getNodeId();
344     }
345
346     /**
347      * Checks if is tunnel ip newly configured.
348      *
349      * @param phySwitchBefore
350      *            the phy switch before
351      * @param phySwitchAfter
352      *            the phy switch after
353      * @return true, if is tunnel ip newly configured
354      */
355     private boolean isTunnelIpNewlyConfigured(PhysicalSwitchAugmentation phySwitchBefore,
356             PhysicalSwitchAugmentation phySwitchAfter) {
357         return (phySwitchBefore.getTunnelIps() == null || phySwitchBefore.getTunnelIps().isEmpty())
358                 && phySwitchAfter.getTunnelIps() != null && !phySwitchAfter.getTunnelIps().isEmpty();
359     }
360
361     /**
362      * Gets the managed by node id.
363      *
364      * @param globalRef
365      *            the global ref
366      * @return the managed by node id
367      */
368     private String getManagedByNodeId(HwvtepGlobalRef globalRef) {
369         InstanceIdentifier<?> instId = globalRef.getValue();
370         return instId.firstKeyOf(Node.class).getNodeId().getValue();
371     }
372
373     private String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
374         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
375         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
376             return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
377         }
378         return psNodeId;
379     }
380
381     private InstanceIdentifier<Node> getManagedByNodeIid(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
382         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
383         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
384             psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
385             return identifier.firstIdentifierOf(Topology.class).child(Node.class, new NodeKey(new NodeId(psNodeId)));
386         }
387         return null;
388     }
389
390     private String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
391         String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
392         if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
393             return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
394                     .length());
395         }
396         return null;
397     }
398 }