2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.listeners;
10 import static java.util.stream.Collectors.groupingBy;
11 import static java.util.stream.Collectors.toList;
12 import static java.util.stream.Collectors.toMap;
13 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.BiPredicate;
25 import java.util.function.Function;
26 import java.util.function.Predicate;
27 import javax.annotation.PostConstruct;
28 import javax.inject.Inject;
29 import javax.inject.Singleton;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
33 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
36 import org.opendaylight.genius.mdsalutil.MDSALUtil;
37 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
38 import org.opendaylight.infrautils.metrics.Counter;
39 import org.opendaylight.infrautils.metrics.Labeled;
40 import org.opendaylight.infrautils.metrics.MetricDescriptor;
41 import org.opendaylight.infrautils.metrics.MetricProvider;
42 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
43 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler;
44 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
45 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
46 import org.opendaylight.netvirt.elan.utils.Scheduler;
47 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
48 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
49 import org.opendaylight.serviceutils.srm.RecoverableListener;
50 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
51 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
65 public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeListenerBase<L2gatewayConnection,
66 L2GatewayConnectionListener> implements RecoverableListener {
67 private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
68 private static final int MAX_READ_TRIALS = 120;
70 private static final Function<Node, InstanceIdentifier<Node>> TO_GLOBAL_PATH =
71 HwvtepHAUtil::getGlobalNodePathFromPSNode;
73 private static final Function<Node, InstanceIdentifier<Node>> TO_NODE_PATH =
74 (node) -> HwvtepSouthboundUtils.createInstanceIdentifier(node.getNodeId());
76 private static final Function<InstanceIdentifier<Node>, String> GET_DEVICE_NAME = HwvtepHAUtil::getPsName;
78 private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) ->
79 HwvtepHAUtil.getPsName(psIid) != null;
81 private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
82 HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
83 if (augmentation != null && augmentation.getManagers() != null) {
84 return augmentation.getManagers().stream().anyMatch(
85 manager -> manager.key().getTarget().getValue().equals(HwvtepHAUtil.MANAGER_KEY));
90 private static final BiPredicate<InstanceIdentifier<Node>, Node> PS_NODE_OF_PARENT_NODE =
91 (psIid, node) -> psIid.firstKeyOf(Node.class).getNodeId().getValue().contains(node.getNodeId().getValue());
93 private final DataBroker broker;
94 private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
95 private final Scheduler scheduler;
96 private final L2GatewayCache l2GatewayCache;
97 private final Labeled<Labeled<Counter>> elanConnectionsCounter;
100 public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
101 Scheduler scheduler, L2GatewayCache l2GatewayCache,
102 MetricProvider metricProvider,
103 final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
104 final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler,
105 final ServiceRecoveryRegistry serviceRecoveryRegistry) {
106 super(L2gatewayConnection.class, L2GatewayConnectionListener.class);
108 this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
109 this.scheduler = scheduler;
110 this.l2GatewayCache = l2GatewayCache;
111 this.elanConnectionsCounter = metricProvider.newCounter(MetricDescriptor.builder()
112 .anchor(this).project("netvirt").module("l2gw").id("connections").build(), "modification", "elan");
113 serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
115 serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(),
121 loadL2GwDeviceCache(1);
122 LOG.trace("Loading l2gw connection cache");
123 loadL2GwConnectionCache();
128 public void registerListener() {
129 LOG.info("Registering L2GatewayConnectionListener");
130 registerListener(LogicalDatastoreType.CONFIGURATION, broker);
133 public void deregisterListener() {
134 LOG.info("Deregistering L2GatewayConnectionListener");
135 super.deregisterListener();
139 protected void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
140 LOG.trace("Adding L2gatewayConnection: {}", input);
141 elanConnectionsCounter
142 .label(DataObjectModification.ModificationType.WRITE.name())
143 .label(input.getNetworkId().getValue()).increment();
144 // Get associated L2GwId from 'input'
145 // Create logical switch in each of the L2GwDevices part of L2Gw
146 // Logical switch name is network UUID
147 // Add L2GwDevices to ELAN
148 l2GatewayConnectionUtils.addL2GatewayConnection(input);
152 protected void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
153 LOG.trace("Removing L2gatewayConnection: {}", input);
154 elanConnectionsCounter
155 .label(DataObjectModification.ModificationType.DELETE.name())
156 .label(input.getNetworkId().getValue()).increment();
157 l2GatewayConnectionUtils.deleteL2GatewayConnection(input);
161 protected void update(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection original,
162 L2gatewayConnection update) {
163 LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
167 protected InstanceIdentifier<L2gatewayConnection> getWildCardPath() {
168 return InstanceIdentifier.create(Neutron.class).child(L2gatewayConnections.class)
169 .child(L2gatewayConnection.class);
173 protected L2GatewayConnectionListener getDataTreeChangeListener() {
177 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
178 justification = "https://github.com/spotbugs/spotbugs/issues/811")
179 private void loadL2GwDeviceCache(final int trialNo) {
180 scheduler.getScheduledExecutorService().schedule(() -> {
181 if (trialNo == MAX_READ_TRIALS) {
182 LOG.error("Failed to read config topology");
185 ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
186 InstanceIdentifier<Topology> topoIid = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
187 Futures.addCallback(tx.read(CONFIGURATION, topoIid), new FutureCallback<Optional<Topology>>() {
189 public void onSuccess(Optional<Topology> topologyOptional) {
190 if (topologyOptional != null && topologyOptional.isPresent()) {
191 loadL2GwDeviceCache(topologyOptional.get().getNode());
193 registerListener(CONFIGURATION, broker);
197 public void onFailure(Throwable throwable) {
198 loadL2GwDeviceCache(trialNo + 1);
200 }, MoreExecutors.directExecutor());
202 }, 1, TimeUnit.SECONDS);
205 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
206 justification = "https://github.com/spotbugs/spotbugs/issues/811")
207 private void loadL2GwDeviceCache(List<Node> nodes) {
209 LOG.debug("No config topology nodes are present");
212 Map<InstanceIdentifier<Node>, Node> allNodes = nodes
214 .collect(toMap(TO_NODE_PATH, Function.identity()));
216 LOG.trace("Loading all config nodes");
218 Set<InstanceIdentifier<Node>> allIids = allNodes.keySet();
220 Map<String, List<InstanceIdentifier<Node>>> psNodesByDeviceName = allIids
223 .collect(groupingBy(GET_DEVICE_NAME, toList()));
226 allNodes.values().stream()
227 .filter(IS_HA_PARENT_NODE)
228 .forEach(parentNode -> allIids.stream()
230 .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
231 .forEach(psIid -> addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid))));
233 //Process non HA nodes there will be only one ps node iid for each device for non ha nodes
234 psNodesByDeviceName.values().stream()
235 .filter(psIids -> psIids.size() == 1)
236 .map(psIids -> psIids.get(0))
238 Node psNode = allNodes.get(psIid);
239 Node globalNode = allNodes.get(TO_GLOBAL_PATH.apply(psNode));
240 if (globalNode != null) {
241 addL2DeviceToCache(psIid, globalNode, psNode);
246 public void loadL2GwConnectionCache() {
247 InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
248 .create(Neutron.class)
249 .child(L2gatewayConnections.class);
251 Optional<L2gatewayConnections> optional = MDSALUtil.read(broker, CONFIGURATION, parentIid);
252 if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
253 LOG.trace("Found some connections to fill in l2gw connection cache");
254 optional.get().getL2gatewayConnection()
255 .forEach(connection -> {
256 add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
261 void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
262 LOG.trace("Adding device to cache {}", psNode.getNodeId().getValue());
263 String deviceName = HwvtepHAUtil.getPsName(psIid);
264 L2GatewayDevice l2GwDevice = l2GatewayCache.addOrGet(deviceName);
265 l2GwDevice.setConnected(true);
266 l2GwDevice.setHwvtepNodeId(globalNode.getNodeId().getValue());
268 List<TunnelIps> tunnelIps = psNode.augmentation(PhysicalSwitchAugmentation.class) != null
269 ? psNode.augmentation(PhysicalSwitchAugmentation.class).getTunnelIps() : null;
270 if (tunnelIps != null) {
271 for (TunnelIps tunnelIp : tunnelIps) {
272 IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
273 l2GwDevice.addTunnelIp(tunnelIpAddr);