MRI version bumpup for Aluminium
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / L2GatewayConnectionListener.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.listeners;
9
10 import static java.util.stream.Collectors.groupingBy;
11 import static java.util.stream.Collectors.toList;
12 import static java.util.stream.Collectors.toMap;
13 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
14
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.function.BiPredicate;
27 import java.util.function.Function;
28 import java.util.function.Predicate;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
33 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
34 import org.opendaylight.infrautils.metrics.Counter;
35 import org.opendaylight.infrautils.metrics.Labeled;
36 import org.opendaylight.infrautils.metrics.MetricDescriptor;
37 import org.opendaylight.infrautils.metrics.MetricProvider;
38 import org.opendaylight.infrautils.utils.concurrent.Executors;
39 import org.opendaylight.mdsal.binding.api.DataBroker;
40 import org.opendaylight.mdsal.binding.api.DataObjectModification;
41 import org.opendaylight.mdsal.binding.api.ReadTransaction;
42 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
43 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
44 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler;
45 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
46 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
47 import org.opendaylight.netvirt.elan.utils.Scheduler;
48 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
49 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
50 import org.opendaylight.serviceutils.srm.RecoverableListener;
51 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
52 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
53 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 @Singleton
67 public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeChangeListener<L2gatewayConnection>
68         implements RecoverableListener {
69     private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
70     private static final int MAX_READ_TRIALS = 120;
71
72     private static final Function<Node, InstanceIdentifier<Node>> TO_GLOBAL_PATH =
73             HwvtepHAUtil::getGlobalNodePathFromPSNode;
74
75     private static final Function<Node, InstanceIdentifier<Node>> TO_NODE_PATH =
76         (node) -> HwvtepSouthboundUtils.createInstanceIdentifier(node.getNodeId());
77
78     private static final Function<InstanceIdentifier<Node>, String> GET_DEVICE_NAME = HwvtepHAUtil::getPsName;
79
80     private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) ->
81             HwvtepHAUtil.getPsName(psIid) != null;
82
83     private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
84         HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
85         if (augmentation != null && augmentation.getManagers() != null) {
86             return augmentation.getManagers().values().stream().anyMatch(
87                 manager -> manager.key().getTarget().getValue().equals(HwvtepHAUtil.MANAGER_KEY));
88         }
89         return false;
90     };
91
92     private static final BiPredicate<InstanceIdentifier<Node>, Node> PS_NODE_OF_PARENT_NODE =
93         (psIid, node) -> psIid.firstKeyOf(Node.class).getNodeId().getValue().contains(node.getNodeId().getValue());
94
95     private final DataBroker broker;
96     private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
97     private final Scheduler scheduler;
98     private final L2GatewayCache l2GatewayCache;
99     private final Labeled<Labeled<Counter>> elanConnectionsCounter;
100
101     @Inject
102     public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
103                                        Scheduler scheduler, L2GatewayCache l2GatewayCache,
104                                        MetricProvider metricProvider,
105                                        final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
106                                        final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler,
107                                        final ServiceRecoveryRegistry serviceRecoveryRegistry) {
108         super(db, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class)
109                 .child(L2gatewayConnections.class).child(L2gatewayConnection.class),
110                 Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
111         this.broker = db;
112         this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
113         this.scheduler = scheduler;
114         this.l2GatewayCache = l2GatewayCache;
115         this.elanConnectionsCounter = metricProvider.newCounter(MetricDescriptor.builder()
116                 .anchor(this).project("netvirt").module("l2gw").id("connections").build(), "modification", "elan");
117         serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
118                 this);
119         serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(),
120                 this);
121         init();
122     }
123
124     public void init() {
125         loadL2GwDeviceCache(1);
126         LOG.trace("Loading l2gw connection cache");
127         loadL2GwConnectionCache();
128     }
129
130     @Override
131     @PreDestroy
132     public void close() {
133         super.close();
134         Executors.shutdownAndAwaitTermination(getExecutorService());
135     }
136
137     @Override
138     public void registerListener() {
139         super.register();
140         LOG.info("Registering L2GatewayConnectionListener");
141     }
142
143     public void deregisterListener() {
144         super.close();
145         LOG.info("Deregistering L2GatewayConnectionListener");
146     }
147
148     @Override
149     public void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
150         LOG.trace("Adding L2gatewayConnection: {}", input);
151         elanConnectionsCounter
152                 .label(DataObjectModification.ModificationType.WRITE.name())
153                 .label(input.getNetworkId().getValue()).increment();
154         // Get associated L2GwId from 'input'
155         // Create logical switch in each of the L2GwDevices part of L2Gw
156         // Logical switch name is network UUID
157         // Add L2GwDevices to ELAN
158         l2GatewayConnectionUtils.addL2GatewayConnection(input);
159     }
160
161     @Override
162     public void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
163         LOG.trace("Removing L2gatewayConnection: {}", input);
164         elanConnectionsCounter
165                 .label(DataObjectModification.ModificationType.DELETE.name())
166                 .label(input.getNetworkId().getValue()).increment();
167         l2GatewayConnectionUtils.deleteL2GatewayConnection(input);
168     }
169
170     @Override
171     public void update(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection original,
172             L2gatewayConnection update) {
173         LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
174     }
175
176     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
177             justification = "https://github.com/spotbugs/spotbugs/issues/811")
178     private void loadL2GwDeviceCache(final int trialNo) {
179         scheduler.getScheduledExecutorService().schedule(() -> {
180             if (trialNo == MAX_READ_TRIALS) {
181                 LOG.error("Failed to read config topology");
182                 return;
183             }
184             ReadTransaction tx = broker.newReadOnlyTransaction();
185             InstanceIdentifier<Topology> topoIid = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
186             Futures.addCallback(tx.read(CONFIGURATION, topoIid), new FutureCallback<Optional<Topology>>() {
187                 @Override
188                 public void onSuccess(Optional<Topology> topologyOptional) {
189                     if (topologyOptional != null && topologyOptional.isPresent()) {
190                         loadL2GwDeviceCache(new ArrayList<Node>(topologyOptional.get().getNode().values()));
191                     }
192                     registerListener();
193                 }
194
195                 @Override
196                 public void onFailure(Throwable throwable) {
197                     loadL2GwDeviceCache(trialNo + 1);
198                 }
199             }, MoreExecutors.directExecutor());
200             tx.close();
201         }, 1, TimeUnit.SECONDS);
202     }
203
204     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
205             justification = "https://github.com/spotbugs/spotbugs/issues/811")
206     private void loadL2GwDeviceCache(List<Node> nodes) {
207         if (nodes == null) {
208             LOG.debug("No config topology nodes are present");
209             return;
210         }
211         Map<InstanceIdentifier<Node>, Node> allNodes = nodes
212                 .stream()
213                 .collect(toMap(TO_NODE_PATH, Function.identity()));
214
215         LOG.trace("Loading all config nodes");
216
217         Set<InstanceIdentifier<Node>> allIids = allNodes.keySet();
218
219         Map<String, List<InstanceIdentifier<Node>>> psNodesByDeviceName = allIids
220                 .stream()
221                 .filter(IS_PS_NODE)
222                 .collect(groupingBy(GET_DEVICE_NAME, toList()));
223
224         //Process HA nodes
225         allNodes.values().stream()
226                 .filter(IS_HA_PARENT_NODE)
227                 .forEach(parentNode -> allIids.stream()
228                         .filter(IS_PS_NODE)
229                         .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
230                         .forEach(psIid -> addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid))));
231
232         //Process non HA nodes there will be only one ps node iid for each device for non ha nodes
233         psNodesByDeviceName.values().stream()
234                 .filter(psIids -> psIids.size() == 1)
235                 .map(psIids -> psIids.get(0))
236                 .forEach(psIid -> {
237                     Node psNode = allNodes.get(psIid);
238                     Node globalNode = allNodes.get(TO_GLOBAL_PATH.apply(psNode));
239                     if (globalNode != null) {
240                         addL2DeviceToCache(psIid, globalNode, psNode);
241                     }
242                 });
243     }
244
245     public void loadL2GwConnectionCache() {
246         InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
247                 .create(Neutron.class)
248                 .child(L2gatewayConnections.class);
249
250         Optional<L2gatewayConnections> optional = Optional.empty();
251         try {
252             optional = SingleTransactionDataBroker.syncReadOptional(broker, CONFIGURATION,
253                     parentIid);
254         } catch (ExecutionException | InterruptedException e) {
255             LOG.error("loadL2GwConnectionCache: Exception while reading L2gatewayConnections DS", e);
256         }
257         if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
258             LOG.trace("Found some connections to fill in l2gw connection cache");
259             new ArrayList<>(optional.get().getL2gatewayConnection().values())
260                     .forEach(connection -> {
261                         add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
262                     });
263         }
264     }
265
266     void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
267         LOG.trace("Adding device to cache {}", psNode.getNodeId().getValue());
268         String deviceName = HwvtepHAUtil.getPsName(psIid);
269         L2GatewayDevice l2GwDevice = l2GatewayCache.addOrGet(deviceName);
270         l2GwDevice.setConnected(true);
271         l2GwDevice.setHwvtepNodeId(globalNode.getNodeId().getValue());
272
273         List<TunnelIps> tunnelIps = psNode.augmentation(PhysicalSwitchAugmentation.class) != null
274                 ? new ArrayList<>(psNode.augmentation(PhysicalSwitchAugmentation.class).getTunnelIps().values()) : null;
275         if (tunnelIps != null) {
276             for (TunnelIps tunnelIp : tunnelIps) {
277                 IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
278                 l2GwDevice.addTunnelIp(tunnelIpAddr);
279             }
280         }
281     }
282 }