2 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
10 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
11 import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
13 import com.google.common.base.Strings;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.function.BiPredicate;
20 import java.util.function.Predicate;
21 import java.util.stream.Collectors;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
25 import org.opendaylight.mdsal.binding.api.DataBroker;
26 import org.opendaylight.mdsal.binding.api.DataObjectModification;
27 import org.opendaylight.mdsal.binding.util.Datastore.Operational;
28 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
29 import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
30 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
31 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
32 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
33 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.NodeCopier;
34 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
35 import org.opendaylight.serviceutils.srm.RecoverableListener;
36 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentationBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalPortAugmentation;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalPortAugmentationBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
54 public class HAOpNodeListener extends HwvtepNodeBaseListener<Operational> implements RecoverableListener {
// SLF4J logger shared by every method of this listener.
56 private static final Logger LOG = LoggerFactory.getLogger(HAOpNodeListener.class);
58 static BiPredicate<String, InstanceIdentifier<Node>> IS_PS_CHILD_TO_GLOBAL_NODE = (globalNodeId, iid) -> {
59 String psNodeId = iid.firstKeyOf(Node.class).getNodeId().getValue();
60 return psNodeId.startsWith(globalNodeId) && psNodeId.contains("physicalswitch");
// True when hwvtepHACache has no parent registered for the given node path,
// i.e. the node is not tracked as a child of an HA node.
63 static Predicate<InstanceIdentifier<Node>> IS_NOT_HA_CHILD = iid -> hwvtepHACache.getParent(iid) == null;
// Copies child operational updates (global and physical switch) to the HA parent.
65 private final IHAEventHandler haEventHandler;
// Receives every add/delete callback first; also answers which nodes are currently connected.
66 private final HAOpClusteredListener haOpClusteredListener;
// Copies global and physical switch nodes between child and HA parent paths.
67 private final NodeCopier nodeCopier;
// Stored by the constructor; not read by any code visible in this listing.
68 private final IdManagerService idManager;
/**
 * Creates the listener on the OPERATIONAL datastore, stores its collaborators, registers with the
 * service recovery registry and registers the default batch handlers on the data broker.
 *
 * @param db data broker passed to the superclass and to the batching manager
 * @param haEventHandler handler used to copy child updates to the HA parent
 * @param haOpClusteredListener clustered listener that each callback delegates to first
 * @param nodeCopier helper that copies nodes between child and HA parent
 * @param l2GatewayServiceRecoveryHandler supplies the key used for the recovery registration
 * @param serviceRecoveryRegistry registry the recoverable listener is added to
 * @param idManager id manager service, only stored
 * @throws Exception declared by the signature; the visible body does not show which call raises it
 */
71 public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler,
72 HAOpClusteredListener haOpClusteredListener,
73 NodeCopier nodeCopier,
74 final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
75 final ServiceRecoveryRegistry serviceRecoveryRegistry,
76 final IdManagerService idManager) throws Exception {
77 super(OPERATIONAL, db);
78 this.haEventHandler = haEventHandler;
79 this.haOpClusteredListener = haOpClusteredListener;
80 this.nodeCopier = nodeCopier;
81 this.idManager = idManager;
// NOTE(review): the remaining argument(s) of this call are on a line that is not visible in this
// listing; presumably this listener itself is registered — confirm.
82 serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
84 ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(db);
// Registers this listener for the OPERATIONAL datastore through the two-argument
// registerListener overload, using the broker returned by getDataBroker().
// Any Exception is caught and logged together with its stack trace, so a registration
// failure does not propagate to the caller.
88 @SuppressWarnings("all")
89 public void registerListener() {
91 LOG.info("Registering HAOpNodeListener");
92 registerListener(OPERATIONAL, getDataBroker());
93 } catch (Exception e) {
94 LOG.error("HA OP Node register listener error.", e);
// Logs that the listener is being deregistered.
// NOTE(review): the statement that performs the actual deregistration is not visible in this
// listing — confirm what follows the log line.
99 public void deregisterListener() {
100 LOG.info("Deregistering HAOpNodeListener");
// Returns the string value of the node id held in the Node key of the given path.
// The path must contain a Node key; otherwise firstKeyOf yields nothing to dereference.
104 String getNodeId(InstanceIdentifier<Node> iid) {
105 return iid.firstKeyOf(Node.class).getNodeId().getValue();
/**
 * Handles a global node appearing in the operational datastore.
 *
 * <p>The event is first forwarded to the clustered listener. Then, inside a CONFIGURATION
 * read-write transaction obtained from txRunner:
 * <ul>
 *   <li>if the node has no HA parent in the cache but is itself an HA parent, the db version of its
 *       config node is brought in line with the operational db version when they differ;</li>
 *   <li>if the node is an HA child, its operational data is copied to the HA parent, the managers
 *       for the HA node are built, and the HA parent config node is either updated (managers written,
 *       node copied) or created with the managers and the db version of the existing parent.</li>
 * </ul>
 * Finally the child's physical switch nodes are processed via readAndCopyChildPsOpToParent.
 *
 * <p>NOTE(review): the declaration of the childNode parameter is on a line not visible in this listing.
 */
109 public void onGlobalNodeAdd(InstanceIdentifier<Node> childGlobalPath,
111 TypedReadWriteTransaction<Operational> tx) {
112 //copy child global node to ha global node
113 //create ha global config node if not present
114 //copy ha global config node to child global config node
115 LOG.info("HAOpNodeListener Node connected {} - Checking if Ha or Non-Ha enabled {}",
116 childNode.getNodeId().getValue(), getManagers(childNode));
// Let the clustered listener record the connection before any HA handling.
117 haOpClusteredListener.onGlobalNodeAdd(childGlobalPath, childNode, tx);
// Everything below runs with two transactions: tx (operational) and configTx (configuration).
119 txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, configTx -> {
120 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
121 LOG.info("HAOpNodeListener The connected node is not a HA child {}",
122 childNode.getNodeId().getValue());
123 if (hwvtepHACache.isHAParentNode(childGlobalPath)) {
124 LOG.info("HAOpNodeListener this is Parent Node {}",
125 childNode.getNodeId().getValue());
// NOTE(review): globalAugmentation is dereferenced without a null check — confirm the
// augmentation is always present on an HA parent node.
126 HwvtepGlobalAugmentation globalAugmentation = childNode
127 .augmentation(HwvtepGlobalAugmentation.class);
128 String operDbVersion = globalAugmentation.getDbVersion();
131 Optional<Node> globalConfigNodeOptional = configTx.read(childGlobalPath).get();
132 if (globalConfigNodeOptional.isPresent()) {
// NOTE(review): globalConfigAugmentation is likewise dereferenced without a null check.
133 HwvtepGlobalAugmentation globalConfigAugmentation = globalConfigNodeOptional
134 .get().augmentation(HwvtepGlobalAugmentation.class);
135 String configDbVersion = globalConfigAugmentation.getDbVersion();
// Only a non-null operational version that differs from the config version triggers a write.
136 if (operDbVersion != null && !operDbVersion.equals(configDbVersion)) {
137 LOG.info("Change in Db version from {} to {} for Node {}",
138 configDbVersion, operDbVersion, childGlobalPath);
// Start from the existing config augmentation so that only the db version changes.
139 HwvtepGlobalAugmentationBuilder haBuilder =
140 new HwvtepGlobalAugmentationBuilder(globalConfigAugmentation);
141 haBuilder.setDbVersion(operDbVersion);
// The node written to config is built from the operational child node, with the
// augmentation above added to it.
142 NodeBuilder nodeBuilder = new NodeBuilder(childNode);
143 nodeBuilder.addAugmentation(haBuilder.build());
144 configTx.merge(childGlobalPath, nodeBuilder.build());
146 LOG.debug("No Change in Db version from {} to {} for Node {}",
147 configDbVersion, operDbVersion, childGlobalPath);
// NOTE(review): in the visible lines the caught exception is not passed to the logger and the
// interrupt status is not restored — confirm against the full file.
150 } catch (ExecutionException | InterruptedException ex) {
151 LOG.error("HAOpNodeListener Failed to read node {} from Config DS",
// From here on the node is an HA child: haNodePath is its HA parent.
158 InstanceIdentifier<Node> haNodePath = hwvtepHACache.getParent(childGlobalPath);
159 LOG.info("HAOpNodeListener Ha enabled child node connected {} create parent oper node",
160 childNode.getNodeId().getValue());
// Copy the child's operational global node to the HA parent using the operational tx.
162 nodeCopier.copyGlobalNode(Optional.ofNullable(childNode),
163 childGlobalPath, haNodePath, OPERATIONAL, tx);
// Read the parent back from the operational tx; it feeds both the manager list and the db version.
165 Optional<Node> existingDstGlobalNodeOptional = tx.read(haNodePath).get();
// Optional.ofNullable(childNode).get() yields childNode, or throws NoSuchElementException when it is null.
166 List<Managers> managers = HwvtepHAUtil
167 .buildManagersForHANode(Optional.ofNullable(childNode).get(),
168 existingDstGlobalNodeOptional);
170 Optional<Node> globalNodeOptional = configTx.read(haNodePath).get();
171 if (globalNodeOptional.isPresent()) {
172 //Also update the manager section in config which helps in cluster reboot scenarios
173 managers.stream().forEach(manager -> {
174 InstanceIdentifier<Managers> managerIid = haNodePath
175 .augmentation(HwvtepGlobalAugmentation.class)
176 .child(Managers.class, manager.key());
177 configTx.put(managerIid, manager);
// Copy the HA parent config node to the child path.
// NOTE(review): the datastore/transaction arguments of this call are not visible in this listing.
179 nodeCopier.copyGlobalNode(globalNodeOptional, haNodePath, childGlobalPath,
// No HA parent config node yet: create one holding the managers and, when the parent
// read above has a global augmentation, that parent's db version.
182 NodeBuilder nodeBuilder = new NodeBuilder().setNodeId(haNodePath
183 .firstKeyOf(Node.class).getNodeId());
184 HwvtepGlobalAugmentationBuilder augBuilder = new HwvtepGlobalAugmentationBuilder();
185 augBuilder.setManagers(managers);
186 if (existingDstGlobalNodeOptional.isPresent()) {
187 HwvtepGlobalAugmentation srcGlobalAugmentation =
188 existingDstGlobalNodeOptional.get()
189 .augmentation(HwvtepGlobalAugmentation.class);
190 if (srcGlobalAugmentation != null) {
191 augBuilder.setDbVersion(srcGlobalAugmentation.getDbVersion());
194 nodeBuilder.addAugmentation(augBuilder.build());
195 configTx.put(haNodePath, nodeBuilder.build());
197 } catch (ExecutionException | InterruptedException e) {
198 LOG.error("HAOpNodeListener Failed to read nodes {} , {} ", childGlobalPath,
// Process the physical switch nodes of this global node.
202 readAndCopyChildPsOpToParent(childNode, tx);
/**
 * Returns the managers held in the node's HwvtepGlobalAugmentation when both the augmentation and
 * its managers are non-null. Called from the connect log statement in onGlobalNodeAdd.
 *
 * <p>NOTE(review): the value returned when either is null is on a line not visible in this listing.
 * The augmentation is looked up up to three times; a local variable would avoid the repetition.
 */
205 public Object getManagers(Node node) {
206 if (node.augmentation(HwvtepGlobalAugmentation.class) != null
207 && node.augmentation(HwvtepGlobalAugmentation.class).getManagers() != null) {
208 return node.augmentation(HwvtepGlobalAugmentation.class).getManagers();
213 //Update on global node has been taken care by HAListeners as per perf improvement
/**
 * Handles an update of a child global node in the operational datastore.
 *
 * <p>If the original node already carried an HA id in its manager other-config, the update is
 * copied to the HA parent through haEventHandler and processing stops. Otherwise the HA cache is
 * given the chance to register the node as a new HA child; if it now is one, the node is treated
 * like a fresh connection via onGlobalNodeAdd.
 */
215 void onGlobalNodeUpdate(InstanceIdentifier<Node> childGlobalPath,
216 Node updatedChildNode,
217 Node originalChildNode,
218 DataObjectModification<Node> mod,
219 TypedReadWriteTransaction<Operational> tx) {
221 LOG.trace("Node updated {} {}", updatedChildNode, originalChildNode);
223 String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
224 if (!Strings.isNullOrEmpty(oldHAId)) { //was already ha child
225 InstanceIdentifier<Node> haPath = hwvtepHACache.getParent(childGlobalPath);
226 LOG.debug("Copy oper update from child {} to parent {}", childGlobalPath, haPath);
// The casts throw ClassCastException if tx is not a BatchedTransaction.
// NOTE(review): confirm every caller passes a BatchedTransaction.
227 ((BatchedTransaction)tx).setSrcNodeId(updatedChildNode.getNodeId());
228 ((BatchedTransaction)tx).updateMetric(true);
229 haEventHandler.copyChildGlobalOpUpdateToHAParent(haPath, mod, tx);
230 return;//TODO handle unha case
// Not previously an HA child: update the cache in case the node just became one.
233 HAOpClusteredListener.addToHACacheIfBecameHAChild(childGlobalPath, updatedChildNode, originalChildNode);
234 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
235 if (!hwvtepHACache.isHAParentNode(childGlobalPath)) {
237 LOG.trace("Connected node is not ha child {}", updatedChildNode);
// NOTE(review): the lines between the trace above and the log below are not visible in this
// listing; presumably the non-HA case returns before reaching here — confirm.
241 LOG.info("HAOpNodeListener {} became ha child ", updatedChildNode.getNodeId().getValue());
242 onGlobalNodeAdd(childGlobalPath, updatedChildNode, tx);
246 void onGlobalNodeDelete(InstanceIdentifier<Node> childGlobalPath,
248 TypedReadWriteTransaction<Operational> tx) {
249 haOpClusteredListener.onGlobalNodeDelete(childGlobalPath, childNode, tx);
250 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
251 LOG.info("HAOpNodeListener non ha child global delete {} ", getNodeId(childGlobalPath));
254 LOG.info("HAOpNodeListener ha child global delete {} ", getNodeId(childGlobalPath));
255 InstanceIdentifier<Node> haNodePath = hwvtepHACache.getParent(childGlobalPath);
256 Set<InstanceIdentifier<Node>> children = hwvtepHACache.getChildrenForHANode(haNodePath);
257 if (haOpClusteredListener.getConnected(children).isEmpty()) {
258 LOG.info("HAOpNodeListener All child deleted for ha node {} ", HwvtepHAUtil.getNodeIdVal(haNodePath));
259 //ha ps delete is taken care by ps node delete
260 //HwvtepHAUtil.deleteSwitchesManagedBy-Node(haNodePath, tx);
262 HwvtepHAUtil.deleteNodeIfPresent(tx, haNodePath);
263 } catch (ExecutionException | InterruptedException e) {
264 LOG.error("HAOpNodeListener HA Node Delete failed {}", haNodePath);
267 LOG.info("HAOpNodeListener not all child deleted {} connected {}", getNodeId(childGlobalPath),
268 haOpClusteredListener.getConnected(children));
/**
 * Handles a physical switch (ps) node appearing in the operational datastore.
 *
 * <p>The event is first forwarded to the clustered listener. The add is ignored when the owning
 * global node is not among the connected nodes, or when that global node has no HA parent.
 * Otherwise, inside a CONFIGURATION read-write transaction, the child ps node is copied to the HA
 * parent ps path, and the HA parent ps config node is either copied back to the child (when it
 * exists) or created from the child's tunnel IPs and termination points.
 *
 * <p>NOTE(review): the declaration of the childPsNode parameter is on a line not visible in this listing.
 */
273 public void onPsNodeAdd(InstanceIdentifier<Node> childPsPath,
275 TypedReadWriteTransaction<Operational> tx) {
276 //copy child ps oper node to ha ps oper node
277 //copy ha ps config node to child ps config
278 haOpClusteredListener.onPsNodeAdd(childPsPath, childPsNode, tx);
279 InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil
280 .getGlobalNodePathFromPSNode(childPsNode);
// A ps node is only processed once its global node is known to be connected.
281 if (!haOpClusteredListener.getConnectedNodes().contains(childGlobalPath)) {
282 LOG.error("HAOpNodeListener Ignoring ps node add as global node not found {}",
283 childPsNode.getNodeId().getValue());
286 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
// The error is logged only when the global node is not itself an HA parent.
287 if (!hwvtepHACache.isHAParentNode(childGlobalPath)) {
288 LOG.error("HAOpNodeListener Ignoring ps node add as the node is not ha child {}",
289 childPsNode.getNodeId().getValue());
293 LOG.info("HAOpNodeListener Ha ps child connected {} ", getNodeId(childPsPath));
// Derive the HA parent's global path and the corresponding HA ps path for this child ps node.
294 InstanceIdentifier<Node> haGlobalPath = hwvtepHACache.getParent(childGlobalPath);
295 InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
296 txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, configTx -> {
// Child ps node -> HA parent ps node.
// NOTE(review): the receiver and the trailing arguments of this call are not visible in this listing.
299 .copyPSNode(Optional.ofNullable(childPsNode), childPsPath, haPsPath, haGlobalPath,
302 Optional<Node> haPsNodeOptional = configTx.read(haPsPath).get();
303 if (haPsNodeOptional.isPresent()) {
// HA parent ps config exists: copy it to the child ps path.
// NOTE(review): the trailing arguments of this call are not visible in this listing.
304 nodeCopier.copyPSNode(haPsNodeOptional, haPsPath, childPsPath, childGlobalPath,
// HA parent ps config is absent: build it from the child's operational data.
307 PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
308 PhysicalSwitchAugmentation srcPsAugmentation = childPsNode
309 .augmentation(PhysicalSwitchAugmentation.class);
310 if (srcPsAugmentation != null) {
311 psBuilder.setTunnelIps(srcPsAugmentation.getTunnelIps());
313 LOG.error("Physical Switch Augmentation is null for the child ps node: {}",
316 //setting tunnel ip and termination points in the parent node
317 List<TerminationPoint> terminationPoints = getTerminationPointForConfig(
319 // for (TerminationPoint terminationPoint: terminationPoints) {
320 // HwvtepTerminationPointCache.getInstance().addTerminationPoint(haGlobalPath, terminationPoint);
// The augmentation is added even when the child had none; in that case it carries no tunnel IPs.
322 NodeBuilder nodeBuilder = new NodeBuilder()
323 .setNodeId(haPsPath.firstKeyOf(Node.class).getNodeId());
324 nodeBuilder.addAugmentation(psBuilder.build());
325 LOG.info("HAOpNodeListener creating the HAParent PhysicalSwitch {}", haPsPath);
326 configTx.put(haPsPath, nodeBuilder
327 .setTerminationPoint(terminationPoints).build());
// NOTE(review): the caught exception is not passed to the logger and the interrupt status is not
// restored in the visible lines.
329 } catch (ExecutionException | InterruptedException e) {
330 LOG.error("Failed to read nodes {} , {} ", childPsPath, haGlobalPath);
335 private List<TerminationPoint> getTerminationPointForConfig(Node childPsNode) {
336 List<TerminationPoint> configTPList = new ArrayList<>();
337 if (childPsNode != null && childPsNode.getTerminationPoint() != null) {
338 childPsNode.getTerminationPoint().values().forEach(operTerminationPoint -> {
339 TerminationPointBuilder tpBuilder = new TerminationPointBuilder(operTerminationPoint);
340 tpBuilder.removeAugmentation(HwvtepPhysicalPortAugmentation.class);
341 HwvtepPhysicalPortAugmentation operPPAugmentation =
342 operTerminationPoint. augmentation(HwvtepPhysicalPortAugmentation.class);
343 HwvtepPhysicalPortAugmentationBuilder tpAugmentationBuilder =
344 new HwvtepPhysicalPortAugmentationBuilder();
345 tpAugmentationBuilder.setAclBindings(operPPAugmentation.getAclBindings());
346 tpAugmentationBuilder
347 .setHwvtepNodeDescription(operPPAugmentation.getHwvtepNodeDescription());
348 tpAugmentationBuilder.setHwvtepNodeName(operPPAugmentation.getHwvtepNodeName());
349 tpAugmentationBuilder.setPhysicalPortUuid(operPPAugmentation.getPhysicalPortUuid());
350 tpAugmentationBuilder.setVlanStats(operPPAugmentation.getVlanStats());
351 tpAugmentationBuilder.setVlanBindings(operPPAugmentation.getVlanBindings());
353 tpBuilder.addAugmentation(tpAugmentationBuilder.build());
354 configTPList.add(tpBuilder.build());
361 void onPsNodeUpdate(Node updatedChildPSNode,
362 DataObjectModification<Node> mod,
363 TypedReadWriteTransaction<Operational> tx) {
364 InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(updatedChildPSNode);
365 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
368 //tunnel ip and termination points from child to parent
369 InstanceIdentifier<Node> haGlobalPath = hwvtepHACache.getParent(childGlobalPath);
370 ((BatchedTransaction)tx).setSrcNodeId(updatedChildPSNode.getNodeId());
371 ((BatchedTransaction)tx).updateMetric(true);
372 haEventHandler.copyChildPsOpUpdateToHAParent(updatedChildPSNode, haGlobalPath, mod, tx);
376 void onPsNodeDelete(InstanceIdentifier<Node> childPsPath,
378 TypedReadWriteTransaction<Operational> tx) {
379 //one child ps node disconnected
380 //find if all child ps nodes disconnected then delete parent ps node
381 haOpClusteredListener.onPsNodeDelete(childPsPath, childPsNode, tx);
382 InstanceIdentifier<Node> disconnectedChildGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
383 if (IS_NOT_HA_CHILD.test(disconnectedChildGlobalPath)) {
384 LOG.info("HAOpNodeListener on non ha ps child delete {} ", getNodeId(childPsPath));
387 InstanceIdentifier<Node> haGlobalPath = hwvtepHACache.getParent(disconnectedChildGlobalPath);
388 Set<InstanceIdentifier<Node>> childPsPaths = hwvtepHACache.getChildrenForHANode(haGlobalPath).stream()
389 .map(childGlobalPath -> HwvtepHAUtil.convertPsPath(childPsNode, childGlobalPath))
390 .collect(Collectors.toSet());
391 //TODO validate what if this is null
392 if (haOpClusteredListener.getConnected(childPsPaths).isEmpty()) {
393 InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
394 LOG.info("HAOpNodeListener All child deleted for ha ps node {} ", HwvtepHAUtil.getNodeIdVal(haPsPath));
396 HwvtepHAUtil.deleteNodeIfPresent(tx, haPsPath);
397 } catch (ExecutionException | InterruptedException e) {
398 LOG.error("HAOpNodeListener Exception While Delete HA PS Node : {}", haPsPath);
400 //HwvtepHAUtil.deleteGlobalNodeSwitches(haGlobalPath, haPsPath, LogicalDatastoreType.OPERATIONAL, tx);
402 LOG.info("HAOpNodeListener not all ha ps child deleted {} connected {}", getNodeId(childPsPath),
403 haOpClusteredListener.getConnected(childPsPaths));
// Finds the physical switch (ps) nodes belonging to a child global node and runs onPsNodeAdd for
// each one that can be read from the operational transaction.
// The ps paths come from the switches listed in the node's HwvtepGlobalAugmentation; when the
// augmentation is null or lists no switches, they are taken from the clustered listener's
// connected nodes, filtered with IS_PS_CHILD_TO_GLOBAL_NODE.
// NOTE(review): this method continues past the last line visible in this listing.
407 private void readAndCopyChildPsOpToParent(Node childNode, TypedReadWriteTransaction<Operational> tx) {
408 String childGlobalNodeId = childNode.getNodeId().getValue();
// Raw InstanceIdentifier element type: the list receives both connected node paths and switch references.
409 List<InstanceIdentifier> childPsIids = new ArrayList<>();
410 HwvtepGlobalAugmentation hwvtepGlobalAugmentation = childNode.augmentation(HwvtepGlobalAugmentation.class);
411 if (hwvtepGlobalAugmentation == null
412 || HwvtepHAUtil.isEmpty(hwvtepGlobalAugmentation.nonnullSwitches().values())) {
// Fallback: look for connected nodes whose id marks them as a ps of this global node.
413 haOpClusteredListener.getConnectedNodes()
415 .filter(connectedIid -> IS_PS_CHILD_TO_GLOBAL_NODE.test(childGlobalNodeId, connectedIid))
416 .forEach(connectedIid -> childPsIids.add(connectedIid));
// Reached when the augmentation lists switches: use each switch reference as the ps path.
418 hwvtepGlobalAugmentation.getSwitches().values().forEach(
419 switches -> childPsIids.add(switches.getSwitchRef().getValue()));
421 if (childPsIids.isEmpty()) {
422 LOG.info("HAOpNodeListener No child ps found for global {}", childGlobalNodeId);
424 childPsIids.forEach(psIid -> {
// Unchecked conversion from the raw type to InstanceIdentifier<Node>.
426 InstanceIdentifier<Node> childPsIid = psIid;
427 Optional<Node> childPsNode = tx.read(childPsIid).get();
428 if (childPsNode.isPresent()) {
429 LOG.debug("Child oper PS node found");
430 onPsNodeAdd(childPsIid, childPsNode.get(), tx);
432 LOG.error("HAOpNodeListener Child oper ps node not found {}", childPsIid);
// NOTE(review): the caught exception is not passed to the logger and the interrupt status is not
// restored in the visible lines.
434 } catch (ExecutionException | InterruptedException e) {
435 LOG.error("HAOpNodeListener Failed to read child ps node {}", psIid);