49ef723ebf59c78e647159591642f2fcff50b8c6
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HAOpNodeListener.java
1 /*
2  * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
9
10 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
11 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
12
13 import com.google.common.base.Optional;
14 import com.google.common.base.Strings;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Set;
18 import java.util.concurrent.ExecutionException;
19 import java.util.function.BiPredicate;
20 import java.util.stream.Collectors;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
25 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
26 import org.opendaylight.genius.infra.Datastore.Operational;
27 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
28 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
29 import org.opendaylight.infrautils.metrics.MetricProvider;
30 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
31 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
32 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
33 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
34 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.NodeCopier;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
36 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 @Singleton
42 public class HAOpNodeListener extends HwvtepNodeBaseListener<Operational> {
43
44     private static final Logger LOG = LoggerFactory.getLogger(HAOpNodeListener.class);
45
46     private static final BiPredicate<String, InstanceIdentifier<Node>> IS_PS_CHILD_TO_GLOBAL_NODE =
47         (globalNodeId, iid) -> {
48             String psNodeId = iid.firstKeyOf(Node.class).getNodeId().getValue();
49             return psNodeId.startsWith(globalNodeId) && psNodeId.contains("physicalswitch");
50         };
51
52     private final IHAEventHandler haEventHandler;
53     private final HAOpClusteredListener haOpClusteredListener;
54     private final NodeCopier nodeCopier;
55
56     @Inject
57     public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler,
58                             HAOpClusteredListener haOpClusteredListener,
59                             NodeCopier nodeCopier, HwvtepNodeHACache hwvtepNodeHACache,
60                             MetricProvider metricProvider) throws Exception {
61         super(OPERATIONAL, db, hwvtepNodeHACache, metricProvider, true);
62         this.haEventHandler = haEventHandler;
63         this.haOpClusteredListener = haOpClusteredListener;
64         this.nodeCopier = nodeCopier;
65     }
66
67     String getNodeId(InstanceIdentifier<Node> iid) {
68         return iid.firstKeyOf(Node.class).getNodeId().getValue();
69     }
70
71     @Override
72     public void onGlobalNodeAdd(InstanceIdentifier<Node> childGlobalPath,
73                                 Node childNode,
74                                 TypedReadWriteTransaction<Operational> tx) {
75         //copy child global node to ha global node
76         //create ha global config node if not present
77         //copy ha global config node to child global config node
78         LOG.trace("Node connected {} - Checking if Ha or Non-Ha enabled ", childNode.getNodeId().getValue());
79         haOpClusteredListener.onGlobalNodeAdd(childGlobalPath, childNode, tx);
80         if (isNotHAChild(childGlobalPath)) {
81             return;
82         }
83         InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
84         LOG.trace("Ha enabled child node connected {}", childNode.getNodeId().getValue());
85         try {
86             nodeCopier.copyGlobalNode(Optional.fromNullable(childNode), childGlobalPath, haNodePath, OPERATIONAL, tx);
87             ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
88                 confTx -> nodeCopier.copyGlobalNode(Optional.fromNullable(null), haNodePath, childGlobalPath,
89                     CONFIGURATION, confTx)), LOG, "Error copying to configuration");
90         } catch (InterruptedException | ExecutionException e) {
91             LOG.error("Failed to read nodes {} , {} ", childGlobalPath, haNodePath);
92         }
93         readAndCopyChildPsOpToParent(childNode, tx);
94     }
95
96     //Update on global node has been taken care by HAListeners as per perf improvement
97     @Override
98     void onGlobalNodeUpdate(InstanceIdentifier<Node> childGlobalPath,
99                             Node updatedChildNode,
100                             Node originalChildNode,
101                             DataObjectModification<Node> mod,
102                             TypedReadWriteTransaction<Operational> tx) throws ReadFailedException {
103
104         String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
105         if (!Strings.isNullOrEmpty(oldHAId)) { //was already ha child
106             InstanceIdentifier<Node> haPath = getHwvtepNodeHACache().getParent(childGlobalPath);
107             LOG.debug("Copy oper update from child {} to parent {}", childGlobalPath, haPath);
108             haEventHandler.copyChildGlobalOpUpdateToHAParent(haPath, mod, tx);
109             return;//TODO handle unha case
110         }
111
112         addToHACacheIfBecameHAChild(childGlobalPath, updatedChildNode, originalChildNode);
113         if (isNotHAChild(childGlobalPath)) {
114             return;
115         }
116         LOG.info("{} became ha child ", updatedChildNode.getNodeId().getValue());
117         onGlobalNodeAdd(childGlobalPath, updatedChildNode, tx);
118     }
119
120     @Override
121     void onGlobalNodeDelete(InstanceIdentifier<Node> childGlobalPath,
122                             Node childNode,
123                             TypedReadWriteTransaction<Operational> tx)
124             throws ExecutionException, InterruptedException {
125         haOpClusteredListener.onGlobalNodeDelete(childGlobalPath, childNode, tx);
126         if (isNotHAChild(childGlobalPath)) {
127             LOG.info("non ha child global delete {} ", getNodeId(childGlobalPath));
128             return;
129         }
130         LOG.info("ha child global delete {} ", getNodeId(childGlobalPath));
131         InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
132         Set<InstanceIdentifier<Node>> children = getHwvtepNodeHACache().getChildrenForHANode(haNodePath);
133         if (haOpClusteredListener.getConnected(children).isEmpty()) {
134             LOG.info("All child deleted for ha node {} ", HwvtepHAUtil.getNodeIdVal(haNodePath));
135             //ha ps delete is taken care by ps node delete
136             //HwvtepHAUtil.deleteSwitchesManagedBy-Node(haNodePath, tx);
137             HwvtepHAUtil.deleteNodeIfPresent(tx, haNodePath);
138         } else {
139             LOG.info("not all child deleted {} connected {}", getNodeId(childGlobalPath),
140                     haOpClusteredListener.getConnected(children));
141         }
142     }
143
144     @Override
145     void onPsNodeAdd(InstanceIdentifier<Node> childPsPath,
146                      Node childPsNode,
147                      TypedReadWriteTransaction<Operational> tx) {
148         //copy child ps oper node to ha ps oper node
149         //copy ha ps config node to child ps config
150         haOpClusteredListener.onPsNodeAdd(childPsPath, childPsNode, tx);
151         InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
152         if (!haOpClusteredListener.getConnectedNodes().contains(childGlobalPath)) {
153             return;
154         }
155         if (isNotHAChild(childGlobalPath)) {
156             return;
157         }
158         LOG.info("ha ps child connected {} ", getNodeId(childPsPath));
159         InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
160         InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
161         try {
162             nodeCopier.copyPSNode(Optional.fromNullable(childPsNode), childPsPath, haPsPath, haGlobalPath,
163                     OPERATIONAL, tx);
164             ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
165                 confTx -> nodeCopier.copyPSNode(Optional.fromNullable(null), haPsPath, childPsPath, childGlobalPath,
166                     CONFIGURATION, confTx)), LOG, "Error copying to configuration");
167         } catch (InterruptedException | ExecutionException e) {
168             LOG.error("Failed to read nodes {} , {} ", childPsPath, haGlobalPath);
169         }
170     }
171
172     @Override
173     void onPsNodeUpdate(Node updatedChildPSNode,
174             Node originalChildPSNode,
175             DataObjectModification<Node> mod,
176             TypedReadWriteTransaction<Operational> tx) {
177         InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(updatedChildPSNode);
178         if (isNotHAChild(childGlobalPath)) {
179             return;
180         }
181         InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
182         haEventHandler.copyChildPsOpUpdateToHAParent(updatedChildPSNode, haGlobalPath, mod, tx);
183     }
184
185     @Override
186     void onPsNodeDelete(InstanceIdentifier<Node> childPsPath,
187                         Node childPsNode,
188                         TypedReadWriteTransaction<Operational> tx)
189             throws ExecutionException, InterruptedException {
190         //one child ps node disconnected
191         //find if all child ps nodes disconnected then delete parent ps node
192         haOpClusteredListener.onPsNodeDelete(childPsPath, childPsNode, tx);
193         InstanceIdentifier<Node> disconnectedChildGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
194         if (isNotHAChild(disconnectedChildGlobalPath)) {
195             LOG.info("on non ha ps child delete {} ", getNodeId(childPsPath));
196             return;
197         }
198         InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(disconnectedChildGlobalPath);
199         Set<InstanceIdentifier<Node>> childPsPaths = getHwvtepNodeHACache().getChildrenForHANode(haGlobalPath).stream()
200                 .map((childGlobalPath) -> HwvtepHAUtil.convertPsPath(childPsNode, childGlobalPath))
201                 .collect(Collectors.toSet());
202         //TODO validate what if this is null
203         if (haOpClusteredListener.getConnected(childPsPaths).isEmpty()) {
204             InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
205             LOG.info("All child deleted for ha ps node {} ", HwvtepHAUtil.getNodeIdVal(haPsPath));
206             HwvtepHAUtil.deleteNodeIfPresent(tx, haPsPath);
207             //HwvtepHAUtil.deleteGlobalNodeSwitches(haGlobalPath, haPsPath, LogicalDatastoreType.OPERATIONAL, tx);
208         } else {
209             LOG.info("not all ha ps child deleted {} connected {}", getNodeId(childPsPath),
210                     haOpClusteredListener.getConnected(childPsPaths));
211         }
212     }
213
214     private void readAndCopyChildPsOpToParent(Node childNode, TypedReadWriteTransaction<Operational> tx) {
215         String childGlobalNodeId = childNode.getNodeId().getValue();
216         List<InstanceIdentifier> childPsIids = new ArrayList<>();
217         HwvtepGlobalAugmentation hwvtepGlobalAugmentation = childNode.augmentation(HwvtepGlobalAugmentation.class);
218         if (hwvtepGlobalAugmentation == null || HwvtepHAUtil.isEmpty(hwvtepGlobalAugmentation.getSwitches())) {
219             haOpClusteredListener.getConnectedNodes()
220                     .stream()
221                     .filter((connectedIid) -> IS_PS_CHILD_TO_GLOBAL_NODE.test(childGlobalNodeId, connectedIid))
222                     .forEach(childPsIids::add);
223         } else {
224             hwvtepGlobalAugmentation.getSwitches().forEach(
225                 (switches) -> childPsIids.add(switches.getSwitchRef().getValue()));
226         }
227         if (childPsIids.isEmpty()) {
228             LOG.info("No child ps found for global {}", childGlobalNodeId);
229         }
230         childPsIids.forEach((psIid) -> {
231             try {
232                 InstanceIdentifier<Node> childPsIid = psIid;
233                 Optional<Node> childPsNode = tx.read(childPsIid).get();
234                 if (childPsNode.isPresent()) {
235                     LOG.debug("Child oper PS node found");
236                     onPsNodeAdd(childPsIid, childPsNode.get(), tx);
237                 } else {
238                     LOG.debug("Child oper ps node not found {}", childPsIid);
239                 }
240             } catch (InterruptedException | ExecutionException e) {
241                 LOG.error("Failed to read child ps node {}", psIid);
242             }
243         });
244     }
245
246     private boolean isNotHAChild(InstanceIdentifier<Node> nodeId) {
247         return  getHwvtepNodeHACache().getParent(nodeId) == null;
248     }
249 }