/*
 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
36 public class HAOpClusteredListener extends HwvtepNodeBaseListener implements ClusteredDataTreeChangeListener<Node> {
37 private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);
39 private final Set<InstanceIdentifier<Node>> connectedNodes = ConcurrentHashMap.newKeySet();
40 private final Map<InstanceIdentifier<Node>, Set<Consumer<Optional<Node>>>> waitingJobs = new ConcurrentHashMap<>();
43 public HAOpClusteredListener(DataBroker db) throws Exception {
44 super(LogicalDatastoreType.OPERATIONAL, db);
45 LOG.info("Registering HAOpClusteredListener");
48 public Set<InstanceIdentifier<Node>> getConnectedNodes() {
49 return connectedNodes;
53 synchronized void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx) {
54 connectedNodes.remove(key);
55 hwvtepHACache.updateDisconnectedNodeStatus(key);
59 void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx) {
60 connectedNodes.remove(key);
61 hwvtepHACache.updateDisconnectedNodeStatus(key);
65 void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx) {
66 connectedNodes.add(key);
67 hwvtepHACache.updateConnectedNodeStatus(key);
71 public synchronized void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node updated, ReadWriteTransaction tx) {
72 connectedNodes. add(key);
73 addToCacheIfHAChildNode(key, updated);
74 hwvtepHACache.updateConnectedNodeStatus(key);
75 if (waitingJobs.containsKey(key) && !waitingJobs.get(key).isEmpty()) {
77 HAJobScheduler jobScheduler = HAJobScheduler.getInstance();
78 Optional<Node> nodeOptional = tx.read(LogicalDatastoreType.OPERATIONAL, key).checkedGet();
79 if (nodeOptional.isPresent()) {
80 waitingJobs.get(key).forEach(
81 (waitingJob) -> jobScheduler.submitJob(() -> waitingJob.accept(nodeOptional)));
82 waitingJobs.get(key).clear();
84 LOG.error("Failed to read oper node {}", key);
86 } catch (ReadFailedException e) {
87 LOG.error("Failed to read oper node {}", key);
92 public static void addToCacheIfHAChildNode(InstanceIdentifier<Node> childPath, Node childNode) {
93 String haId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(childNode);
94 if (!Strings.isNullOrEmpty(haId)) {
95 InstanceIdentifier<Node> parentId = HwvtepHAUtil.createInstanceIdentifierFromHAId(haId);
96 //HwvtepHAUtil.updateL2GwCacheNodeId(childNode, parentId);
97 hwvtepHACache.addChild(parentId, childPath/*child*/);
102 void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
103 Node updatedChildNode,
104 Node beforeChildNode,
105 ReadWriteTransaction tx) {
106 boolean wasHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
107 addToHACacheIfBecameHAChild(childPath, updatedChildNode, beforeChildNode, tx);
108 boolean isHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
111 if (!wasHAChild && isHAChild) {
112 LOG.debug(getNodeId(childPath) + " " + "became ha_child");
113 } else if (wasHAChild && !isHAChild) {
114 LOG.debug(getNodeId(childPath) + " " + "unbecome ha_child");
118 static String getNodeId(InstanceIdentifier<Node> key) {
119 String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
120 int idx = nodeId.indexOf("uuid/");
122 nodeId = nodeId.substring(idx + "uuid/".length());
128 * If Normal non-ha node changes to HA node , its added to HA cache.
130 * @param childPath HA child path which got converted to HA node
131 * @param updatedChildNode updated Child node
132 * @param beforeChildNode non-ha node before updated to HA node
133 * @param tx Transaction
135 public static void addToHACacheIfBecameHAChild(InstanceIdentifier<Node> childPath,
136 Node updatedChildNode,
137 Node beforeChildNode,
138 ReadWriteTransaction tx) {
139 HwvtepGlobalAugmentation updatedAugmentaion = updatedChildNode.getAugmentation(HwvtepGlobalAugmentation.class);
140 HwvtepGlobalAugmentation beforeAugmentaion = null;
141 if (beforeChildNode != null) {
142 beforeAugmentaion = beforeChildNode.getAugmentation(HwvtepGlobalAugmentation.class);
144 List<Managers> up = null;
145 List<Managers> be = null;
146 if (updatedAugmentaion != null) {
147 up = updatedAugmentaion.getManagers();
149 if (beforeAugmentaion != null) {
150 be = beforeAugmentaion.getManagers();
156 Managers m1 = up.get(0);
157 Managers m2 = be.get(0);
158 if (!m1.equals(m2)) {
159 LOG.trace("Manager entry updated for node {} ", updatedChildNode.getNodeId().getValue());
160 addToCacheIfHAChildNode(childPath, updatedChildNode);
165 //TODO handle unhaed case
169 public Set<InstanceIdentifier<Node>> getConnected(Set<InstanceIdentifier<Node>> candidateds) {
170 if (candidateds == null) {
171 return Collections.emptySet();
173 return candidateds.stream()
174 .filter((iid) -> connectedNodes.contains(iid))
175 .collect(Collectors.toSet());
178 public synchronized void runAfterNodeIsConnected(InstanceIdentifier<Node> iid, Consumer<Optional<Node>> consumer) {
179 if (connectedNodes.contains(iid)) {
180 ReadWriteTransaction tx = getDataBroker().newReadWriteTransaction();
181 HAJobScheduler.getInstance().submitJob(() -> {
183 consumer.accept(tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet());
184 } catch (ReadFailedException e) {
185 LOG.error("Failed to read oper ds {}", iid);
190 waitingJobs.putIfAbsent(iid, Sets.newConcurrentHashSet());
191 waitingJobs.get(iid).add(consumer);