19071cf7b160285a08fc23bafb4a3ad67c46aadd
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / L2GatewayConnectionListener.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.listeners;
9
10 import static java.util.stream.Collectors.groupingBy;
11 import static java.util.stream.Collectors.toList;
12 import static java.util.stream.Collectors.toMap;
13 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
14
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.Set;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.BiPredicate;
25 import java.util.function.Function;
26 import java.util.function.Predicate;
27 import javax.annotation.PostConstruct;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
32 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
33 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
34 import org.opendaylight.infrautils.utils.concurrent.Executors;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
37 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
38 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
39 import org.opendaylight.mdsal.binding.util.TypedReadTransaction;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.netvirt.elan.cache.ConfigMcastCache;
42 import org.opendaylight.netvirt.elan.cache.ItmExternalTunnelCache;
43 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
44 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler;
45 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
46 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
47 import org.opendaylight.netvirt.elan.utils.Scheduler;
48 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
49 import org.opendaylight.serviceutils.srm.RecoverableListener;
50 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
51 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 @Singleton
67 public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeChangeListener<L2gatewayConnection>
68         implements RecoverableListener {
69
70     private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
71     private static final int MAX_READ_TRIALS = 120;
72
73     private static final Function<Node, InstanceIdentifier<Node>> TO_GLOBAL_PATH =
74         HwvtepHAUtil::getGlobalNodePathFromPSNode;
75
76     private static final Function<Node, InstanceIdentifier<Node>> TO_NODE_PATH =
77         (node) -> HwvtepSouthboundUtils.createInstanceIdentifier(node.getNodeId());
78
79     private static final Function<InstanceIdentifier<Node>, String> GET_DEVICE_NAME = HwvtepHAUtil::getPsName;
80
81     private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) -> {
82         return HwvtepHAUtil.getPsName(psIid) != null;
83     };
84
85     private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
86         HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
87         if (augmentation != null && augmentation.nonnullManagers() != null) {
88             return augmentation.nonnullManagers().values().stream().anyMatch(
89                 manager -> manager.key().getTarget().getValue().equals(HwvtepHAUtil.MANAGER_KEY));
90         }
91         return false;
92     };
93
94     private static final BiPredicate<InstanceIdentifier<Node>, Node> PS_NODE_OF_PARENT_NODE =
95         (psIid, node) -> psIid.firstKeyOf(Node.class).getNodeId().getValue().contains(node.getNodeId().getValue());
96
97     private final DataBroker broker;
98     private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
99     private final Scheduler scheduler;
100     private final L2GatewayCache l2GatewayCache;
101     private final ConfigMcastCache configMcastCache;
102     private final L2GatewayListener l2GatewayListener;
103     private final ItmExternalTunnelCache itmExternalTunnelCache;
104     private final HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener;
105     private final ManagedNewTransactionRunner txRunner;
106
107     Map<InstanceIdentifier<Node>, Node> allNodes = null;
108
109     @Inject
110     public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
111                                        Scheduler scheduler, L2GatewayCache l2GatewayCache,
112                                        final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
113                                        final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler,
114                                        final ServiceRecoveryRegistry serviceRecoveryRegistry,
115                                        ConfigMcastCache configMcastCache,
116                                        L2GatewayListener l2GatewayListener,
117                                        ItmExternalTunnelCache itmExternalTunnelCache,
118                                        HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener) {
119         super(db, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class)
120                 .child(L2gatewayConnections.class).child(L2gatewayConnection.class),
121             Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
122         this.txRunner = new ManagedNewTransactionRunnerImpl(db);
123         this.broker = db;
124         this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
125         this.scheduler = scheduler;
126         this.l2GatewayCache = l2GatewayCache;
127         this.configMcastCache = configMcastCache;
128         this.l2GatewayListener = l2GatewayListener;
129         this.itmExternalTunnelCache = itmExternalTunnelCache;
130         this.hwvtepPhysicalSwitchListener = hwvtepPhysicalSwitchListener;
131         serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
132                 this);
133         serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(),
134                 this);
135     }
136
137     @PostConstruct
138     @SuppressWarnings("illegalcatch")
139     public void init() {
140         ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(this.broker);
141         scheduler.getScheduledExecutorService().schedule(() -> {
142             txRunner.callWithNewReadOnlyTransactionAndClose(CONFIGURATION, tx -> {
143                 try {
144                     LOG.trace("Loading l2gw device cache");
145                     loadL2GwDeviceCache(tx);
146                     LOG.trace("Loading l2gw Mcast cache");
147                     fillConfigMcastCache();
148                     LOG.trace("Loading l2gw connection cache");
149                     loadL2GwConnectionCache(tx);
150                 } catch (Exception e) {
151                     LOG.error("Failed to load cache", e);
152                 } finally {
153                     allNodes.clear();
154                     l2GatewayListener.registerListener();
155                     ///configMcastCache.registerListener(CONFIGURATION, broker);
156                     //itmExternalTunnelCache.registerListener(CONFIGURATION, broker);
157                     registerListener();
158                     hwvtepPhysicalSwitchListener.registerListener();
159                 }
160             });
161         }, 1, TimeUnit.SECONDS);
162     }
163
164     @Override
165     public void register() {
166         LOG.info("Registering L2GatewayConnectionListener Override Method");
167         super.register();
168     }
169
170     @Override
171     @PreDestroy
172     public void close() {
173         super.close();
174         Executors.shutdownAndAwaitTermination(getExecutorService());
175     }
176
177     @Override
178     public void registerListener() {
179         super.register();
180         LOG.info("Registering L2GatewayConnectionListener");
181     }
182
183     public void deregisterListener() {
184         super.close();
185         LOG.info("Deregistering L2GatewayConnectionListener");
186     }
187
188     @Override
189     public void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
190         LOG.trace("Adding L2gatewayConnection: {}", input);
191
192         // Get associated L2GwId from 'input'
193         // Create logical switch in each of the L2GwDevices part of L2Gw
194         // Logical switch name is network UUID
195         // Add L2GwDevices to ELAN
196         l2GatewayConnectionUtils.addL2GatewayConnection(input);
197     }
198
199     @Override
200     public void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
201         LOG.trace("Removing L2gatewayConnection: {}", input);
202
203         l2GatewayConnectionUtils.deleteL2GatewayConnection(input);
204     }
205
206     @Override
207     public void update(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection original,
208             L2gatewayConnection update) {
209         LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
210     }
211
212     private void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
213         LOG.trace("L2GatewayConnectionListener Adding device to cache {}", psNode.getNodeId().getValue());
214         String deviceName = HwvtepHAUtil.getPsName(psIid);
215         List<TunnelIps> tunnelIps = new ArrayList<>(getTunnelIps(psNode));
216         if (tunnelIps != null) {
217             l2GatewayCache.updateL2GatewayCache(deviceName, globalNode.getNodeId().getValue(), tunnelIps);
218             LOG.info("L2GatewayConnectionListener Added device to cache {} {}",
219                     psNode.getNodeId().getValue(), tunnelIps);
220         } else {
221             LOG.error("L2GatewayConnectionListener Could not add device to l2gw cache no tunnel ip found {}",
222                     psNode.getNodeId().getValue());
223         }
224     }
225
226     private void fillConfigMcastCache() {
227         if (allNodes == null) {
228             return;
229         }
230         //allNodes.entrySet().stream().map(entry -> entry);
231         allNodes.entrySet().stream()
232                 .filter(entry -> entry.getValue().augmentation(HwvtepGlobalAugmentation.class) != null)
233                 .filter(entry ->
234                         entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs() != null)
235                 .forEach(entry -> {
236                     entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs().values().stream()
237                             .forEach(mac -> {
238                                 configMcastCache.added(getMacIid(entry.getKey(), mac), mac);
239                             });
240                 });
241     }
242
243     private InstanceIdentifier<RemoteMcastMacs> getMacIid(InstanceIdentifier<Node> nodeIid, RemoteMcastMacs mac) {
244         return nodeIid.augmentation(HwvtepGlobalAugmentation.class).child(RemoteMcastMacs.class, mac.key());
245     }
246
247     public void loadL2GwConnectionCache(TypedReadTransaction<Configuration> tx) {
248         InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
249                 .create(Neutron.class)
250                 .child(L2gatewayConnections.class);
251         Optional<L2gatewayConnections> optional = Optional.empty();
252         try {
253             optional = tx.read(parentIid).get();
254         } catch (ExecutionException | InterruptedException e) {
255             LOG.error("Exception while reading l2gwconnecton for populating Cache");
256         }
257         if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
258             LOG.trace("Found some connections to fill in l2gw connection cache");
259             optional.get().getL2gatewayConnection().values()
260                 .forEach(connection -> {
261                     add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
262                 });
263         }
264     }
265
266     private void loadL2GwDeviceCache(TypedReadTransaction tx) {
267         allNodes = (Map<InstanceIdentifier<Node>, Node>) readAllConfigNodes(tx)
268                 .stream()
269                 .collect(toMap(TO_NODE_PATH, Function.identity()));
270
271         LOG.trace("Loading all config nodes");
272
273         Set<InstanceIdentifier<Node>> allIids = allNodes.keySet();
274
275         Map<String, List<InstanceIdentifier<Node>>> psNodesByDeviceName = allIids
276                 .stream()
277                 .filter(IS_PS_NODE)
278                 .collect(groupingBy(GET_DEVICE_NAME, toList()));
279
280         //Process HA nodes
281         createHANodes(allIids);
282
283         //Process non HA nodes there will be only one ps node iid for each device for non ha nodes
284         psNodesByDeviceName.values().stream()
285                 .filter(psIids -> psIids.size() == 1)
286                 .map(psIids -> psIids.get(0))
287                 .forEach(psIid -> {
288                     Node psNode = allNodes.get(psIid);
289                     Node globalNode = allNodes.get(TO_GLOBAL_PATH.apply(psNode));
290                     if (globalNode != null) {
291                         addL2DeviceToCache(psIid, globalNode, psNode);
292                     }
293                 });
294     }
295
296     private void createHANodes(Set<InstanceIdentifier<Node>> allIids) {
297         allNodes.values().stream()
298                 .filter(IS_HA_PARENT_NODE)
299                 .forEach(parentNode -> {
300                     fillHACache(parentNode);
301                     allIids.stream()
302                             .filter(IS_PS_NODE)
303                             .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
304                             .forEach(psIid -> {
305                                 addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid));
306                             });
307                 });
308     }
309
310     private static void fillHACache(Node parentNode) {
311         InstanceIdentifier<Node> parentIid
312                 = HwvtepHAUtil.convertToInstanceIdentifier(parentNode.getNodeId().getValue());
313         List<NodeId> childIids
314                 = HwvtepHAUtil.getChildNodeIdsFromManagerOtherConfig(Optional.of(parentNode));
315         if (childIids != null) {
316             for (NodeId childid : childIids) {
317                 InstanceIdentifier<Node> childIid
318                         = HwvtepHAUtil.convertToInstanceIdentifier(childid.getValue());
319                 HwvtepHACache.getInstance().addChild(parentIid, childIid);
320             }
321         }
322     }
323
324     private Collection<TunnelIps> getTunnelIps(Node psNode) {
325         if (psNode.augmentation(PhysicalSwitchAugmentation.class) != null) {
326             return psNode.augmentation(PhysicalSwitchAugmentation.class).nonnullTunnelIps().values();
327         }
328         return Collections.EMPTY_LIST;
329     }
330
331     private List<Node> readAllConfigNodes(TypedReadTransaction<Configuration> tx) {
332
333
334         int trialNo = 1;
335         Optional<Topology> topologyOptional = Optional.empty();
336         do {
337             try {
338                 topologyOptional = tx.read(HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier()).get();
339                 break;
340             } catch (ExecutionException | InterruptedException e) {
341                 try {
342                     Thread.sleep(1000);
343                 } catch (InterruptedException e1) {
344                     LOG.trace("Sleep interrupted");
345                 }
346             }
347         } while (trialNo++ < MAX_READ_TRIALS);
348         if (topologyOptional != null && topologyOptional.isPresent() && topologyOptional.get().getNode() != null) {
349             return  new ArrayList<>(topologyOptional.get().nonnullNode().values());
350         }
351         return Collections.EMPTY_LIST;
352     }
353 }