/*
 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;

import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.genius.infra.Datastore.Operational;
import org.opendaylight.genius.infra.TypedReadWriteTransaction;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.infrautils.metrics.MetricProvider;
import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.DataObjectModification;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.NodeCopier;
import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
import org.opendaylight.serviceutils.srm.RecoverableListener;
import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
44 public class HAOpNodeListener extends HwvtepNodeBaseListener<Operational> implements RecoverableListener {
46 private static final Logger LOG = LoggerFactory.getLogger(HAOpNodeListener.class);
48 private static final BiPredicate<String, InstanceIdentifier<Node>> IS_PS_CHILD_TO_GLOBAL_NODE =
49 (globalNodeId, iid) -> {
50 String psNodeId = iid.firstKeyOf(Node.class).getNodeId().getValue();
51 return psNodeId.startsWith(globalNodeId) && psNodeId.contains("physicalswitch");
54 private final IHAEventHandler haEventHandler;
55 private final HAOpClusteredListener haOpClusteredListener;
56 private final NodeCopier nodeCopier;
/**
 * Builds the listener, passing {@code OPERATIONAL} to the base listener, and registers this
 * instance with the service recovery registry under the key built by the L2 gateway recovery
 * handler.
 *
 * @param db data broker handed to the base listener
 * @param haEventHandler handler used to copy child updates to the HA parent
 * @param haOpClusteredListener clustered listener that is notified of every node event first
 * @param nodeCopier copier used for global and physical-switch nodes
 * @param hwvtepNodeHACache HA cache handed to the base listener
 * @param metricProvider metric provider handed to the base listener
 * @param l2GatewayServiceRecoveryHandler supplies the service registry key
 * @param serviceRecoveryRegistry registry this listener is added to
 * @throws Exception as declared by the original signature
 */
@Inject
public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler,
                        HAOpClusteredListener haOpClusteredListener,
                        NodeCopier nodeCopier, HwvtepNodeHACache hwvtepNodeHACache,
                        MetricProvider metricProvider,
                        final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
                        final ServiceRecoveryRegistry serviceRecoveryRegistry) throws Exception {
    super(OPERATIONAL, db, hwvtepNodeHACache, metricProvider, true);
    this.haEventHandler = haEventHandler;
    this.haOpClusteredListener = haOpClusteredListener;
    this.nodeCopier = nodeCopier;
    // Registered last, once every field of this instance has been assigned.
    serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
            this);
}
74 @SuppressWarnings("all")
75 public void registerListener() {
77 LOG.info("Registering HAOpNodeListener");
78 registerListener(OPERATIONAL, getDataBroker());
79 } catch (Exception e) {
80 LOG.error("HA OP Node register listener error.");
// Logs that this listener is being deregistered.
// NOTE(review): whatever follows the log call is not visible in this excerpt; confirm against
// the full file that the listener registration is actually released here.
84 public void deregisterListener() {
85 LOG.info("Deregistering HAOpNodeListener");
89 String getNodeId(InstanceIdentifier<Node> iid) {
90 return iid.firstKeyOf(Node.class).getNodeId().getValue();
94 public void onGlobalNodeAdd(InstanceIdentifier<Node> childGlobalPath,
96 TypedReadWriteTransaction<Operational> tx) {
97 //copy child global node to ha global node
98 //create ha global config node if not present
99 //copy ha global config node to child global config node
100 LOG.trace("Node connected {} - Checking if Ha or Non-Ha enabled ", childNode.getNodeId().getValue());
101 haOpClusteredListener.onGlobalNodeAdd(childGlobalPath, childNode, tx);
102 if (isNotHAChild(childGlobalPath)) {
105 InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
106 LOG.trace("Ha enabled child node connected {}", childNode.getNodeId().getValue());
108 nodeCopier.copyGlobalNode(Optional.ofNullable(childNode), childGlobalPath, haNodePath, OPERATIONAL, tx);
109 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
110 confTx -> nodeCopier.copyGlobalNode(Optional.ofNullable(null), haNodePath, childGlobalPath,
111 CONFIGURATION, confTx)), LOG, "Error copying to configuration");
112 } catch (InterruptedException | ExecutionException e) {
113 LOG.error("Failed to read nodes {} , {} ", childGlobalPath, haNodePath);
115 readAndCopyChildPsOpToParent(childNode, tx);
// Update on global node has been taken care of by HAListeners as per perf improvement.
120 void onGlobalNodeUpdate(InstanceIdentifier<Node> childGlobalPath,
121 Node updatedChildNode,
122 Node originalChildNode,
123 DataObjectModification<Node> mod,
124 TypedReadWriteTransaction<Operational> tx) {
126 String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
127 if (!Strings.isNullOrEmpty(oldHAId)) { //was already ha child
128 InstanceIdentifier<Node> haPath = getHwvtepNodeHACache().getParent(childGlobalPath);
129 LOG.debug("Copy oper update from child {} to parent {}", childGlobalPath, haPath);
130 haEventHandler.copyChildGlobalOpUpdateToHAParent(haPath, mod, tx);
131 return;//TODO handle unha case
134 addToHACacheIfBecameHAChild(childGlobalPath, updatedChildNode, originalChildNode);
135 if (isNotHAChild(childGlobalPath)) {
138 LOG.info("{} became ha child ", updatedChildNode.getNodeId().getValue());
139 onGlobalNodeAdd(childGlobalPath, updatedChildNode, tx);
143 void onGlobalNodeDelete(InstanceIdentifier<Node> childGlobalPath,
145 TypedReadWriteTransaction<Operational> tx)
146 throws ExecutionException, InterruptedException {
147 haOpClusteredListener.onGlobalNodeDelete(childGlobalPath, childNode, tx);
148 if (isNotHAChild(childGlobalPath)) {
149 LOG.info("non ha child global delete {} ", getNodeId(childGlobalPath));
152 LOG.info("ha child global delete {} ", getNodeId(childGlobalPath));
153 InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
154 Set<InstanceIdentifier<Node>> children = getHwvtepNodeHACache().getChildrenForHANode(haNodePath);
155 if (haOpClusteredListener.getConnected(children).isEmpty()) {
156 LOG.info("All child deleted for ha node {} ", HwvtepHAUtil.getNodeIdVal(haNodePath));
157 //ha ps delete is taken care by ps node delete
158 //HwvtepHAUtil.deleteSwitchesManagedBy-Node(haNodePath, tx);
159 HwvtepHAUtil.deleteNodeIfPresent(tx, haNodePath);
161 LOG.info("not all child deleted {} connected {}", getNodeId(childGlobalPath),
162 haOpClusteredListener.getConnected(children));
167 void onPsNodeAdd(InstanceIdentifier<Node> childPsPath,
169 TypedReadWriteTransaction<Operational> tx) {
170 //copy child ps oper node to ha ps oper node
171 //copy ha ps config node to child ps config
172 haOpClusteredListener.onPsNodeAdd(childPsPath, childPsNode, tx);
173 InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
174 if (!haOpClusteredListener.getConnectedNodes().contains(childGlobalPath)) {
177 if (isNotHAChild(childGlobalPath)) {
180 LOG.info("ha ps child connected {} ", getNodeId(childPsPath));
181 InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
182 InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
184 nodeCopier.copyPSNode(Optional.ofNullable(childPsNode), childPsPath, haPsPath, haGlobalPath,
186 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
187 confTx -> nodeCopier.copyPSNode(Optional.ofNullable(null), haPsPath, childPsPath, childGlobalPath,
188 CONFIGURATION, confTx)), LOG, "Error copying to configuration");
189 } catch (InterruptedException | ExecutionException e) {
190 LOG.error("Failed to read nodes {} , {} ", childPsPath, haGlobalPath);
195 void onPsNodeUpdate(Node updatedChildPSNode,
196 DataObjectModification<Node> mod,
197 TypedReadWriteTransaction<Operational> tx) {
198 InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(updatedChildPSNode);
199 if (isNotHAChild(childGlobalPath)) {
202 InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
203 haEventHandler.copyChildPsOpUpdateToHAParent(updatedChildPSNode, haGlobalPath, mod, tx);
207 void onPsNodeDelete(InstanceIdentifier<Node> childPsPath,
209 TypedReadWriteTransaction<Operational> tx)
210 throws ExecutionException, InterruptedException {
211 //one child ps node disconnected
212 //find if all child ps nodes disconnected then delete parent ps node
213 haOpClusteredListener.onPsNodeDelete(childPsPath, childPsNode, tx);
214 InstanceIdentifier<Node> disconnectedChildGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
215 if (isNotHAChild(disconnectedChildGlobalPath)) {
216 LOG.info("on non ha ps child delete {} ", getNodeId(childPsPath));
219 InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(disconnectedChildGlobalPath);
220 Set<InstanceIdentifier<Node>> childPsPaths = getHwvtepNodeHACache().getChildrenForHANode(haGlobalPath).stream()
221 .map((childGlobalPath) -> HwvtepHAUtil.convertPsPath(childPsNode, childGlobalPath))
222 .collect(Collectors.toSet());
223 //TODO validate what if this is null
224 if (haOpClusteredListener.getConnected(childPsPaths).isEmpty()) {
225 InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
226 LOG.info("All child deleted for ha ps node {} ", HwvtepHAUtil.getNodeIdVal(haPsPath));
227 HwvtepHAUtil.deleteNodeIfPresent(tx, haPsPath);
228 //HwvtepHAUtil.deleteGlobalNodeSwitches(haGlobalPath, haPsPath, LogicalDatastoreType.OPERATIONAL, tx);
230 LOG.info("not all ha ps child deleted {} connected {}", getNodeId(childPsPath),
231 haOpClusteredListener.getConnected(childPsPaths));
235 private void readAndCopyChildPsOpToParent(Node childNode, TypedReadWriteTransaction<Operational> tx) {
236 String childGlobalNodeId = childNode.getNodeId().getValue();
237 List<InstanceIdentifier> childPsIids = new ArrayList<>();
238 HwvtepGlobalAugmentation hwvtepGlobalAugmentation = childNode.augmentation(HwvtepGlobalAugmentation.class);
239 if (hwvtepGlobalAugmentation == null
240 || HwvtepHAUtil.isEmpty(hwvtepGlobalAugmentation.nonnullSwitches().values())) {
241 haOpClusteredListener.getConnectedNodes()
243 .filter((connectedIid) -> IS_PS_CHILD_TO_GLOBAL_NODE.test(childGlobalNodeId, connectedIid))
244 .forEach(childPsIids::add);
246 hwvtepGlobalAugmentation.nonnullSwitches().values().forEach(
247 (switches) -> childPsIids.add(switches.getSwitchRef().getValue()));
249 if (childPsIids.isEmpty()) {
250 LOG.info("No child ps found for global {}", childGlobalNodeId);
252 childPsIids.forEach((psIid) -> {
254 InstanceIdentifier<Node> childPsIid = psIid;
255 Optional<Node> childPsNode = tx.read(childPsIid).get();
256 if (childPsNode.isPresent()) {
257 LOG.debug("Child oper PS node found");
258 onPsNodeAdd(childPsIid, childPsNode.get(), tx);
260 LOG.debug("Child oper ps node not found {}", childPsIid);
262 } catch (InterruptedException | ExecutionException e) {
263 LOG.error("Failed to read child ps node {}", psIid);
268 private boolean isNotHAChild(InstanceIdentifier<Node> nodeId) {
269 return getHwvtepNodeHACache().getParent(nodeId) == null;