Java 8 migration
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / L2GatewayConnectionListener.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.listeners;
9
10 import static java.util.stream.Collectors.groupingBy;
11 import static java.util.stream.Collectors.toList;
12 import static java.util.stream.Collectors.toMap;
13 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
14
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.BiPredicate;
24 import java.util.function.Function;
25 import java.util.function.Predicate;
26 import javax.annotation.PostConstruct;
27 import javax.inject.Inject;
28 import javax.inject.Singleton;
29
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
32 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
33 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
34 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
35 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
36 import org.opendaylight.netvirt.elan.utils.Scheduler;
37 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
38 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 @Singleton
53 public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeListenerBase<L2gatewayConnection,
54         L2GatewayConnectionListener> {
55     private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
56     private static final int MAX_READ_TRIALS = 120;
57
58     private static final Function<Node, InstanceIdentifier<Node>> TO_GLOBAL_PATH =
59             HwvtepHAUtil::getGlobalNodePathFromPSNode;
60
61     private static final Function<Node, InstanceIdentifier<Node>> TO_NODE_PATH =
62         (node) -> HwvtepSouthboundUtils.createInstanceIdentifier(node.getNodeId());
63
64     private static final Function<InstanceIdentifier<Node>, String> GET_DEVICE_NAME = HwvtepHAUtil::getPsName;
65
66     private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) ->
67             HwvtepHAUtil.getPsName(psIid) != null;
68
69     private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
70         HwvtepGlobalAugmentation augmentation = node.getAugmentation(HwvtepGlobalAugmentation.class);
71         if (augmentation != null && augmentation.getManagers() != null) {
72             return augmentation.getManagers().stream().anyMatch(
73                 manager -> manager.getKey().getTarget().getValue().equals(HwvtepHAUtil.MANAGER_KEY));
74         }
75         return false;
76     };
77
78     private static final BiPredicate<InstanceIdentifier<Node>, Node> PS_NODE_OF_PARENT_NODE =
79         (psIid, node) -> psIid.firstKeyOf(Node.class).getNodeId().getValue().contains(node.getNodeId().getValue());
80
81     private final DataBroker broker;
82     private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
83     private final Scheduler scheduler;
84     private final L2GatewayCache l2GatewayCache;
85
86     @Inject
87     public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
88                                        Scheduler scheduler, L2GatewayCache l2GatewayCache) {
89         super(L2gatewayConnection.class, L2GatewayConnectionListener.class);
90         this.broker = db;
91         this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
92         this.scheduler = scheduler;
93         this.l2GatewayCache = l2GatewayCache;
94     }
95
96     @PostConstruct
97     public void init() {
98         loadL2GwDeviceCache(1);
99     }
100
101     @Override
102     protected void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
103         LOG.trace("Adding L2gatewayConnection: {}", input);
104
105         // Get associated L2GwId from 'input'
106         // Create logical switch in each of the L2GwDevices part of L2Gw
107         // Logical switch name is network UUID
108         // Add L2GwDevices to ELAN
109         l2GatewayConnectionUtils.addL2GatewayConnection(input);
110     }
111
112     @Override
113     protected void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
114         LOG.trace("Removing L2gatewayConnection: {}", input);
115
116         l2GatewayConnectionUtils.deleteL2GatewayConnection(input);
117     }
118
119     @Override
120     protected void update(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection original,
121             L2gatewayConnection update) {
122         LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
123     }
124
125     @Override
126     protected InstanceIdentifier<L2gatewayConnection> getWildCardPath() {
127         return InstanceIdentifier.create(Neutron.class).child(L2gatewayConnections.class)
128             .child(L2gatewayConnection.class);
129     }
130
131     @Override
132     protected L2GatewayConnectionListener getDataTreeChangeListener() {
133         return this;
134     }
135
136     private void loadL2GwDeviceCache(final int trialNo) {
137         scheduler.getScheduledExecutorService().schedule(() -> {
138             if (trialNo == MAX_READ_TRIALS) {
139                 LOG.error("Failed to read config topology");
140                 return;
141             }
142             ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
143             InstanceIdentifier<Topology> topoIid = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
144             Futures.addCallback(tx.read(CONFIGURATION, topoIid), new FutureCallback<Optional<Topology>>() {
145                 @Override
146                 public void onSuccess(Optional<Topology> topologyOptional) {
147                     if (topologyOptional != null && topologyOptional.isPresent()) {
148                         loadL2GwDeviceCache(topologyOptional.get().getNode());
149                     }
150                     registerListener(CONFIGURATION, broker);
151                 }
152
153                 @Override
154                 public void onFailure(Throwable throwable) {
155                     loadL2GwDeviceCache(trialNo + 1);
156                 }
157             }, MoreExecutors.directExecutor());
158             tx.close();
159         }, 1, TimeUnit.SECONDS);
160     }
161
162     private void loadL2GwDeviceCache(List<Node> nodes) {
163         if (nodes == null) {
164             LOG.debug("No config topology nodes are present");
165             return;
166         }
167         Map<InstanceIdentifier<Node>, Node> allNodes = nodes
168                 .stream()
169                 .collect(toMap(TO_NODE_PATH, Function.identity()));
170
171         LOG.trace("Loading all config nodes");
172
173         Set<InstanceIdentifier<Node>> allIids = allNodes.keySet();
174
175         Map<String, List<InstanceIdentifier<Node>>> psNodesByDeviceName = allIids
176                 .stream()
177                 .filter(IS_PS_NODE)
178                 .collect(groupingBy(GET_DEVICE_NAME, toList()));
179
180         //Process HA nodes
181         allNodes.values().stream()
182                 .filter(IS_HA_PARENT_NODE)
183                 .forEach(parentNode -> allIids.stream()
184                         .filter(IS_PS_NODE)
185                         .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
186                         .forEach(psIid -> addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid))));
187
188         //Process non HA nodes there will be only one ps node iid for each device for non ha nodes
189         psNodesByDeviceName.values().stream()
190                 .filter(psIids -> psIids.size() == 1)
191                 .map(psIids -> psIids.get(0))
192                 .forEach(psIid -> {
193                     Node psNode = allNodes.get(psIid);
194                     Node globalNode = allNodes.get(TO_GLOBAL_PATH.apply(psNode));
195                     if (globalNode != null) {
196                         addL2DeviceToCache(psIid, globalNode, psNode);
197                     }
198                 });
199     }
200
201     void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
202         LOG.trace("Adding device to cache {}", psNode.getNodeId().getValue());
203         String deviceName = HwvtepHAUtil.getPsName(psIid);
204         L2GatewayDevice l2GwDevice = l2GatewayCache.addOrGet(deviceName);
205         l2GwDevice.setConnected(true);
206         l2GwDevice.setHwvtepNodeId(globalNode.getNodeId().getValue());
207
208         List<TunnelIps> tunnelIps = psNode.getAugmentation(PhysicalSwitchAugmentation.class) != null
209                 ? psNode.getAugmentation(PhysicalSwitchAugmentation.class).getTunnelIps() : null;
210         if (tunnelIps != null) {
211             for (TunnelIps tunnelIp : tunnelIps) {
212                 IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
213                 l2GwDevice.addTunnelIp(tunnelIpAddr);
214             }
215         }
216     }
217 }