/*
 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;

import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.DataObjectModification;
import org.opendaylight.mdsal.binding.util.Datastore.Operational;
import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.NodeCopier;
import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
import org.opendaylight.serviceutils.srm.RecoverableListener;
import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentationBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalPortAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalPortAugmentationBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
54 public class HAOpNodeListener extends HwvtepNodeBaseListener<Operational> implements RecoverableListener {
56 private static final Logger LOG = LoggerFactory.getLogger(HAOpNodeListener.class);
58 static BiPredicate<String, InstanceIdentifier<Node>> IS_PS_CHILD_TO_GLOBAL_NODE = (globalNodeId, iid) -> {
59 String psNodeId = iid.firstKeyOf(Node.class).getNodeId().getValue();
60 return psNodeId.startsWith(globalNodeId) && psNodeId.contains("physicalswitch");
63 static Predicate<InstanceIdentifier<Node>> IS_NOT_HA_CHILD = (iid) -> hwvtepHACache.getParent(iid) == null;
65 private final IHAEventHandler haEventHandler;
66 private final HAOpClusteredListener haOpClusteredListener;
67 private final NodeCopier nodeCopier;
68 private final IdManagerService idManager;
/**
 * Creates the listener for the operational datastore, registers it as a recoverable
 * listener under the L2 gateway service registry key and registers the default
 * batch handlers of the {@link ResourceBatchingManager}.
 *
 * @param db data broker handed to the base listener and to the batch handler registration
 * @param haEventHandler handler used to copy child updates to the HA parent
 * @param haOpClusteredListener clustered listener to which node events are forwarded first
 * @param nodeCopier helper used to copy global and physical-switch nodes
 * @param l2GatewayServiceRecoveryHandler provides the service registry key
 * @param serviceRecoveryRegistry registry in which this listener is registered
 * @param idManager id manager service; stored in a field
 * @throws Exception declared by the signature (NOTE(review): presumably propagated from the
 *         base-class constructor — confirm)
 */
public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler,
                        HAOpClusteredListener haOpClusteredListener,
                        NodeCopier nodeCopier,
                        final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
                        final ServiceRecoveryRegistry serviceRecoveryRegistry,
                        final IdManagerService idManager) throws Exception {
    super(OPERATIONAL, db);
    this.haEventHandler = haEventHandler;
    this.haOpClusteredListener = haOpClusteredListener;
    this.nodeCopier = nodeCopier;
    this.idManager = idManager;
    // This class implements RecoverableListener, so it registers itself for recovery.
    serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
            this);
    ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(db);
}
88 @SuppressWarnings("all")
89 public void registerListener() {
91 LOG.info("Registering HAOpNodeListener");
92 registerListener(OPERATIONAL, getDataBroker());
93 } catch (Exception e) {
94 LOG.error("HA OP Node register listener error.", e);
// Logs that this listener is being deregistered.
// NOTE(review): the embedded numbering jumps from 99 to 103, so whatever follows the log
// statement (presumably the actual deregistration call and the closing brace) is not
// visible in this view — confirm against the original file.
98 public void deregisterListener() {
99 LOG.info("Deregistering HAOpNodeListener");
103 String getNodeId(InstanceIdentifier<Node> iid) {
104 return iid.firstKeyOf(Node.class).getNodeId().getValue();
// Handles a global node add seen on the operational datastore.
// The event is first forwarded to haOpClusteredListener; the remaining work runs with an
// additional CONFIGURATION read-write transaction obtained from txRunner.
// NOTE(review): `childNode` is used throughout but its declaration is not visible (the
// embedded numbering skips 109) — presumably the second parameter; confirm. Several
// brace-only / short lines are likewise missing from this view, so the exact extent of
// each branch below should be verified against the original file.
108 public void onGlobalNodeAdd(InstanceIdentifier<Node> childGlobalPath,
110 TypedReadWriteTransaction<Operational> tx) {
111 //copy child global node to ha global node
112 //create ha global config node if not present
113 //copy ha global config node to child global config node
114 LOG.info("HAOpNodeListener Node connected {} - Checking if Ha or Non-Ha enabled {}",
115 childNode.getNodeId().getValue(), getManagers(childNode));
116 haOpClusteredListener.onGlobalNodeAdd(childGlobalPath, childNode, tx);
118 txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, configTx -> {
// Case 1: the HA cache has no parent for this node.
119 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
120 LOG.info("HAOpNodeListener The connected node is not a HA child {}",
121 childNode.getNodeId().getValue());
// If the node is itself an HA parent, compare the operational db version with the one
// stored in config and merge the operational value into config when they differ.
122 if (hwvtepHACache.isHAParentNode(childGlobalPath)) {
123 LOG.info("HAOpNodeListener this is Parent Node {}",
124 childNode.getNodeId().getValue());
125 HwvtepGlobalAugmentation globalAugmentation = childNode
126 .augmentation(HwvtepGlobalAugmentation.class);
127 String operDbVersion = globalAugmentation.getDbVersion();
130 Optional<Node> globalConfigNodeOptional = configTx.read(childGlobalPath).get();
131 if (globalConfigNodeOptional.isPresent()) {
132 HwvtepGlobalAugmentation globalConfigAugmentation = globalConfigNodeOptional
133 .get().augmentation(HwvtepGlobalAugmentation.class);
134 String configDbVersion = globalConfigAugmentation.getDbVersion();
135 if (operDbVersion != null && !operDbVersion.equals(configDbVersion)) {
136 LOG.info("Change in Db version from {} to {} for Node {}",
137 configDbVersion, operDbVersion, childGlobalPath);
// The config augmentation is copied and only its db version is replaced.
138 HwvtepGlobalAugmentationBuilder haBuilder =
139 new HwvtepGlobalAugmentationBuilder(globalConfigAugmentation);
140 haBuilder.setDbVersion(operDbVersion);
141 NodeBuilder nodeBuilder = new NodeBuilder(childNode);
142 nodeBuilder.addAugmentation(haBuilder.build());
143 configTx.merge(childGlobalPath, nodeBuilder.build());
145 LOG.debug("No Change in Db version from {} to {} for Node {}",
146 configDbVersion, operDbVersion, childGlobalPath);
149 } catch (ExecutionException | InterruptedException ex) {
150 LOG.error("HAOpNodeListener Failed to read node {} from Config DS",
// Case 2: the node has an HA parent in the cache.
157 InstanceIdentifier<Node> haNodePath = hwvtepHACache.getParent(childGlobalPath);
158 LOG.info("HAOpNodeListener Ha enabled child node connected {} create parent oper node",
159 childNode.getNodeId().getValue());
// Copy the child's operational global node to the parent path using the operational tx.
161 nodeCopier.copyGlobalNode(Optional.ofNullable(childNode),
162 childGlobalPath, haNodePath, OPERATIONAL, tx);
// Read the parent node through the operational tx and build the manager list for it.
164 Optional<Node> existingDstGlobalNodeOptional = tx.read(haNodePath).get();
165 List<Managers> managers = HwvtepHAUtil
166 .buildManagersForHANode(Optional.ofNullable(childNode).get(),
167 existingDstGlobalNodeOptional);
// Parent config node present: write each manager into config, then copy parent to child.
169 Optional<Node> globalNodeOptional = configTx.read(haNodePath).get();
170 if (globalNodeOptional.isPresent()) {
171 //Also update the manager section in config which helps in cluster reboot scenarios
172 managers.stream().forEach((manager) -> {
173 InstanceIdentifier<Managers> managerIid = haNodePath
174 .augmentation(HwvtepGlobalAugmentation.class)
175 .child(Managers.class, manager.key());
176 configTx.put(managerIid, manager);
// NOTE(review): the trailing arguments of this call are not visible in this view.
178 nodeCopier.copyGlobalNode(globalNodeOptional, haNodePath, childGlobalPath,
// Parent config node absent (presumably the else branch — confirm): create it with the
// managers and, when available, the db version of the existing operational parent node.
181 NodeBuilder nodeBuilder = new NodeBuilder().setNodeId(haNodePath
182 .firstKeyOf(Node.class).getNodeId());
183 HwvtepGlobalAugmentationBuilder augBuilder = new HwvtepGlobalAugmentationBuilder();
184 augBuilder.setManagers(managers);
185 if (existingDstGlobalNodeOptional.isPresent()) {
186 HwvtepGlobalAugmentation srcGlobalAugmentation =
187 existingDstGlobalNodeOptional.get()
188 .augmentation(HwvtepGlobalAugmentation.class);
189 if (srcGlobalAugmentation != null) {
190 augBuilder.setDbVersion(srcGlobalAugmentation.getDbVersion());
193 nodeBuilder.addAugmentation(augBuilder.build());
194 configTx.put(haNodePath, nodeBuilder.build());
196 } catch (ExecutionException | InterruptedException e) {
197 LOG.error("HAOpNodeListener Failed to read nodes {} , {} ", childGlobalPath,
// Finally process the physical-switch nodes that belong to this global node.
201 readAndCopyChildPsOpToParent(childNode, tx);
// Returns the managers held in the node's HwvtepGlobalAugmentation when both the
// augmentation and its managers are non-null.
// NOTE(review): the value returned when that condition is false is not visible in this
// view (embedded numbering skips 208-211) — confirm against the original file.
204 public Object getManagers(Node node) {
205 if (node.augmentation(HwvtepGlobalAugmentation.class) != null
206 && node.augmentation(HwvtepGlobalAugmentation.class).getManagers() != null) {
207 return node.augmentation(HwvtepGlobalAugmentation.class).getManagers();
// Handles an update of a global node on the operational datastore.
// NOTE(review): brace-only / short lines (e.g. 230-231, 235, 237-239) are missing from this
// view, so the exact branch boundaries and any early returns should be confirmed.
212 //Update on global node has been taken care by HAListeners as per perf improvement
214 void onGlobalNodeUpdate(InstanceIdentifier<Node> childGlobalPath,
215 Node updatedChildNode,
216 Node originalChildNode,
217 DataObjectModification<Node> mod,
218 TypedReadWriteTransaction<Operational> tx) {
220 LOG.trace("Node updated {} {}", updatedChildNode, originalChildNode);
// An HA id in the ORIGINAL node's manager other-config means the node was already an HA
// child: tag the transaction with the source node, copy the update to the parent and stop.
222 String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
223 if (!Strings.isNullOrEmpty(oldHAId)) { //was already ha child
224 InstanceIdentifier<Node> haPath = hwvtepHACache.getParent(childGlobalPath);
225 LOG.debug("Copy oper update from child {} to parent {}", childGlobalPath, haPath);
// The transaction is cast to BatchedTransaction without a type check.
226 ((BatchedTransaction)tx).setSrcNodeId(updatedChildNode.getNodeId());
227 ((BatchedTransaction)tx).updateMetric(true);
228 haEventHandler.copyChildGlobalOpUpdateToHAParent(haPath, mod, tx);
229 return;//TODO handle unha case
// Otherwise let the clustered listener update the HA cache if this update turned the node
// into an HA child, then re-check the cache.
232 HAOpClusteredListener.addToHACacheIfBecameHAChild(childGlobalPath, updatedChildNode, originalChildNode);
233 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
234 if (!hwvtepHACache.isHAParentNode(childGlobalPath)) {
236 LOG.trace("Connected node is not ha child {}", updatedChildNode);
// The node is now handled like a freshly added global node.
240 LOG.info("HAOpNodeListener {} became ha child ", updatedChildNode.getNodeId().getValue());
241 onGlobalNodeAdd(childGlobalPath, updatedChildNode, tx);
245 void onGlobalNodeDelete(InstanceIdentifier<Node> childGlobalPath,
247 TypedReadWriteTransaction<Operational> tx) {
248 haOpClusteredListener.onGlobalNodeDelete(childGlobalPath, childNode, tx);
249 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
250 LOG.info("HAOpNodeListener non ha child global delete {} ", getNodeId(childGlobalPath));
253 LOG.info("HAOpNodeListener ha child global delete {} ", getNodeId(childGlobalPath));
254 InstanceIdentifier<Node> haNodePath = hwvtepHACache.getParent(childGlobalPath);
255 Set<InstanceIdentifier<Node>> children = hwvtepHACache.getChildrenForHANode(haNodePath);
256 if (haOpClusteredListener.getConnected(children).isEmpty()) {
257 LOG.info("HAOpNodeListener All child deleted for ha node {} ", HwvtepHAUtil.getNodeIdVal(haNodePath));
258 //ha ps delete is taken care by ps node delete
259 //HwvtepHAUtil.deleteSwitchesManagedBy-Node(haNodePath, tx);
261 HwvtepHAUtil.deleteNodeIfPresent(tx, haNodePath);
262 } catch (ExecutionException | InterruptedException e) {
263 LOG.error("HAOpNodeListener HA Node Delete failed {}", haNodePath);
266 LOG.info("HAOpNodeListener not all child deleted {} connected {}", getNodeId(childGlobalPath),
267 haOpClusteredListener.getConnected(children));
// Handles a physical-switch (ps) node add seen on the operational datastore.
// The event is forwarded to haOpClusteredListener first; the copy work runs with an
// additional CONFIGURATION read-write transaction obtained from txRunner.
// NOTE(review): `childPsNode` is used throughout but its declaration is not visible (the
// embedded numbering skips 273) — presumably the second parameter; confirm. Short lines
// such as returns, braces and trailing call arguments are also missing from this view.
272 public void onPsNodeAdd(InstanceIdentifier<Node> childPsPath,
274 TypedReadWriteTransaction<Operational> tx) {
275 //copy child ps oper node to ha ps oper node
276 //copy ha ps config node to child ps config
277 haOpClusteredListener.onPsNodeAdd(childPsPath, childPsNode, tx);
278 InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil
279 .getGlobalNodePathFromPSNode(childPsNode);
// The owning global node must be among the connected nodes known to the clustered listener.
280 if (!haOpClusteredListener.getConnectedNodes().contains(childGlobalPath)) {
281 LOG.error("HAOpNodeListener Ignoring ps node add as global node not found {}",
282 childPsNode.getNodeId().getValue());
// A global node with no HA parent that is not itself an HA parent is reported and ignored
// (per the log message; the return statement itself is not visible here).
285 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
286 if (!hwvtepHACache.isHAParentNode(childGlobalPath)) {
287 LOG.error("HAOpNodeListener Ignoring ps node add as the node is not ha child {}",
288 childPsNode.getNodeId().getValue());
292 LOG.info("HAOpNodeListener Ha ps child connected {} ", getNodeId(childPsPath));
// Derive the HA parent's global path and the matching parent ps path.
293 InstanceIdentifier<Node> haGlobalPath = hwvtepHACache.getParent(childGlobalPath);
294 InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
295 txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, configTx -> {
// Copy the child ps node towards the parent ps path.
// NOTE(review): the receiver and trailing arguments of this call are not visible.
298 .copyPSNode(Optional.ofNullable(childPsNode), childPsPath, haPsPath, haGlobalPath,
// Parent ps config node present: copy it back towards the child ps path.
301 Optional<Node> haPsNodeOptional = configTx.read(haPsPath).get();
302 if (haPsNodeOptional.isPresent()) {
303 nodeCopier.copyPSNode(haPsNodeOptional, haPsPath, childPsPath, childGlobalPath,
// Parent ps config node absent (presumably the else branch — confirm): build it from the
// child's tunnel ips and termination points and put it into config.
306 PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
307 PhysicalSwitchAugmentation srcPsAugmentation = childPsNode
308 .augmentation(PhysicalSwitchAugmentation.class);
309 if (srcPsAugmentation != null) {
310 psBuilder.setTunnelIps(srcPsAugmentation.getTunnelIps());
312 LOG.error("Physical Switch Augmentation is null for the child ps node: {}",
315 //setting tunnel ip and termination points in the parent node
316 List<TerminationPoint> terminationPoints = getTerminationPointForConfig(
318 // for (TerminationPoint terminationPoint: terminationPoints) {
319 // HwvtepTerminationPointCache.getInstance().addTerminationPoint(haGlobalPath, terminationPoint);
321 NodeBuilder nodeBuilder = new NodeBuilder()
322 .setNodeId(haPsPath.firstKeyOf(Node.class).getNodeId());
323 nodeBuilder.addAugmentation(psBuilder.build());
324 LOG.info("HAOpNodeListener creating the HAParent PhysicalSwitch {}", haPsPath);
325 configTx.put(haPsPath, nodeBuilder
326 .setTerminationPoint(terminationPoints).build());
// Read failures are logged without the exception object.
328 } catch (ExecutionException | InterruptedException e) {
329 LOG.error("Failed to read nodes {} , {} ", childPsPath, haGlobalPath);
334 private List<TerminationPoint> getTerminationPointForConfig(Node childPsNode) {
335 List<TerminationPoint> configTPList = new ArrayList<>();
336 if (childPsNode != null && childPsNode.getTerminationPoint() != null) {
337 childPsNode.getTerminationPoint().values().forEach(operTerminationPoint -> {
338 TerminationPointBuilder tpBuilder = new TerminationPointBuilder(operTerminationPoint);
339 tpBuilder.removeAugmentation(HwvtepPhysicalPortAugmentation.class);
340 HwvtepPhysicalPortAugmentation operPPAugmentation =
341 operTerminationPoint. augmentation(HwvtepPhysicalPortAugmentation.class);
342 HwvtepPhysicalPortAugmentationBuilder tpAugmentationBuilder =
343 new HwvtepPhysicalPortAugmentationBuilder();
344 tpAugmentationBuilder.setAclBindings(operPPAugmentation.getAclBindings());
345 tpAugmentationBuilder
346 .setHwvtepNodeDescription(operPPAugmentation.getHwvtepNodeDescription());
347 tpAugmentationBuilder.setHwvtepNodeName(operPPAugmentation.getHwvtepNodeName());
348 tpAugmentationBuilder.setPhysicalPortUuid(operPPAugmentation.getPhysicalPortUuid());
349 tpAugmentationBuilder.setVlanStats(operPPAugmentation.getVlanStats());
350 tpAugmentationBuilder.setVlanBindings(operPPAugmentation.getVlanBindings());
352 tpBuilder.addAugmentation(HwvtepPhysicalPortAugmentation.class,
353 tpAugmentationBuilder.build());
354 configTPList.add(tpBuilder.build());
361 void onPsNodeUpdate(Node updatedChildPSNode,
362 DataObjectModification<Node> mod,
363 TypedReadWriteTransaction<Operational> tx) {
364 InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(updatedChildPSNode);
365 if (IS_NOT_HA_CHILD.test(childGlobalPath)) {
368 //tunnel ip and termination points from child to parent
369 InstanceIdentifier<Node> haGlobalPath = hwvtepHACache.getParent(childGlobalPath);
370 ((BatchedTransaction)tx).setSrcNodeId(updatedChildPSNode.getNodeId());
371 ((BatchedTransaction)tx).updateMetric(true);
372 haEventHandler.copyChildPsOpUpdateToHAParent(updatedChildPSNode, haGlobalPath, mod, tx);
376 void onPsNodeDelete(InstanceIdentifier<Node> childPsPath,
378 TypedReadWriteTransaction<Operational> tx) {
379 //one child ps node disconnected
380 //find if all child ps nodes disconnected then delete parent ps node
381 haOpClusteredListener.onPsNodeDelete(childPsPath, childPsNode, tx);
382 InstanceIdentifier<Node> disconnectedChildGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
383 if (IS_NOT_HA_CHILD.test(disconnectedChildGlobalPath)) {
384 LOG.info("HAOpNodeListener on non ha ps child delete {} ", getNodeId(childPsPath));
387 InstanceIdentifier<Node> haGlobalPath = hwvtepHACache.getParent(disconnectedChildGlobalPath);
388 Set<InstanceIdentifier<Node>> childPsPaths = hwvtepHACache.getChildrenForHANode(haGlobalPath).stream()
389 .map((childGlobalPath) -> HwvtepHAUtil.convertPsPath(childPsNode, childGlobalPath))
390 .collect(Collectors.toSet());
391 //TODO validate what if this is null
392 if (haOpClusteredListener.getConnected(childPsPaths).isEmpty()) {
393 InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
394 LOG.info("HAOpNodeListener All child deleted for ha ps node {} ", HwvtepHAUtil.getNodeIdVal(haPsPath));
396 HwvtepHAUtil.deleteNodeIfPresent(tx, haPsPath);
397 } catch (ExecutionException | InterruptedException e) {
398 LOG.error("HAOpNodeListener Exception While Delete HA PS Node : {}", haPsPath);
400 //HwvtepHAUtil.deleteGlobalNodeSwitches(haGlobalPath, haPsPath, LogicalDatastoreType.OPERATIONAL, tx);
402 LOG.info("HAOpNodeListener not all ha ps child deleted {} connected {}", getNodeId(childPsPath),
403 haOpClusteredListener.getConnected(childPsPaths));
407 private void readAndCopyChildPsOpToParent(Node childNode, TypedReadWriteTransaction<Operational> tx) {
408 String childGlobalNodeId = childNode.getNodeId().getValue();
409 List<InstanceIdentifier> childPsIids = new ArrayList<>();
410 HwvtepGlobalAugmentation hwvtepGlobalAugmentation = childNode.augmentation(HwvtepGlobalAugmentation.class);
411 if (hwvtepGlobalAugmentation == null
412 || HwvtepHAUtil.isEmpty(hwvtepGlobalAugmentation.nonnullSwitches().values())) {
413 haOpClusteredListener.getConnectedNodes()
415 .filter((connectedIid) -> IS_PS_CHILD_TO_GLOBAL_NODE.test(childGlobalNodeId, connectedIid))
416 .forEach((connectedIid) -> childPsIids.add(connectedIid));
418 hwvtepGlobalAugmentation.getSwitches().values().forEach(
419 (switches) -> childPsIids.add(switches.getSwitchRef().getValue()));
421 if (childPsIids.isEmpty()) {
422 LOG.info("HAOpNodeListener No child ps found for global {}", childGlobalNodeId);
424 childPsIids.forEach((psIid) -> {
426 InstanceIdentifier<Node> childPsIid = psIid;
427 Optional<Node> childPsNode = tx.read(childPsIid).get();
428 if (childPsNode.isPresent()) {
429 LOG.debug("Child oper PS node found");
430 onPsNodeAdd(childPsIid, childPsNode.get(), tx);
432 LOG.error("HAOpNodeListener Child oper ps node not found {}", childPsIid);
434 } catch (ExecutionException | InterruptedException e) {
435 LOG.error("HAOpNodeListener Failed to read child ps node {}", psIid);