Fix build faliures due to OFPlugin checktyle fixes
[netvirt.git] / vpnservice / elanmanager / elanmanager-impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HAOpClusteredListener.java
1 /*
2  * Copyright (c) 2016 ,2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Strings;
12 import com.google.common.collect.Sets;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.function.Consumer;
19 import java.util.stream.Collectors;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
27 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
30 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 @Singleton
36 public class HAOpClusteredListener extends HwvtepNodeBaseListener implements ClusteredDataTreeChangeListener<Node> {
37     private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);
38
39     private final Set<InstanceIdentifier<Node>> connectedNodes = ConcurrentHashMap.newKeySet();
40     private final Map<InstanceIdentifier<Node>, Set<Consumer<Optional<Node>>>> waitingJobs = new ConcurrentHashMap<>();
41
42     @Inject
43     public HAOpClusteredListener(DataBroker db) throws Exception {
44         super(LogicalDatastoreType.OPERATIONAL, db);
45         LOG.info("Registering HAOpClusteredListener");
46     }
47
48     public Set<InstanceIdentifier<Node>> getConnectedNodes() {
49         return connectedNodes;
50     }
51
52     @Override
53     synchronized  void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx)  {
54         connectedNodes.remove(key);
55         hwvtepHACache.updateDisconnectedNodeStatus(key);
56     }
57
58     @Override
59     void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)  {
60         connectedNodes.remove(key);
61         hwvtepHACache.updateDisconnectedNodeStatus(key);
62     }
63
64     @Override
65     void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)    {
66         connectedNodes.add(key);
67         hwvtepHACache.updateConnectedNodeStatus(key);
68     }
69
70     @Override
71     public synchronized void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node updated, ReadWriteTransaction tx) {
72         connectedNodes. add(key);
73         addToCacheIfHAChildNode(key, updated);
74         hwvtepHACache.updateConnectedNodeStatus(key);
75         if (waitingJobs.containsKey(key) && !waitingJobs.get(key).isEmpty()) {
76             try {
77                 HAJobScheduler jobScheduler = HAJobScheduler.getInstance();
78                 Optional<Node> nodeOptional = tx.read(LogicalDatastoreType.OPERATIONAL, key).checkedGet();
79                 if (nodeOptional.isPresent()) {
80                     waitingJobs.get(key).forEach(
81                         (waitingJob) -> jobScheduler.submitJob(() -> waitingJob.accept(nodeOptional)));
82                     waitingJobs.get(key).clear();
83                 } else {
84                     LOG.error("Failed to read oper node {}", key);
85                 }
86             } catch (ReadFailedException e) {
87                 LOG.error("Failed to read oper node {}", key);
88             }
89         }
90     }
91
92     public static void addToCacheIfHAChildNode(InstanceIdentifier<Node> childPath, Node childNode) {
93         String haId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(childNode);
94         if (!Strings.isNullOrEmpty(haId)) {
95             InstanceIdentifier<Node> parentId = HwvtepHAUtil.createInstanceIdentifierFromHAId(haId);
96             //HwvtepHAUtil.updateL2GwCacheNodeId(childNode, parentId);
97             hwvtepHACache.addChild(parentId, childPath/*child*/);
98         }
99     }
100
101     @Override
102     void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
103                             Node updatedChildNode,
104                             Node beforeChildNode,
105                             ReadWriteTransaction tx) {
106         boolean wasHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
107         addToHACacheIfBecameHAChild(childPath, updatedChildNode, beforeChildNode, tx);
108         boolean isHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
109
110
111         if (!wasHAChild && isHAChild) {
112             LOG.debug(getNodeId(childPath) + " " + "became ha_child");
113         } else if (wasHAChild && !isHAChild) {
114             LOG.debug(getNodeId(childPath) + " " + "unbecome ha_child");
115         }
116     }
117
118     static String getNodeId(InstanceIdentifier<Node> key) {
119         String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
120         int idx = nodeId.indexOf("uuid/");
121         if (idx > 0) {
122             nodeId = nodeId.substring(idx + "uuid/".length());
123         }
124         return nodeId;
125     }
126
127     /**
128      * If Normal non-ha node changes to HA node , its added to HA cache.
129      *
130      * @param childPath HA child path which got converted to HA node
131      * @param updatedChildNode updated Child node
132      * @param beforeChildNode non-ha node before updated to HA node
133      * @param tx Transaction
134      */
135     public static void addToHACacheIfBecameHAChild(InstanceIdentifier<Node> childPath,
136                                                    Node updatedChildNode,
137                                                    Node beforeChildNode,
138                                                    ReadWriteTransaction tx) {
139         HwvtepGlobalAugmentation updatedAugmentaion = updatedChildNode.getAugmentation(HwvtepGlobalAugmentation.class);
140         HwvtepGlobalAugmentation beforeAugmentaion = null;
141         if (beforeChildNode != null) {
142             beforeAugmentaion = beforeChildNode.getAugmentation(HwvtepGlobalAugmentation.class);
143         }
144         List<Managers> up = null;
145         List<Managers> be = null;
146         if (updatedAugmentaion != null) {
147             up = updatedAugmentaion.getManagers();
148         }
149         if (beforeAugmentaion != null) {
150             be = beforeAugmentaion.getManagers();
151         }
152         if (up != null) {
153             if (be != null) {
154                 if (up.size() > 0) {
155                     if (be.size() > 0) {
156                         Managers m1 = up.get(0);
157                         Managers m2 = be.get(0);
158                         if (!m1.equals(m2)) {
159                             LOG.trace("Manager entry updated for node {} ", updatedChildNode.getNodeId().getValue());
160                             addToCacheIfHAChildNode(childPath, updatedChildNode);
161                         }
162                     }
163                 }
164             }
165             //TODO handle unhaed case
166         }
167     }
168
169     public Set<InstanceIdentifier<Node>> getConnected(Set<InstanceIdentifier<Node>> candidateds) {
170         if (candidateds == null) {
171             return Collections.emptySet();
172         }
173         return candidateds.stream()
174                 .filter((iid) -> connectedNodes.contains(iid))
175                 .collect(Collectors.toSet());
176     }
177
178     public synchronized void runAfterNodeIsConnected(InstanceIdentifier<Node> iid, Consumer<Optional<Node>> consumer) {
179         if (connectedNodes.contains(iid)) {
180             ReadWriteTransaction tx = getDataBroker().newReadWriteTransaction();
181             HAJobScheduler.getInstance().submitJob(() -> {
182                 try {
183                     consumer.accept(tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet());
184                 } catch (ReadFailedException e) {
185                     LOG.error("Failed to read oper ds {}", iid);
186                 }
187             });
188             return;
189         }
190         waitingJobs.putIfAbsent(iid, Sets.newConcurrentHashSet());
191         waitingJobs.get(iid).add(consumer);
192     }
193 }
194