/*
 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.netvirt.elan.l2gw.ha.listeners;

import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;

import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.infra.Datastore.Operational;
import org.opendaylight.genius.infra.TypedReadWriteTransaction;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.infrautils.metrics.MetricProvider;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
import org.opendaylight.serviceutils.srm.RecoverableListener;
import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

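/**
 * Clustered listener for HWVTEP nodes in the operational topology. It runs on
 * every cluster member, tracks which global and physical-switch nodes are
 * currently connected, keeps the {@link HwvtepNodeHACache} connection status
 * up to date, and queues jobs submitted via
 * {@link #runAfterNodeIsConnected} until the target node appears in the
 * operational datastore. It also registers itself with the service recovery
 * registry so it can be torn down and re-registered on L2 gateway service
 * recovery.
 */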
@Singleton
public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
        implements ClusteredDataTreeChangeListener<Node>, RecoverableListener {
    private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);

    private final Set<InstanceIdentifier<Node>> connectedNodes = ConcurrentHashMap.newKeySet();
    private final Map<InstanceIdentifier<Node>, Set<Consumer<Optional<Node>>>> waitingJobs = new ConcurrentHashMap<>();

    @Inject
    public HAOpClusteredListener(DataBroker db, HwvtepNodeHACache hwvtepNodeHACache,
                                 MetricProvider metricProvider,
                                 final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
                                 final ServiceRecoveryRegistry serviceRecoveryRegistry) throws Exception {
        super(OPERATIONAL, db, hwvtepNodeHACache, metricProvider, false);
        LOG.info("Registering HAOpClusteredListener");
        serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(), this);
    }

    @Override
    @SuppressWarnings("all")
    public void registerListener() {
        try {
            LOG.info("Registering HAOpClusteredListener");
            registerListener(OPERATIONAL, getDataBroker());
        } catch (Exception e) {
            LOG.error("HA OP Clustered register listener error.", e);
        }
    }

    @Override
    public void deregisterListener() {
        LOG.info("Deregistering HAOpClusteredListener");
        super.close();
    }

    public Set<InstanceIdentifier<Node>> getConnectedNodes() {
        return connectedNodes;
    }

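    /**
     * Marks a global HWVTEP node as disconnected when it is removed from the
     * operational datastore.
     */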
    @Override
    synchronized void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node deleted,
            TypedReadWriteTransaction<Operational> tx) {
        connectedNodes.remove(key);
        getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
    }

    @Override
    void onPsNodeDelete(InstanceIdentifier<Node> key, Node deletedPSNode, TypedReadWriteTransaction<Operational> tx) {
        connectedNodes.remove(key);
        getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
    }

    @Override
    void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<Operational> tx) {
        connectedNodes.add(key);
        getHwvtepNodeHACache().updateConnectedNodeStatus(key);
    }

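    /**
     * Handles a global HWVTEP node appearing in the operational datastore:
     * records it as connected, registers it in the HA cache if it is an HA
     * child, and hands any jobs that were waiting for this node over to the
     * HAJobScheduler with the freshly read node.
     */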
    @Override
    public synchronized void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node updated,
            TypedReadWriteTransaction<Operational> tx) {
        connectedNodes.add(key);
        HwvtepHAUtil.addToCacheIfHAChildNode(key, updated, getHwvtepNodeHACache());
        getHwvtepNodeHACache().updateConnectedNodeStatus(key);
        if (waitingJobs.containsKey(key) && !waitingJobs.get(key).isEmpty()) {
            try {
                HAJobScheduler jobScheduler = HAJobScheduler.getInstance();
                Optional<Node> nodeOptional = tx.read(key).get();
                if (nodeOptional.isPresent()) {
                    waitingJobs.get(key).forEach(
                        waitingJob -> jobScheduler.submitJob(() -> waitingJob.accept(nodeOptional)));
                    waitingJobs.get(key).clear();
                } else {
                    LOG.error("Node {} is not present in the oper datastore", key);
                }
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Failed to read oper node {}", key, e);
            }
        }
    }

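    /**
     * Detects transitions of a node into or out of HA-child membership by
     * comparing the HA cache state before and after processing the update.
     */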
    @Override
    void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
                            Node updatedChildNode,
                            Node beforeChildNode,
                            DataObjectModification<Node> mod,
                            TypedReadWriteTransaction<Operational> tx) {
        boolean wasHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);
        addToHACacheIfBecameHAChild(childPath, updatedChildNode, beforeChildNode);
        boolean isHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);

        if (!wasHAChild && isHAChild) {
            LOG.debug("{} became ha_child", getNodeId(childPath));
        } else if (wasHAChild && !isHAChild) {
            LOG.debug("{} is no longer ha_child", getNodeId(childPath));
        }
    }

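    /**
     * Returns a short form of the node id for logging, stripping everything
     * up to and including the "uuid/" prefix when present.
     */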
    static String getNodeId(InstanceIdentifier<Node> key) {
        String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
        int idx = nodeId.indexOf("uuid/");
        if (idx > 0) {
            nodeId = nodeId.substring(idx + "uuid/".length());
        }
        return nodeId;
    }

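    /**
     * Filters the given candidate paths down to those whose nodes are
     * currently connected.
     */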
    public Set<InstanceIdentifier<Node>> getConnected(Set<InstanceIdentifier<Node>> candidates) {
        if (candidates == null) {
            return Collections.emptySet();
        }
        return candidates.stream()
                .filter(connectedNodes::contains)
                .collect(Collectors.toSet());
    }

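    /**
     * Runs the given consumer with the node read from the operational
     * datastore once the node is connected: immediately if it is already
     * connected, otherwise when its global node is next added.
     */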
    public synchronized void runAfterNodeIsConnected(InstanceIdentifier<Node> iid, Consumer<Optional<Node>> consumer) {
        if (connectedNodes.contains(iid)) {
            HAJobScheduler.getInstance().submitJob(() -> {
                try (ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
                    consumer.accept(tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet());
                } catch (ReadFailedException e) {
                    LOG.error("Failed to read oper ds {}", iid, e);
                }
            });
        } else {
            waitingJobs.computeIfAbsent(iid, key -> Sets.newConcurrentHashSet()).add(consumer);
        }
    }
}