2 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
10 import com.google.common.base.Optional;
11 import java.util.Objects;
13 import java.util.concurrent.ExecutionException;
14 import java.util.function.BiConsumer;
15 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
18 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
19 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
20 import org.opendaylight.genius.infra.Datastore;
21 import org.opendaylight.genius.infra.Datastore.Operational;
22 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
23 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
24 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
25 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
26 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
27 import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
29 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
30 import org.opendaylight.yangtools.yang.binding.DataObject;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * Listens for the node children data updates and propagates the updates between HA child and parent nodes.
37 * When an operational child node data is updated, it is copied to the parent.
38 * When a config parent node data is updated, it is copied to all its children.
40 public abstract class HwvtepNodeDataListener<D extends Datastore, T extends DataObject>
41 extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<D, T>> {
43 private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeDataListener.class);
// Runs the read-write transactions in which data is copied or deleted.
45 private final ManagedNewTransactionRunner txRunner;
// Used for the synchronous OPERATIONAL read that checks whether a node is present.
46 private final SingleTransactionDataBroker singleTxBroker;
// Transforms data for a target node and generates the matching identifier.
47 private final MergeCommand<T, ?, ?> mergeCommand;
// Datastore the transactions run against; also selects the add/remove operations in the constructor.
48 private final Class<D> datastoreType;
// Operation run (via the HA job scheduler) when data is added.
49 private final BiConsumer<InstanceIdentifier<T>, T> addOperation;
// Operation run (via the HA job scheduler) when data is removed.
50 private final BiConsumer<InstanceIdentifier<T>, T> removeOperation;
// Cache queried for the HA parent of a node and for the children of an HA node.
51 private final HwvtepNodeHACache hwvtepNodeHACache;
/**
 * Creates the listener and selects the add/remove operations from the datastore type.
 *
 * <p>When {@code datastoreType} is {@link Operational}, additions are handled by
 * {@code copyToParent} and removals by {@code deleteFromParent}. The second pair of
 * assignments uses {@code copyToChildren} and {@code deleteFromChildren}.
 *
 * @param broker data broker used to build the transaction runner and the single-transaction broker
 * @param hwvtepNodeHACache cache queried for HA parent and children node identifiers
 * @param eventClazz listener class handed to the superclass constructor
 * @param mergeCommand command used to transform data and generate identifiers for a target node
 * @param datastoreType datastore class the transactions run against
 */
53 public HwvtepNodeDataListener(DataBroker broker,
54 HwvtepNodeHACache hwvtepNodeHACache,
// NOTE(review): clazz is passed to super(...) below but is not among the parameters
// visible in this view - confirm the parameter list against the full file.
56 Class<HwvtepNodeDataListener<D, T>> eventClazz,
57 MergeCommand<T, ?, ?> mergeCommand,
58 Class<D> datastoreType) {
59 super(clazz, eventClazz);
60 this.hwvtepNodeHACache = hwvtepNodeHACache;
61 this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
62 this.singleTxBroker = new SingleTransactionDataBroker(broker);
63 this.mergeCommand = mergeCommand;
64 this.datastoreType = datastoreType;
// Operational datastore: changes on a child are propagated up to the HA parent.
65 if (Operational.class.equals(datastoreType)) {
66 this.addOperation = this::copyToParent;
67 this.removeOperation = this::deleteFromParent;
// NOTE(review): no else is visible between the two assignment pairs in this view;
// presumably the pair below applies to the non-operational case - confirm.
69 this.addOperation = this::copyToChildren;
70 this.removeOperation = this::deleteFromChildren;
/**
 * Supplies the instance identifier path for the data type {@code T} handled by the subclass.
 * NOTE(review): presumably the wildcard path this listener is registered on - confirm
 * against AsyncDataTreeChangeListenerBase.
 *
 * @return the path for data of type {@code T}
 */
75 protected abstract InstanceIdentifier<T> getWildCardPath();
78 protected void add(InstanceIdentifier<T> identifier, T dataAdded) {
79 HAJobScheduler.getInstance().submitJob(() -> addOperation.accept(identifier, dataAdded));
/**
 * Queues a job on the HA job scheduler that handles an update of the data at {@code key}.
 * The job first compares the old and new values with {@link Objects#equals(Object, Object)}.
 * NOTE(review): the statements following the equality check are not visible in this view -
 * confirm the rest of the body against the full file.
 *
 * @param key path of the updated data
 * @param before the data before the update
 * @param after the data after the update
 */
83 protected void update(InstanceIdentifier<T> key, T before, T after) {
84 HAJobScheduler.getInstance().submitJob(() -> {
85 if (Objects.equals(before, after)) {
86 // In case of cluster reboots, tx.put will rewrite the data and fire unnecessary updates.
94 protected void remove(InstanceIdentifier<T> identifier, T dataRemoved) {
95 HAJobScheduler.getInstance().submitJob(() -> removeOperation.accept(identifier, dataRemoved));
98 private boolean isNodeConnected(InstanceIdentifier<T> identifier)
99 throws ReadFailedException {
100 return singleTxBroker.syncReadOptional(LogicalDatastoreType.OPERATIONAL,
101 identifier.firstIdentifierOf(Node.class)).isPresent();
104 private static <T extends DataObject> boolean isDataUpdated(Optional<T> existingDataOptional, T newData) {
105 return !existingDataOptional.isPresent() || !Objects.equals(existingDataOptional.get(), newData);
/**
 * Copies data of a child node to its HA parent.
 *
 * <p>Looks up the HA parent of the node enclosing {@code identifier}; when one exists, the data
 * is transformed for the parent, the parent-side identifier is generated, and the result is
 * written through {@code writeToMdsal} in a new read-write transaction on {@code datastoreType}.
 * A failure of that transaction is logged with "Error copying to parent".
 *
 * @param identifier path of the child data
 * @param data the child data to copy
 */
108 private void copyToParent(InstanceIdentifier<T> identifier, T data) {
109 if (clazz == RemoteUcastMacs.class) {
110 LOG.trace("Skipping remote ucast macs to parent");
// NOTE(review): nothing after the skip trace (an early return / end of the if) is visible
// in this view - confirm that remote ucast macs really stop here.
113 InstanceIdentifier<Node> parent = getHAParent(identifier);
// No HA parent known for this node: nothing to propagate.
114 if (parent != null) {
115 LOG.trace("Copy child op data {} to parent {}", mergeCommand.getDescription(), getNodeId(parent));
116 T parentData = mergeCommand.transform(parent, data);
117 InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
118 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType,
119 tx -> writeToMdsal(tx, parentData, parentIdentifier)), LOG, "Error copying to parent");
/**
 * Deletes from the HA parent the counterpart of data removed from a child node.
 *
 * <p>Looks up the HA parent of the node enclosing {@code identifier}; when one exists, a new
 * read-write transaction on {@code datastoreType} is started. Inside it the parent-side
 * identifier is generated and deleted through {@code deleteFromMdsal}, but only while
 * {@code isNodeConnected(identifier)} reports the child's node as present in the OPERATIONAL
 * datastore. A failure of the transaction is logged with "Error deleting from parent".
 *
 * @param identifier path of the removed child data
 * @param data the removed child data
 */
123 private void deleteFromParent(InstanceIdentifier<T> identifier, T data) {
124 if (clazz == RemoteUcastMacs.class) {
125 LOG.trace("Skipping remote ucast macs to parent");
// NOTE(review): nothing after the skip trace (an early return / end of the if) is visible
// in this view - confirm that remote ucast macs really stop here.
128 InstanceIdentifier<Node> parent = getHAParent(identifier);
// No HA parent known for this node: nothing to delete.
129 if (parent != null) {
130 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
// The delete is attempted only while the child's node is still present in OPERATIONAL.
131 if (isNodeConnected(identifier)) {
// NOTE(review): the trace text says "Copy" although this path deletes; the trailing
// create:false argument appears to be what marks it as a removal.
132 LOG.trace("Copy child op data {} to parent {} create:{}", mergeCommand.getDescription(),
133 getNodeId(parent), false);
134 T parentData = mergeCommand.transform(parent, data);
135 InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
136 deleteFromMdsal(tx, parentIdentifier);
138 }), LOG, "Error deleting from parent");
/**
 * Copies data of an HA parent node to each of its children.
 *
 * <p>Looks up the children of the node enclosing {@code parentIdentifier}; when the lookup
 * returns a set, a single new read-write transaction on {@code datastoreType} transforms the
 * data for every child, generates the child-side identifier and writes it through
 * {@code writeToMdsal}. A failure of the transaction is logged with "Error copying to children".
 *
 * @param parentIdentifier path of the parent data
 * @param parentData the parent data to copy
 */
142 private void copyToChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
143 Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
// A null result from the cache lookup means there is nothing to copy to.
144 if (children != null) {
145 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
146 for (InstanceIdentifier<Node> child : children) {
// NOTE(review): this trace call has two placeholders but its final argument and closing
// parenthesis are not visible in this view - confirm against the full file.
147 LOG.trace("Copy parent config data {} to child {}", mergeCommand.getDescription(),
149 final T childData = mergeCommand.transform(child, parentData);
150 final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
151 writeToMdsal(tx, childData, identifier);
153 }), LOG, "Error copying to children");
/**
 * Deletes from each child the counterpart of data removed from an HA parent node.
 *
 * <p>Looks up the children of the node enclosing {@code parentIdentifier}; when the lookup
 * returns a set, a single new read-write transaction on {@code datastoreType} transforms the
 * data for every child, generates the child-side identifier and deletes it through
 * {@code deleteFromMdsal}. A failure of the transaction is logged with
 * "Error deleting from children".
 *
 * @param parentIdentifier path of the removed parent data
 * @param parentData the removed parent data
 */
157 private void deleteFromChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
158 Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
// A null result from the cache lookup means there is nothing to delete from.
159 if (children != null) {
160 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
161 for (InstanceIdentifier<Node> child : children) {
// NOTE(review): this trace call has two placeholders but its final argument and closing
// parenthesis are not visible in this view - confirm against the full file.
162 LOG.trace("Delete parent config data {} to child {}", mergeCommand.getDescription(),
164 final T childData = mergeCommand.transform(child, parentData);
165 final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
166 deleteFromMdsal(tx, identifier);
168 }), LOG, "Error deleting from children");
172 private void writeToMdsal(final TypedReadWriteTransaction<D> tx, final T data,
173 final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
174 if (isDataUpdated(tx.read(identifier).get(), data)) {
175 tx.put(identifier, data);
179 private void deleteFromMdsal(final TypedReadWriteTransaction<D> tx,
180 final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
181 if (tx.read(identifier).get().isPresent()) {
182 tx.delete(identifier);
186 private String getNodeId(InstanceIdentifier<Node> iid) {
187 return iid.firstKeyOf(Node.class).getNodeId().getValue();
191 protected HwvtepNodeDataListener<D, T> getDataTreeChangeListener() {
192 return HwvtepNodeDataListener.this;
195 protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier<T> identifier) {
196 InstanceIdentifier<Node> parent = identifier.firstIdentifierOf(Node.class);
197 return hwvtepNodeHACache.getChildrenForHANode(parent);
200 protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier<T> identifier) {
201 InstanceIdentifier<Node> child = identifier.firstIdentifierOf(Node.class);
202 return hwvtepNodeHACache.getParent(child);