2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.listeners;
10 import static java.util.stream.Collectors.groupingBy;
11 import static java.util.stream.Collectors.toList;
12 import static java.util.stream.Collectors.toMap;
13 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.List;
20 import java.util.Optional;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.BiPredicate;
25 import java.util.function.Function;
26 import java.util.function.Predicate;
27 import javax.annotation.PostConstruct;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
32 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
33 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
34 import org.opendaylight.infrautils.utils.concurrent.Executors;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
37 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
38 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
39 import org.opendaylight.mdsal.binding.util.TypedReadTransaction;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.netvirt.elan.cache.ConfigMcastCache;
42 import org.opendaylight.netvirt.elan.cache.ItmExternalTunnelCache;
43 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
44 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler;
45 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
46 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
47 import org.opendaylight.netvirt.elan.utils.Scheduler;
48 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
49 import org.opendaylight.serviceutils.srm.RecoverableListener;
50 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
51 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
67 public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeChangeListener<L2gatewayConnection>
68 implements RecoverableListener {
70 private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
71 private static final int MAX_READ_TRIALS = 120;
73 private static final Function<Node, InstanceIdentifier<Node>> TO_GLOBAL_PATH =
74 HwvtepHAUtil::getGlobalNodePathFromPSNode;
76 private static final Function<Node, InstanceIdentifier<Node>> TO_NODE_PATH =
77 (node) -> HwvtepSouthboundUtils.createInstanceIdentifier(node.getNodeId());
79 private static final Function<InstanceIdentifier<Node>, String> GET_DEVICE_NAME = HwvtepHAUtil::getPsName;
81 private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) -> {
82 return HwvtepHAUtil.getPsName(psIid) != null;
85 private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
86 HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
87 if (augmentation != null && augmentation.nonnullManagers() != null) {
88 return augmentation.nonnullManagers().values().stream().anyMatch(
89 manager -> manager.key().getTarget().getValue().equals(HwvtepHAUtil.MANAGER_KEY));
94 private static final BiPredicate<InstanceIdentifier<Node>, Node> PS_NODE_OF_PARENT_NODE =
95 (psIid, node) -> psIid.firstKeyOf(Node.class).getNodeId().getValue().contains(node.getNodeId().getValue());
97 private final DataBroker broker;
98 private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
99 private final Scheduler scheduler;
100 private final L2GatewayCache l2GatewayCache;
101 private final ConfigMcastCache configMcastCache;
102 private final L2GatewayListener l2GatewayListener;
103 private final ItmExternalTunnelCache itmExternalTunnelCache;
104 private final HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener;
105 private final ManagedNewTransactionRunner txRunner;
107 Map<InstanceIdentifier<Node>, Node> allNodes = null;
110 public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
111 Scheduler scheduler, L2GatewayCache l2GatewayCache,
112 final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
113 final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler,
114 final ServiceRecoveryRegistry serviceRecoveryRegistry,
115 ConfigMcastCache configMcastCache,
116 L2GatewayListener l2GatewayListener,
117 ItmExternalTunnelCache itmExternalTunnelCache,
118 HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener) {
119 super(db, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class)
120 .child(L2gatewayConnections.class).child(L2gatewayConnection.class),
121 Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
122 this.txRunner = new ManagedNewTransactionRunnerImpl(db);
124 this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
125 this.scheduler = scheduler;
126 this.l2GatewayCache = l2GatewayCache;
127 this.configMcastCache = configMcastCache;
128 this.l2GatewayListener = l2GatewayListener;
129 this.itmExternalTunnelCache = itmExternalTunnelCache;
130 this.hwvtepPhysicalSwitchListener = hwvtepPhysicalSwitchListener;
131 serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
133 serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(),
138 @SuppressWarnings("illegalcatch")
140 ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(this.broker);
141 scheduler.getScheduledExecutorService().schedule(() -> {
142 txRunner.callWithNewReadOnlyTransactionAndClose(CONFIGURATION, tx -> {
144 LOG.trace("Loading l2gw device cache");
145 loadL2GwDeviceCache(tx);
146 LOG.trace("Loading l2gw Mcast cache");
147 fillConfigMcastCache();
148 LOG.trace("Loading l2gw connection cache");
149 loadL2GwConnectionCache(tx);
150 } catch (Exception e) {
151 LOG.error("Failed to load cache", e);
154 l2GatewayListener.registerListener();
155 ///configMcastCache.registerListener(CONFIGURATION, broker);
156 //itmExternalTunnelCache.registerListener(CONFIGURATION, broker);
158 hwvtepPhysicalSwitchListener.registerListener();
161 }, 1, TimeUnit.SECONDS);
165 public void register() {
166 LOG.info("Registering L2GatewayConnectionListener Override Method");
172 public void close() {
174 Executors.shutdownAndAwaitTermination(getExecutorService());
178 public void registerListener() {
180 LOG.info("Registering L2GatewayConnectionListener");
183 public void deregisterListener() {
185 LOG.info("Deregistering L2GatewayConnectionListener");
189 public void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
190 LOG.trace("Adding L2gatewayConnection: {}", input);
192 // Get associated L2GwId from 'input'
193 // Create logical switch in each of the L2GwDevices part of L2Gw
194 // Logical switch name is network UUID
195 // Add L2GwDevices to ELAN
196 l2GatewayConnectionUtils.addL2GatewayConnection(input);
200 public void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
201 LOG.trace("Removing L2gatewayConnection: {}", input);
203 l2GatewayConnectionUtils.deleteL2GatewayConnection(input);
207 public void update(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection original,
208 L2gatewayConnection update) {
209 LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
212 private void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
213 LOG.trace("L2GatewayConnectionListener Adding device to cache {}", psNode.getNodeId().getValue());
214 String deviceName = HwvtepHAUtil.getPsName(psIid);
215 List<TunnelIps> tunnelIps = new ArrayList<>(getTunnelIps(psNode));
216 if (tunnelIps != null) {
217 l2GatewayCache.updateL2GatewayCache(deviceName, globalNode.getNodeId().getValue(), tunnelIps);
218 LOG.info("L2GatewayConnectionListener Added device to cache {} {}",
219 psNode.getNodeId().getValue(), tunnelIps);
221 LOG.error("L2GatewayConnectionListener Could not add device to l2gw cache no tunnel ip found {}",
222 psNode.getNodeId().getValue());
226 private void fillConfigMcastCache() {
227 if (allNodes == null) {
230 //allNodes.entrySet().stream().map(entry -> entry);
231 allNodes.entrySet().stream()
232 .filter(entry -> entry.getValue().augmentation(HwvtepGlobalAugmentation.class) != null)
234 entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs() != null)
236 entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs().values().stream()
238 configMcastCache.added(getMacIid(entry.getKey(), mac), mac);
243 private InstanceIdentifier<RemoteMcastMacs> getMacIid(InstanceIdentifier<Node> nodeIid, RemoteMcastMacs mac) {
244 return nodeIid.augmentation(HwvtepGlobalAugmentation.class).child(RemoteMcastMacs.class, mac.key());
247 public void loadL2GwConnectionCache(TypedReadTransaction<Configuration> tx) {
248 InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
249 .create(Neutron.class)
250 .child(L2gatewayConnections.class);
251 Optional<L2gatewayConnections> optional = Optional.empty();
253 optional = tx.read(parentIid).get();
254 } catch (ExecutionException | InterruptedException e) {
255 LOG.error("Exception while reading l2gwconnecton for populating Cache");
257 if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
258 LOG.trace("Found some connections to fill in l2gw connection cache");
259 optional.get().getL2gatewayConnection().values()
260 .forEach(connection -> {
261 add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
266 private void loadL2GwDeviceCache(TypedReadTransaction tx) {
267 allNodes = (Map<InstanceIdentifier<Node>, Node>) readAllConfigNodes(tx)
269 .collect(toMap(TO_NODE_PATH, Function.identity()));
271 LOG.trace("Loading all config nodes");
273 Set<InstanceIdentifier<Node>> allIids = allNodes.keySet();
275 Map<String, List<InstanceIdentifier<Node>>> psNodesByDeviceName = allIids
278 .collect(groupingBy(GET_DEVICE_NAME, toList()));
281 createHANodes(allIids);
283 //Process non HA nodes there will be only one ps node iid for each device for non ha nodes
284 psNodesByDeviceName.values().stream()
285 .filter(psIids -> psIids.size() == 1)
286 .map(psIids -> psIids.get(0))
288 Node psNode = allNodes.get(psIid);
289 Node globalNode = allNodes.get(TO_GLOBAL_PATH.apply(psNode));
290 if (globalNode != null) {
291 addL2DeviceToCache(psIid, globalNode, psNode);
296 private void createHANodes(Set<InstanceIdentifier<Node>> allIids) {
297 allNodes.values().stream()
298 .filter(IS_HA_PARENT_NODE)
299 .forEach(parentNode -> {
300 fillHACache(parentNode);
303 .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
305 addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid));
310 private static void fillHACache(Node parentNode) {
311 InstanceIdentifier<Node> parentIid
312 = HwvtepHAUtil.convertToInstanceIdentifier(parentNode.getNodeId().getValue());
313 List<NodeId> childIids
314 = HwvtepHAUtil.getChildNodeIdsFromManagerOtherConfig(Optional.of(parentNode));
315 if (childIids != null) {
316 for (NodeId childid : childIids) {
317 InstanceIdentifier<Node> childIid
318 = HwvtepHAUtil.convertToInstanceIdentifier(childid.getValue());
319 HwvtepHACache.getInstance().addChild(parentIid, childIid);
324 private Collection<TunnelIps> getTunnelIps(Node psNode) {
325 if (psNode.augmentation(PhysicalSwitchAugmentation.class) != null) {
326 return psNode.augmentation(PhysicalSwitchAugmentation.class).nonnullTunnelIps().values();
328 return Collections.EMPTY_LIST;
331 private List<Node> readAllConfigNodes(TypedReadTransaction<Configuration> tx) {
335 Optional<Topology> topologyOptional = Optional.empty();
338 topologyOptional = tx.read(HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier()).get();
340 } catch (ExecutionException | InterruptedException e) {
343 } catch (InterruptedException e1) {
344 LOG.trace("Sleep interrupted");
347 } while (trialNo++ < MAX_READ_TRIALS);
348 if (topologyOptional != null && topologyOptional.isPresent() && topologyOptional.get().getNode() != null) {
349 return new ArrayList<>(topologyOptional.get().nonnullNode().values());
351 return Collections.EMPTY_LIST;