NETVIRT-1630 migrate to md-sal APIs
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HAOpNodeListener.java
1 /*
2  * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
9
10 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
11 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
12
13 import com.google.common.base.Strings;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.Optional;
17 import java.util.Set;
18 import java.util.concurrent.ExecutionException;
19 import java.util.function.BiPredicate;
20 import java.util.stream.Collectors;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.genius.infra.Datastore.Operational;
24 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
25 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
26 import org.opendaylight.infrautils.metrics.MetricProvider;
27 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.DataObjectModification;
30 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
31 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
32 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
33 import org.opendaylight.netvirt.elan.l2gw.ha.handlers.NodeCopier;
34 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
35 import org.opendaylight.serviceutils.srm.RecoverableListener;
36 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 @Singleton
44 public class HAOpNodeListener extends HwvtepNodeBaseListener<Operational> implements RecoverableListener {
45
46     private static final Logger LOG = LoggerFactory.getLogger(HAOpNodeListener.class);
47
48     private static final BiPredicate<String, InstanceIdentifier<Node>> IS_PS_CHILD_TO_GLOBAL_NODE =
49         (globalNodeId, iid) -> {
50             String psNodeId = iid.firstKeyOf(Node.class).getNodeId().getValue();
51             return psNodeId.startsWith(globalNodeId) && psNodeId.contains("physicalswitch");
52         };
53
54     private final IHAEventHandler haEventHandler;
55     private final HAOpClusteredListener haOpClusteredListener;
56     private final NodeCopier nodeCopier;
57
58     @Inject
59     public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler,
60                             HAOpClusteredListener haOpClusteredListener,
61                             NodeCopier nodeCopier, HwvtepNodeHACache hwvtepNodeHACache,
62                             MetricProvider metricProvider,
63                             final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
64                             final ServiceRecoveryRegistry serviceRecoveryRegistry) throws Exception {
65         super(OPERATIONAL, db, hwvtepNodeHACache, metricProvider, true);
66         this.haEventHandler = haEventHandler;
67         this.haOpClusteredListener = haOpClusteredListener;
68         this.nodeCopier = nodeCopier;
69         serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
70                 this);
71     }
72
73     @Override
74     @SuppressWarnings("all")
75     public void registerListener() {
76         try {
77             LOG.info("Registering HAOpNodeListener");
78             registerListener(OPERATIONAL, getDataBroker());
79         } catch (Exception e) {
80             LOG.error("HA OP Node register listener error.");
81         }
82     }
83
84     public void deregisterListener() {
85         LOG.info("Deregistering HAOpNodeListener");
86         super.close();
87     }
88
89     String getNodeId(InstanceIdentifier<Node> iid) {
90         return iid.firstKeyOf(Node.class).getNodeId().getValue();
91     }
92
93     @Override
94     public void onGlobalNodeAdd(InstanceIdentifier<Node> childGlobalPath,
95                                 Node childNode,
96                                 TypedReadWriteTransaction<Operational> tx) {
97         //copy child global node to ha global node
98         //create ha global config node if not present
99         //copy ha global config node to child global config node
100         LOG.trace("Node connected {} - Checking if Ha or Non-Ha enabled ", childNode.getNodeId().getValue());
101         haOpClusteredListener.onGlobalNodeAdd(childGlobalPath, childNode, tx);
102         if (isNotHAChild(childGlobalPath)) {
103             return;
104         }
105         InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
106         LOG.trace("Ha enabled child node connected {}", childNode.getNodeId().getValue());
107         try {
108             nodeCopier.copyGlobalNode(Optional.ofNullable(childNode), childGlobalPath, haNodePath, OPERATIONAL, tx);
109             LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
110                 confTx -> nodeCopier.copyGlobalNode(Optional.ofNullable(null), haNodePath, childGlobalPath,
111                     CONFIGURATION, confTx)), LOG, "Error copying to configuration");
112         } catch (InterruptedException | ExecutionException e) {
113             LOG.error("Failed to read nodes {} , {} ", childGlobalPath, haNodePath);
114         }
115         readAndCopyChildPsOpToParent(childNode, tx);
116     }
117
118     //Update on global node has been taken care by HAListeners as per perf improvement
119     @Override
120     void onGlobalNodeUpdate(InstanceIdentifier<Node> childGlobalPath,
121                             Node updatedChildNode,
122                             Node originalChildNode,
123                             DataObjectModification<Node> mod,
124                             TypedReadWriteTransaction<Operational> tx)  {
125
126         String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
127         if (!Strings.isNullOrEmpty(oldHAId)) { //was already ha child
128             InstanceIdentifier<Node> haPath = getHwvtepNodeHACache().getParent(childGlobalPath);
129             LOG.debug("Copy oper update from child {} to parent {}", childGlobalPath, haPath);
130             haEventHandler.copyChildGlobalOpUpdateToHAParent(haPath, mod, tx);
131             return;//TODO handle unha case
132         }
133
134         addToHACacheIfBecameHAChild(childGlobalPath, updatedChildNode, originalChildNode);
135         if (isNotHAChild(childGlobalPath)) {
136             return;
137         }
138         LOG.info("{} became ha child ", updatedChildNode.getNodeId().getValue());
139         onGlobalNodeAdd(childGlobalPath, updatedChildNode, tx);
140     }
141
142     @Override
143     void onGlobalNodeDelete(InstanceIdentifier<Node> childGlobalPath,
144                             Node childNode,
145                             TypedReadWriteTransaction<Operational> tx)
146             throws ExecutionException, InterruptedException {
147         haOpClusteredListener.onGlobalNodeDelete(childGlobalPath, childNode, tx);
148         if (isNotHAChild(childGlobalPath)) {
149             LOG.info("non ha child global delete {} ", getNodeId(childGlobalPath));
150             return;
151         }
152         LOG.info("ha child global delete {} ", getNodeId(childGlobalPath));
153         InstanceIdentifier<Node> haNodePath = getHwvtepNodeHACache().getParent(childGlobalPath);
154         Set<InstanceIdentifier<Node>> children = getHwvtepNodeHACache().getChildrenForHANode(haNodePath);
155         if (haOpClusteredListener.getConnected(children).isEmpty()) {
156             LOG.info("All child deleted for ha node {} ", HwvtepHAUtil.getNodeIdVal(haNodePath));
157             //ha ps delete is taken care by ps node delete
158             //HwvtepHAUtil.deleteSwitchesManagedBy-Node(haNodePath, tx);
159             HwvtepHAUtil.deleteNodeIfPresent(tx, haNodePath);
160         } else {
161             LOG.info("not all child deleted {} connected {}", getNodeId(childGlobalPath),
162                     haOpClusteredListener.getConnected(children));
163         }
164     }
165
166     @Override
167     void onPsNodeAdd(InstanceIdentifier<Node> childPsPath,
168                      Node childPsNode,
169                      TypedReadWriteTransaction<Operational> tx) {
170         //copy child ps oper node to ha ps oper node
171         //copy ha ps config node to child ps config
172         haOpClusteredListener.onPsNodeAdd(childPsPath, childPsNode, tx);
173         InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
174         if (!haOpClusteredListener.getConnectedNodes().contains(childGlobalPath)) {
175             return;
176         }
177         if (isNotHAChild(childGlobalPath)) {
178             return;
179         }
180         LOG.info("ha ps child connected {} ", getNodeId(childPsPath));
181         InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
182         InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
183         try {
184             nodeCopier.copyPSNode(Optional.ofNullable(childPsNode), childPsPath, haPsPath, haGlobalPath,
185                     OPERATIONAL, tx);
186             LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
187                 confTx -> nodeCopier.copyPSNode(Optional.ofNullable(null), haPsPath, childPsPath, childGlobalPath,
188                     CONFIGURATION, confTx)), LOG, "Error copying to configuration");
189         } catch (InterruptedException | ExecutionException e) {
190             LOG.error("Failed to read nodes {} , {} ", childPsPath, haGlobalPath);
191         }
192     }
193
194     @Override
195     void onPsNodeUpdate(Node updatedChildPSNode,
196         DataObjectModification<Node> mod,
197         TypedReadWriteTransaction<Operational> tx) {
198         InstanceIdentifier<Node> childGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(updatedChildPSNode);
199         if (isNotHAChild(childGlobalPath)) {
200             return;
201         }
202         InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(childGlobalPath);
203         haEventHandler.copyChildPsOpUpdateToHAParent(updatedChildPSNode, haGlobalPath, mod, tx);
204     }
205
206     @Override
207     void onPsNodeDelete(InstanceIdentifier<Node> childPsPath,
208                         Node childPsNode,
209                         TypedReadWriteTransaction<Operational> tx)
210             throws ExecutionException, InterruptedException {
211         //one child ps node disconnected
212         //find if all child ps nodes disconnected then delete parent ps node
213         haOpClusteredListener.onPsNodeDelete(childPsPath, childPsNode, tx);
214         InstanceIdentifier<Node> disconnectedChildGlobalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
215         if (isNotHAChild(disconnectedChildGlobalPath)) {
216             LOG.info("on non ha ps child delete {} ", getNodeId(childPsPath));
217             return;
218         }
219         InstanceIdentifier<Node> haGlobalPath = getHwvtepNodeHACache().getParent(disconnectedChildGlobalPath);
220         Set<InstanceIdentifier<Node>> childPsPaths = getHwvtepNodeHACache().getChildrenForHANode(haGlobalPath).stream()
221                 .map((childGlobalPath) -> HwvtepHAUtil.convertPsPath(childPsNode, childGlobalPath))
222                 .collect(Collectors.toSet());
223         //TODO validate what if this is null
224         if (haOpClusteredListener.getConnected(childPsPaths).isEmpty()) {
225             InstanceIdentifier<Node> haPsPath = HwvtepHAUtil.convertPsPath(childPsNode, haGlobalPath);
226             LOG.info("All child deleted for ha ps node {} ", HwvtepHAUtil.getNodeIdVal(haPsPath));
227             HwvtepHAUtil.deleteNodeIfPresent(tx, haPsPath);
228             //HwvtepHAUtil.deleteGlobalNodeSwitches(haGlobalPath, haPsPath, LogicalDatastoreType.OPERATIONAL, tx);
229         } else {
230             LOG.info("not all ha ps child deleted {} connected {}", getNodeId(childPsPath),
231                     haOpClusteredListener.getConnected(childPsPaths));
232         }
233     }
234
235     private void readAndCopyChildPsOpToParent(Node childNode, TypedReadWriteTransaction<Operational> tx) {
236         String childGlobalNodeId = childNode.getNodeId().getValue();
237         List<InstanceIdentifier> childPsIids = new ArrayList<>();
238         HwvtepGlobalAugmentation hwvtepGlobalAugmentation = childNode.augmentation(HwvtepGlobalAugmentation.class);
239         if (hwvtepGlobalAugmentation == null || HwvtepHAUtil.isEmpty(hwvtepGlobalAugmentation.getSwitches())) {
240             haOpClusteredListener.getConnectedNodes()
241                     .stream()
242                     .filter((connectedIid) -> IS_PS_CHILD_TO_GLOBAL_NODE.test(childGlobalNodeId, connectedIid))
243                     .forEach(childPsIids::add);
244         } else {
245             hwvtepGlobalAugmentation.getSwitches().forEach(
246                 (switches) -> childPsIids.add(switches.getSwitchRef().getValue()));
247         }
248         if (childPsIids.isEmpty()) {
249             LOG.info("No child ps found for global {}", childGlobalNodeId);
250         }
251         childPsIids.forEach((psIid) -> {
252             try {
253                 InstanceIdentifier<Node> childPsIid = psIid;
254                 Optional<Node> childPsNode = tx.read(childPsIid).get();
255                 if (childPsNode.isPresent()) {
256                     LOG.debug("Child oper PS node found");
257                     onPsNodeAdd(childPsIid, childPsNode.get(), tx);
258                 } else {
259                     LOG.debug("Child oper ps node not found {}", childPsIid);
260                 }
261             } catch (InterruptedException | ExecutionException e) {
262                 LOG.error("Failed to read child ps node {}", psIid);
263             }
264         });
265     }
266
267     private boolean isNotHAChild(InstanceIdentifier<Node> nodeId) {
268         return  getHwvtepNodeHACache().getParent(nodeId) == null;
269     }
270 }