2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
10 import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
12 import com.google.common.collect.Sets;
13 import java.util.Collections;
15 import java.util.Optional;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.function.Consumer;
20 import java.util.stream.Collectors;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
24 import org.opendaylight.infrautils.metrics.MetricProvider;
25 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.mdsal.binding.api.DataBroker;
27 import org.opendaylight.mdsal.binding.api.DataObjectModification;
28 import org.opendaylight.mdsal.binding.api.ReadTransaction;
29 import org.opendaylight.mdsal.binding.util.Datastore.Operational;
30 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
33 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
34 import org.opendaylight.serviceutils.srm.RecoverableListener;
35 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
36 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
// Clustered listener on the OPERATIONAL datastore for hwvtep topology Nodes.
// Tracks which hwvtep nodes are currently connected, keeps the HA
// parent/child node cache up to date, and lets callers queue jobs that run
// once a given node appears in the operational datastore.
42 public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
43 implements ClusteredDataTreeChangeListener<Node>, RecoverableListener {
44 private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);
// Node iids currently seen as connected (concurrent set; updated from the
// onGlobalNodeAdd/Delete and onPsNodeAdd/Delete callbacks below).
46 private final Set<InstanceIdentifier<Node>> connectedNodes = ConcurrentHashMap.newKeySet();
// Per-node-iid consumers waiting for that node to connect; drained and run
// by onGlobalNodeAdd, populated by runAfterNodeIsConnected.
47 private final Map<InstanceIdentifier<Node>, Set<Consumer<Optional<Node>>>> waitingJobs = new ConcurrentHashMap<>();
// Constructor: registers on the OPERATIONAL datastore via the superclass
// (last arg 'false' is the superclass registration flag — semantics defined
// in HwvtepNodeBaseListener, not visible here) and adds itself to the
// service-recovery registry so SRM can deregister/re-register it on recovery.
// Throws Exception propagated from the superclass registration.
50 public HAOpClusteredListener(DataBroker db, HwvtepNodeHACache hwvtepNodeHACache,
51 MetricProvider metricProvider,
52 final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
53 final ServiceRecoveryRegistry serviceRecoveryRegistry) throws Exception {
54 super(OPERATIONAL, db, hwvtepNodeHACache, metricProvider, false);
55 LOG.info("Registering HAOpClusteredListener");
56 serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(), this);
// RecoverableListener hook: re-registers this listener on the OPERATIONAL
// datastore after a service recovery.
// NOTE(review): the catch logs a fixed message without the exception object,
// so the failure cause/stack trace is lost — consider LOG.error("...", e).
60 @SuppressWarnings("all")
61 public void registerListener() {
63 LOG.info("Registering HAOpClusteredListener");
64 registerListener(OPERATIONAL, getDataBroker());
65 } catch (Exception e) {
66 LOG.error("HA OP Clustered register listener error.");
// RecoverableListener hook: deregisters this listener during service
// recovery (actual deregistration body not visible in this view).
71 public void deregisterListener() {
72 LOG.info("Deregistering HAOpClusteredListener");
// Returns the live set of connected node iids.
// NOTE(review): this exposes the internal mutable concurrent set directly;
// callers could mutate tracking state — consider an unmodifiable view.
76 public Set<InstanceIdentifier<Node>> getConnectedNodes() {
77 return connectedNodes;
// Global (logical switch) node removed from the operational datastore:
// drop it from the connected set and mark it disconnected in the HA cache.
// NOTE(review): parameter is named 'added' but carries the deleted node;
// 'tx' is unused here (signature dictated by the base-listener callback).
81 synchronized void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added,
82 TypedReadWriteTransaction<Operational> tx) {
83 connectedNodes.remove(key);
84 getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
// Physical-switch node removed: same bookkeeping as the global-node delete —
// remove from the connected set and mark disconnected in the HA cache
// ('tx' unused; signature comes from the base-listener callback).
88 void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<Operational> tx) {
89 connectedNodes.remove(key);
90 getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
// Physical-switch node appeared: record it as connected and update the HA
// cache ('tx' unused; signature comes from the base-listener callback).
94 void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<Operational> tx) {
95 connectedNodes.add(key);
96 getHwvtepNodeHACache().updateConnectedNodeStatus(key);
// Global node appeared in the operational datastore: mark it connected,
// register it as an HA child if its config says so, then drain any jobs
// queued via runAfterNodeIsConnected by reading the node once and handing
// the same Optional<Node> to every waiting consumer on the HA job scheduler.
// NOTE(review): stray space in 'connectedNodes. add(key)' — cosmetic only.
// NOTE(review): both error paths log without the exception object, and the
// InterruptedException branch does not re-interrupt the thread
// (Thread.currentThread().interrupt()) — worth fixing when this file is
// edited with its full text in view.
100 public synchronized void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node updated,
101 TypedReadWriteTransaction<Operational> tx) {
102 connectedNodes. add(key);
103 HwvtepHAUtil.addToCacheIfHAChildNode(key, updated, getHwvtepNodeHACache());
104 getHwvtepNodeHACache().updateConnectedNodeStatus(key);
105 if (waitingJobs.containsKey(key) && !waitingJobs.get(key).isEmpty()) {
107 HAJobScheduler jobScheduler = HAJobScheduler.getInstance();
// Single read of the node; tx.read(...).get() blocks until the read
// completes (hence the InterruptedException/ExecutionException catch).
108 Optional<Node> nodeOptional = tx.read(key).get();
109 if (nodeOptional.isPresent()) {
110 waitingJobs.get(key).forEach(
111 (waitingJob) -> jobScheduler.submitJob(() -> waitingJob.accept(nodeOptional)));
// Jobs are one-shot: clear the queue once scheduled.
112 waitingJobs.get(key).clear();
114 LOG.error("Failed to read oper node {}", key);
116 } catch (InterruptedException | ExecutionException e) {
117 LOG.error("Failed to read oper node {}", key);
// Global node updated: detect HA child-state transitions by snapshotting the
// HA-cache answer before and after re-evaluating the updated node, and log
// the transition in either direction ('mod' and 'tx' unused here; signature
// comes from the base-listener callback).
123 void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
124 Node updatedChildNode,
125 Node beforeChildNode,
126 DataObjectModification<Node> mod,
127 TypedReadWriteTransaction<Operational> tx) {
128 boolean wasHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);
// May add childPath to the HA cache based on the before/after node data.
129 addToHACacheIfBecameHAChild(childPath, updatedChildNode, beforeChildNode);
130 boolean isHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);
133 if (!wasHAChild && isHAChild) {
134 LOG.debug("{} became ha_child", getNodeId(childPath));
135 } else if (wasHAChild && !isHAChild) {
136 LOG.debug("{} unbecome ha_child", getNodeId(childPath));
// Extracts a short node id for logging: takes the node-id string from the
// iid's Node key and strips everything up to and including "uuid/".
// NOTE(review): the guard on 'idx' (for node ids without "uuid/") is not
// visible in this extraction — confirm against the full file before editing.
140 static String getNodeId(InstanceIdentifier<Node> key) {
141 String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
142 int idx = nodeId.indexOf("uuid/");
144 nodeId = nodeId.substring(idx + "uuid/".length());
// Filters the given candidate iids down to those currently connected.
// Null-safe: a null candidate set yields an empty set.
149 public Set<InstanceIdentifier<Node>> getConnected(Set<InstanceIdentifier<Node>> candidateds) {
150 if (candidateds == null) {
151 return Collections.emptySet();
153 return candidateds.stream()
154 .filter(connectedNodes::contains)
155 .collect(Collectors.toSet());
// Runs 'consumer' with the node's operational data once the node is
// connected: if already connected, schedule an immediate read-and-invoke on
// the HA job scheduler; otherwise queue the consumer in waitingJobs, to be
// drained by onGlobalNodeAdd when the node appears.
// Synchronized together with onGlobalNodeAdd so a node connecting while we
// queue cannot be missed.
// NOTE(review): the error path logs without the exception object and does
// not re-interrupt on InterruptedException — flagged only, since part of
// this method's body is elided from this view.
158 public synchronized void runAfterNodeIsConnected(InstanceIdentifier<Node> iid, Consumer<Optional<Node>> consumer) {
159 if (connectedNodes.contains(iid)) {
160 HAJobScheduler.getInstance().submitJob(() -> {
// try-with-resources closes the read-only transaction; the blocking
// get() is why Interrupted/ExecutionException are caught below.
161 try (ReadTransaction tx = getDataBroker().newReadOnlyTransaction()) {
162 consumer.accept(tx.read(LogicalDatastoreType.OPERATIONAL, iid).get());
163 } catch (InterruptedException | ExecutionException e) {
164 LOG.error("Failed to read oper ds {}", iid);
168 waitingJobs.computeIfAbsent(iid, key -> Sets.newConcurrentHashSet()).add(consumer);