/*
 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.netvirt.elan.l2gw.ha.listeners;

import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;

import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.genius.infra.Datastore.Operational;
import org.opendaylight.genius.infra.TypedReadWriteTransaction;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.infrautils.metrics.MetricProvider;
import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.DataObjectModification;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.NodeCopier;
import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
import org.opendaylight.serviceutils.srm.RecoverableListener;
import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class HAOpNodeListener extends HwvtepNodeBaseListener<Operational> implements RecoverableListener {

    private static final Logger LOG = LoggerFactory.getLogger(HAOpNodeListener.class);

    // True when iid points at a physical switch node belonging to the given global node.
    private static final BiPredicate<String, InstanceIdentifier<Node>> IS_PS_CHILD_TO_GLOBAL_NODE =
        (globalNodeId, iid) -> {
            String psNodeId = iid.firstKeyOf(Node.class).getNodeId().getValue();
            return psNodeId.startsWith(globalNodeId) && psNodeId.contains("physicalswitch");
        };

    private final IHAEventHandler haEventHandler;
    private final HAOpClusteredListener haOpClusteredListener;
    private final NodeCopier nodeCopier;

    @Inject
    public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler,
                            HAOpClusteredListener haOpClusteredListener,
                            NodeCopier nodeCopier,
                            HwvtepNodeHACache hwvtepNodeHACache,
                            MetricProvider metricProvider,
                            final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
                            final ServiceRecoveryRegistry serviceRecoveryRegistry) throws Exception {
        super(OPERATIONAL, db, hwvtepNodeHACache, metricProvider, true);
        this.haEventHandler = haEventHandler;
        this.haOpClusteredListener = haOpClusteredListener;
        this.nodeCopier = nodeCopier;
        serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
                this);
    }
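
    // Data flow between an HA parent and its children, as implemented by the
    // handlers below: operational state is copied child -> parent, while
    // configuration is copied parent -> child. The parent/child relationships
    // come from the HwvtepNodeHACache populated via the manager other-config.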
    @Override
    @SuppressWarnings("all")
    public void registerListener() {
        try {
            LOG.info("Registering HAOpNodeListener");
            registerListener(OPERATIONAL, getDataBroker());
        } catch (Exception e) {
            LOG.error("HA OP Node register listener error.", e);
        }
    }

    @Override
    public void deregisterListener() {
        LOG.info("Deregistering HAOpNodeListener");
        super.close();
    }

    String getNodeId(InstanceIdentifier<Node> iid) {
        return iid.firstKeyOf(Node.class).getNodeId().getValue();
    }

    @Override
    public void onGlobalNodeAdd(InstanceIdentifier<Node> childGlobalPath, Node childNode,
                                TypedReadWriteTransaction<Operational> tx) {
        //copy child global node to ha global node
        //create ha global config node if not present
        //copy ha global config node to child global config node
        LOG.trace("Node connected {} - Checking if Ha or Non-Ha enabled ", childNode.getNodeId().getValue());
        haOpClusteredListener.onGlobalNodeAdd(childGlobalPath, childNode, tx);
        if (isNotHAChild(childGlobalPath)) {
            return;
        }
        InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
        LOG.trace("Ha enabled child node connected {}", childNode.getNodeId().getValue());
        try {
            nodeCopier.copyGlobalNode(Optional.ofNullable(childNode), childGlobalPath, haNodePath, OPERATIONAL, tx);
            LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
                confTx -> nodeCopier.copyGlobalNode(Optional.empty(), haNodePath, childGlobalPath,
                        CONFIGURATION, confTx)),
                LOG, "Error copying to configuration");
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Failed to read nodes {} , {} ", childGlobalPath, haNodePath, e);
        }
        readAndCopyChildPsOpToParent(childNode, tx);
    }

    //Updates on the global node are handled by HAListeners as a perf improvement
    @Override
    void onGlobalNodeUpdate(InstanceIdentifier<Node> childGlobalPath, Node updatedChildNode, Node originalChildNode,
                            DataObjectModification<Node> mod, TypedReadWriteTransaction<Operational> tx) {
        String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
        if (!Strings.isNullOrEmpty(oldHAId)) { //was already an ha child
            InstanceIdentifier<Node> haPath = getHwvtepNodeHACache().getParent(childGlobalPath);
            LOG.debug("Copy oper update from child {} to parent {}", childGlobalPath, haPath);
            haEventHandler.copyChildGlobalOpUpdateToHAParent(haPath, mod, tx);
            return; //TODO handle un-ha case
        }
        addToHACacheIfBecameHAChild(childGlobalPath, updatedChildNode, originalChildNode);
        if (isNotHAChild(childGlobalPath)) {
            return;
        }
        LOG.info("{} became ha child ", updatedChildNode.getNodeId().getValue());
        onGlobalNodeAdd(childGlobalPath, updatedChildNode, tx);
    }

    @Override
    void onGlobalNodeDelete(InstanceIdentifier<Node> childGlobalPath, Node childNode,
                            TypedReadWriteTransaction<Operational> tx)
            throws ExecutionException, InterruptedException {
        haOpClusteredListener.onGlobalNodeDelete(childGlobalPath, childNode, tx);
        if (isNotHAChild(childGlobalPath)) {
            LOG.info("non ha child global delete {} ", getNodeId(childGlobalPath));
            return;
        }
        LOG.info("ha child global delete {} ", getNodeId(childGlobalPath));
        InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
        Set<InstanceIdentifier<Node>> children = getHwvtepNodeHACache().getChildrenForHANode(haNodePath);
        if (haOpClusteredListener.getConnected(children).isEmpty()) {
            LOG.info("All children deleted for ha node {} ", HwvtepHAUtil.getNodeIdVal(haNodePath));
            //ha ps delete is taken care of by ps node delete
            //HwvtepHAUtil.deleteSwitchesManagedByNode(haNodePath, tx);
            HwvtepHAUtil.deleteNodeIfPresent(tx, haNodePath);
        } else {
            LOG.info("not all children deleted {} connected {}", getNodeId(childGlobalPath),
                    haOpClusteredListener.getConnected(children));
        }
    }
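
    // The physical switch (PS) handlers below mirror the global-node handlers:
    // a child PS path is re-rooted under the HA parent via
    // HwvtepHAUtil.convertPsPath(...) before the oper/config copies are made.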
    @Override
    void onPsNodeAdd(InstanceIdentifier<Node> childPsPath, Node childPsNode,
                     TypedReadWriteTransaction<Operational> tx) {
        //copy child ps oper node to ha ps oper node
        //copy ha ps config node to child ps config
        haOpClusteredListener.onPsNodeAdd(childPsPath, childPsNode, tx);
        InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
        if (!haOpClusteredListener.getConnectedNodes().contains(childGlobalPath)) {
            return;
        }
        if (isNotHAChild(childGlobalPath)) {
            return;
        }
        LOG.info("ha ps child connected {} ", getNodeId(childPsPath));
        InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
        InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
        try {
            nodeCopier.copyPSNode(Optional.ofNullable(childPsNode), childPsPath, haPsPath, haGlobalPath,
                    OPERATIONAL, tx);
            LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
                confTx -> nodeCopier.copyPSNode(Optional.empty(), haPsPath, childPsPath, childGlobalPath,
                        CONFIGURATION, confTx)),
                LOG, "Error copying to configuration");
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Failed to read nodes {} , {} ", childPsPath, haGlobalPath, e);
        }
    }

    @Override
    void onPsNodeUpdate(Node updatedChildPSNode, DataObjectModification<Node> mod,
                        TypedReadWriteTransaction<Operational> tx) {
        InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(updatedChildPSNode);
        if (isNotHAChild(childGlobalPath)) {
            return;
        }
        InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
        haEventHandler.copyChildPsOpUpdateToHAParent(updatedChildPSNode, haGlobalPath, mod, tx);
    }

    @Override
    void onPsNodeDelete(InstanceIdentifier<Node> childPsPath, Node childPsNode,
                        TypedReadWriteTransaction<Operational> tx)
            throws ExecutionException, InterruptedException {
        //one child ps node disconnected
        //if all child ps nodes have disconnected, delete the parent ps node
        haOpClusteredListener.onPsNodeDelete(childPsPath, childPsNode, tx);
        InstanceIdentifier<Node> disconnectedChildGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
        if (isNotHAChild(disconnectedChildGlobalPath)) {
            LOG.info("on non ha ps child delete {} ", getNodeId(childPsPath));
            return;
        }
        InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(disconnectedChildGlobalPath);
        Set<InstanceIdentifier<Node>> childPsPaths = getHwvtepNodeHACache().getChildrenForHANode(haGlobalPath)
                .stream()
                .map((childGlobalPath) -> HwvtepHAUtil.convertPsPath(childPsNode, childGlobalPath))
                .collect(Collectors.toSet());
        //TODO validate what if this is null
        if (haOpClusteredListener.getConnected(childPsPaths).isEmpty()) {
            InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
            LOG.info("All children deleted for ha ps node {} ", HwvtepHAUtil.getNodeIdVal(haPsPath));
            HwvtepHAUtil.deleteNodeIfPresent(tx, haPsPath);
            //HwvtepHAUtil.deleteGlobalNodeSwitches(haGlobalPath, haPsPath, LogicalDatastoreType.OPERATIONAL, tx);
        } else {
            LOG.info("not all ha ps children deleted {} connected {}", getNodeId(childPsPath),
                    haOpClusteredListener.getConnected(childPsPaths));
        }
    }
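
    // When a global child connects, its PS nodes may already exist in the
    // operational datastore. They are discovered either from the node's own
    // switches list or, when that list is empty, from the connected-node cache
    // by matching node ids against IS_PS_CHILD_TO_GLOBAL_NODE.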
    private void readAndCopyChildPsOpToParent(Node childNode, TypedReadWriteTransaction<Operational> tx) {
        String childGlobalNodeId = childNode.getNodeId().getValue();
        List<InstanceIdentifier<Node>> childPsIids = new ArrayList<>();
        HwvtepGlobalAugmentation hwvtepGlobalAugmentation = childNode.augmentation(HwvtepGlobalAugmentation.class);
        if (hwvtepGlobalAugmentation == null || HwvtepHAUtil.isEmpty(hwvtepGlobalAugmentation.getSwitches())) {
            haOpClusteredListener.getConnectedNodes()
                    .stream()
                    .filter((connectedIid) -> IS_PS_CHILD_TO_GLOBAL_NODE.test(childGlobalNodeId, connectedIid))
                    .forEach(childPsIids::add);
        } else {
            hwvtepGlobalAugmentation.getSwitches().forEach(
                (switches) -> childPsIids.add((InstanceIdentifier<Node>) switches.getSwitchRef().getValue()));
        }
        if (childPsIids.isEmpty()) {
            LOG.info("No child ps found for global {}", childGlobalNodeId);
        }
        childPsIids.forEach((childPsIid) -> {
            try {
                Optional<Node> childPsNode = tx.read(childPsIid).get();
                if (childPsNode.isPresent()) {
                    LOG.debug("Child oper PS node found");
                    onPsNodeAdd(childPsIid, childPsNode.get(), tx);
                } else {
                    LOG.debug("Child oper ps node not found {}", childPsIid);
                }
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Failed to read child ps node {}", childPsIid, e);
            }
        });
    }

    private boolean isNotHAChild(InstanceIdentifier<Node> nodeId) {
        return getHwvtepNodeHACache().getParent(nodeId) == null;
    }
}