/*
 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
import com.google.common.base.Optional;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
import org.opendaylight.netvirt.elan.l2gw.ha.HAJobScheduler;
import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Listens for the node children data updates and propagates the updates between HA child and parent nodes.
 * When an operational child node's data is updated, it is copied to the parent.
 * When a config parent node's data is updated, it is copied to all its children.
 */
public abstract class HwvtepNodeDataListener<T extends DataObject>
        extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<T>> {

    private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeDataListener.class);

    // Runs each propagation in its own managed read-write transaction.
    private final ManagedNewTransactionRunner txRunner;
    // Transforms T between parent and child node representations.
    private final MergeCommand<T, ?, ?> mergeCommand;
    // Datastore this listener is registered on (OPERATIONAL or CONFIGURATION).
    private final LogicalDatastoreType datastoreType;
    // Bound in the constructor: child->parent copy for operational, parent->children copy for config.
    private final BiConsumer<InstanceIdentifier<T>, T> addOperation;
    // Bound in the constructor: child->parent delete for operational, parent->children delete for config.
    private final BiConsumer<InstanceIdentifier<T>, T> removeOperation;
    // Cache of HA parent/child node relationships.
    private final HwvtepNodeHACache hwvtepNodeHACache;
49 public HwvtepNodeDataListener(DataBroker broker,
50 HwvtepNodeHACache hwvtepNodeHACache,
52 Class<HwvtepNodeDataListener<T>> eventClazz,
53 MergeCommand<T, ?, ?> mergeCommand,
54 LogicalDatastoreType datastoreType) {
55 super(clazz, eventClazz);
56 this.hwvtepNodeHACache = hwvtepNodeHACache;
57 this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
58 this.mergeCommand = mergeCommand;
59 this.datastoreType = datastoreType;
60 if (LogicalDatastoreType.OPERATIONAL == datastoreType) {
61 this.addOperation = this::copyToParent;
62 this.removeOperation = this::deleteFromParent;
64 this.addOperation = this::copyToChildren;
65 this.removeOperation = this::deleteFromChildren;
    /**
     * Returns the wildcarded path to the {@code T} data objects this listener watches.
     */
    protected abstract InstanceIdentifier<T> getWildCardPath();
73 protected void add(InstanceIdentifier<T> identifier, T dataAdded) {
74 HAJobScheduler.getInstance().submitJob(() -> addOperation.accept(identifier, dataAdded));
78 protected void update(InstanceIdentifier<T> key, T before, T after) {
79 HAJobScheduler.getInstance().submitJob(() -> {
80 if (Objects.equals(before, after)) {
81 //incase of cluter reboots tx.put will rewrite the data and fire unnecessary updates
89 protected void remove(InstanceIdentifier<T> identifier, T dataRemoved) {
90 HAJobScheduler.getInstance().submitJob(() -> removeOperation.accept(identifier, dataRemoved));
93 private boolean isNodeConnected(InstanceIdentifier<T> identifier, ReadTransaction tx)
94 throws ReadFailedException {
95 return tx.read(LogicalDatastoreType.OPERATIONAL, identifier.firstIdentifierOf(Node.class))
96 .checkedGet().isPresent();
99 private static <T extends DataObject> boolean isDataUpdated(Optional<T> existingDataOptional, T newData) {
100 return !existingDataOptional.isPresent() || !Objects.equals(existingDataOptional.get(), newData);
103 private void copyToParent(InstanceIdentifier<T> identifier, T data) {
104 if (clazz == RemoteUcastMacs.class) {
105 LOG.trace("Skipping remote ucast macs to parent");
108 InstanceIdentifier<Node> parent = getHAParent(identifier);
109 if (parent != null) {
110 LOG.trace("Copy child op data {} to parent {}", mergeCommand.getDescription(), getNodeId(parent));
111 T parentData = mergeCommand.transform(parent, data);
112 InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
113 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
114 tx -> writeToMdsal(tx, parentData, parentIdentifier)), LOG, "Error copying to parent");
118 private void deleteFromParent(InstanceIdentifier<T> identifier, T data) {
119 if (clazz == RemoteUcastMacs.class) {
120 LOG.trace("Skipping remote ucast macs to parent");
123 InstanceIdentifier<Node> parent = getHAParent(identifier);
124 if (parent != null) {
125 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
126 if (isNodeConnected(identifier, tx)) {
127 LOG.trace("Copy child op data {} to parent {} create:{}", mergeCommand.getDescription(),
128 getNodeId(parent), false);
129 T parentData = mergeCommand.transform(parent, data);
130 InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
131 deleteFromMdsal(tx, parentIdentifier);
133 }), LOG, "Error deleting from parent");
137 private void copyToChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
138 Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
139 if (children != null) {
140 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
141 for (InstanceIdentifier<Node> child : children) {
142 LOG.trace("Copy parent config data {} to child {}", mergeCommand.getDescription(),
144 final T childData = mergeCommand.transform(child, parentData);
145 final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
146 writeToMdsal(tx, childData, identifier);
148 }), LOG, "Error copying to children");
152 private void deleteFromChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
153 Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
154 if (children != null) {
155 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
156 for (InstanceIdentifier<Node> child : children) {
157 LOG.trace("Delete parent config data {} to child {}", mergeCommand.getDescription(),
159 final T childData = mergeCommand.transform(child, parentData);
160 final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
161 deleteFromMdsal(tx, identifier);
163 }), LOG, "Error deleting from children");
167 private void writeToMdsal(final ReadWriteTransaction tx, final T data, final InstanceIdentifier<T> identifier)
168 throws ReadFailedException {
169 if (isDataUpdated(tx.read(datastoreType, identifier).checkedGet(), data)) {
170 tx.put(datastoreType, identifier, data);
174 private void deleteFromMdsal(final ReadWriteTransaction tx,
175 final InstanceIdentifier<T> identifier) throws ReadFailedException {
176 if (tx.read(datastoreType, identifier).checkedGet().isPresent()) {
177 tx.delete(datastoreType, identifier);
181 private String getNodeId(InstanceIdentifier<Node> iid) {
182 return iid.firstKeyOf(Node.class).getNodeId().getValue();
186 protected HwvtepNodeDataListener<T> getDataTreeChangeListener() {
187 return HwvtepNodeDataListener.this;
190 protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier identifier) {
191 InstanceIdentifier<Node> parent = identifier.firstIdentifierOf(Node.class);
192 return hwvtepNodeHACache.getChildrenForHANode(parent);
195 protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier identifier) {
196 InstanceIdentifier<Node> child = identifier.firstIdentifierOf(Node.class);
197 return hwvtepNodeHACache.getParent(child);