73b77204155577476e931b07ceaa7c3b2972c567
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HAOpClusteredListener.java
1 /*
2  * Copyright (c) 2016 ,2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
9
10 import com.google.common.base.Optional;
11 import com.google.common.collect.Sets;
12 import java.util.Collections;
13 import java.util.Map;
14 import java.util.Set;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.function.Consumer;
17 import java.util.stream.Collectors;
18 import javax.inject.Inject;
19 import javax.inject.Singleton;
20 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
27 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
28 import org.opendaylight.infrautils.metrics.MetricProvider;
29 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
30 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 @Singleton
36 public class HAOpClusteredListener extends HwvtepNodeBaseListener implements ClusteredDataTreeChangeListener<Node> {
37     private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);
38
39     private final Set<InstanceIdentifier<Node>> connectedNodes = ConcurrentHashMap.newKeySet();
40     private final Map<InstanceIdentifier<Node>, Set<Consumer<Optional<Node>>>> waitingJobs = new ConcurrentHashMap<>();
41
42     @Inject
43     public HAOpClusteredListener(DataBroker db, HwvtepNodeHACache hwvtepNodeHACache,
44                                  MetricProvider metricProvider) throws Exception {
45         super(LogicalDatastoreType.OPERATIONAL, db, hwvtepNodeHACache, metricProvider, false);
46         LOG.info("Registering HAOpClusteredListener");
47     }
48
49     public Set<InstanceIdentifier<Node>> getConnectedNodes() {
50         return connectedNodes;
51     }
52
53     @Override
54     synchronized  void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx)  {
55         connectedNodes.remove(key);
56         getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
57     }
58
59     @Override
60     void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)  {
61         connectedNodes.remove(key);
62         getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
63     }
64
65     @Override
66     void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)    {
67         connectedNodes.add(key);
68         getHwvtepNodeHACache().updateConnectedNodeStatus(key);
69     }
70
71     @Override
72     public synchronized void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node updated, ReadWriteTransaction tx) {
73         connectedNodes. add(key);
74         HwvtepHAUtil.addToCacheIfHAChildNode(key, updated, getHwvtepNodeHACache());
75         getHwvtepNodeHACache().updateConnectedNodeStatus(key);
76         if (waitingJobs.containsKey(key) && !waitingJobs.get(key).isEmpty()) {
77             try {
78                 HAJobScheduler jobScheduler = HAJobScheduler.getInstance();
79                 Optional<Node> nodeOptional = tx.read(LogicalDatastoreType.OPERATIONAL, key).checkedGet();
80                 if (nodeOptional.isPresent()) {
81                     waitingJobs.get(key).forEach(
82                         (waitingJob) -> jobScheduler.submitJob(() -> waitingJob.accept(nodeOptional)));
83                     waitingJobs.get(key).clear();
84                 } else {
85                     LOG.error("Failed to read oper node {}", key);
86                 }
87             } catch (ReadFailedException e) {
88                 LOG.error("Failed to read oper node {}", key);
89             }
90         }
91     }
92
93     @Override
94     void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
95                             Node updatedChildNode,
96                             Node beforeChildNode,
97                             DataObjectModification<Node> mod,
98                             ReadWriteTransaction tx) {
99         boolean wasHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);
100         addToHACacheIfBecameHAChild(childPath, updatedChildNode, beforeChildNode);
101         boolean isHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);
102
103
104         if (!wasHAChild && isHAChild) {
105             LOG.debug("{} became ha_child", getNodeId(childPath));
106         } else if (wasHAChild && !isHAChild) {
107             LOG.debug("{} unbecome ha_child", getNodeId(childPath));
108         }
109     }
110
111     static String getNodeId(InstanceIdentifier<Node> key) {
112         String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
113         int idx = nodeId.indexOf("uuid/");
114         if (idx > 0) {
115             nodeId = nodeId.substring(idx + "uuid/".length());
116         }
117         return nodeId;
118     }
119
120     public Set<InstanceIdentifier<Node>> getConnected(Set<InstanceIdentifier<Node>> candidateds) {
121         if (candidateds == null) {
122             return Collections.emptySet();
123         }
124         return candidateds.stream()
125                 .filter(connectedNodes::contains)
126                 .collect(Collectors.toSet());
127     }
128
129     public synchronized void runAfterNodeIsConnected(InstanceIdentifier<Node> iid, Consumer<Optional<Node>> consumer) {
130         if (connectedNodes.contains(iid)) {
131             HAJobScheduler.getInstance().submitJob(() -> {
132                 try (ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
133                     consumer.accept(tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet());
134                 } catch (ReadFailedException e) {
135                     LOG.error("Failed to read oper ds {}", iid);
136                 }
137             });
138         } else {
139             waitingJobs.computeIfAbsent(iid, key -> Sets.newConcurrentHashSet()).add(consumer);
140         }
141     }
142 }
143