04cfefd7d435892d46d10b89d7dc20e404baa87c
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HwvtepNodeDataListener.java
1 /*
2  * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
9
10 import com.google.common.base.Optional;
11 import java.util.Objects;
12 import java.util.Set;
13 import java.util.function.BiConsumer;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
16 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
18 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
19 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
20 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
21 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
22 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
23 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
24 import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
26 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
27 import org.opendaylight.yangtools.yang.binding.DataObject;
28 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Listens for the node children data updates and propagates the updates between ha child and parent nodes.
34  * When an operational child node data is updated, it is copied to parent
35  * When a config parent node data is updated , it is copied to all its children.
36  */
37 public abstract class HwvtepNodeDataListener<T extends DataObject>
38         extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<T>> {
39
40     private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeDataListener.class);
41
42     private final ManagedNewTransactionRunner txRunner;
43     private final MergeCommand<T, ?, ?> mergeCommand;
44     private final LogicalDatastoreType datastoreType;
45     private final BiConsumer<InstanceIdentifier<T>, T> addOperation;
46     private final BiConsumer<InstanceIdentifier<T>, T> removeOperation;
47     private final HwvtepNodeHACache hwvtepNodeHACache;
48
49     public HwvtepNodeDataListener(DataBroker broker,
50                                   HwvtepNodeHACache hwvtepNodeHACache,
51                                   Class<T> clazz,
52                                   Class<HwvtepNodeDataListener<T>> eventClazz,
53                                   MergeCommand<T, ?, ?> mergeCommand,
54                                   LogicalDatastoreType datastoreType) {
55         super(clazz, eventClazz);
56         this.hwvtepNodeHACache = hwvtepNodeHACache;
57         this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
58         this.mergeCommand = mergeCommand;
59         this.datastoreType = datastoreType;
60         if (LogicalDatastoreType.OPERATIONAL == datastoreType) {
61             this.addOperation = this::copyToParent;
62             this.removeOperation = this::deleteFromParent;
63         } else {
64             this.addOperation = this::copyToChildren;
65             this.removeOperation = this::deleteFromChildren;
66         }
67     }
68
69     @Override
70     protected abstract InstanceIdentifier<T> getWildCardPath();
71
72     @Override
73     protected void add(InstanceIdentifier<T> identifier, T dataAdded) {
74         HAJobScheduler.getInstance().submitJob(() -> addOperation.accept(identifier, dataAdded));
75     }
76
77     @Override
78     protected void update(InstanceIdentifier<T> key, T before, T after) {
79         HAJobScheduler.getInstance().submitJob(() -> {
80             if (Objects.equals(before, after)) {
81                 //incase of cluter reboots tx.put will rewrite the data and fire unnecessary updates
82                 return;
83             }
84             add(key, after);
85         });
86     }
87
88     @Override
89     protected void remove(InstanceIdentifier<T> identifier, T dataRemoved) {
90         HAJobScheduler.getInstance().submitJob(() -> removeOperation.accept(identifier, dataRemoved));
91     }
92
93     private boolean isNodeConnected(InstanceIdentifier<T> identifier, ReadTransaction tx)
94             throws ReadFailedException {
95         return tx.read(LogicalDatastoreType.OPERATIONAL, identifier.firstIdentifierOf(Node.class))
96                 .checkedGet().isPresent();
97     }
98
99     private static <T extends DataObject> boolean isDataUpdated(Optional<T> existingDataOptional, T newData) {
100         return !existingDataOptional.isPresent() || !Objects.equals(existingDataOptional.get(), newData);
101     }
102
103     private void copyToParent(InstanceIdentifier<T> identifier, T data) {
104         if (clazz == RemoteUcastMacs.class) {
105             LOG.trace("Skipping remote ucast macs to parent");
106             return;
107         }
108         InstanceIdentifier<Node> parent = getHAParent(identifier);
109         if (parent != null) {
110             LOG.trace("Copy child op data {} to parent {}", mergeCommand.getDescription(), getNodeId(parent));
111             T parentData = mergeCommand.transform(parent, data);
112             InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
113             ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
114                 tx -> writeToMdsal(tx, parentData, parentIdentifier)), LOG, "Error copying to parent");
115         }
116     }
117
118     private void deleteFromParent(InstanceIdentifier<T> identifier, T data) {
119         if (clazz == RemoteUcastMacs.class) {
120             LOG.trace("Skipping remote ucast macs to parent");
121             return;
122         }
123         InstanceIdentifier<Node> parent = getHAParent(identifier);
124         if (parent != null) {
125             ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
126                 if (isNodeConnected(identifier, tx)) {
127                     LOG.trace("Copy child op data {} to parent {} create:{}", mergeCommand.getDescription(),
128                             getNodeId(parent), false);
129                     T parentData = mergeCommand.transform(parent, data);
130                     InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
131                     deleteFromMdsal(tx, parentIdentifier);
132                 }
133             }), LOG, "Error deleting from parent");
134         }
135     }
136
137     private void copyToChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
138         Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
139         if (children != null) {
140             ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
141                 for (InstanceIdentifier<Node> child : children) {
142                     LOG.trace("Copy parent config data {} to child {}", mergeCommand.getDescription(),
143                             getNodeId(child));
144                     final T childData = mergeCommand.transform(child, parentData);
145                     final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
146                     writeToMdsal(tx, childData, identifier);
147                 }
148             }), LOG, "Error copying to children");
149         }
150     }
151
152     private void deleteFromChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
153         Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
154         if (children != null) {
155             ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
156                 for (InstanceIdentifier<Node> child : children) {
157                     LOG.trace("Delete parent config data {} to child {}", mergeCommand.getDescription(),
158                             getNodeId(child));
159                     final T childData = mergeCommand.transform(child, parentData);
160                     final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
161                     deleteFromMdsal(tx, identifier);
162                 }
163             }), LOG, "Error deleting from children");
164         }
165     }
166
167     private void writeToMdsal(final ReadWriteTransaction tx, final T data, final InstanceIdentifier<T> identifier)
168             throws ReadFailedException {
169         if (isDataUpdated(tx.read(datastoreType, identifier).checkedGet(), data)) {
170             tx.put(datastoreType, identifier, data);
171         }
172     }
173
174     private void deleteFromMdsal(final ReadWriteTransaction tx,
175             final InstanceIdentifier<T> identifier) throws ReadFailedException {
176         if (tx.read(datastoreType, identifier).checkedGet().isPresent()) {
177             tx.delete(datastoreType, identifier);
178         }
179     }
180
181     private String getNodeId(InstanceIdentifier<Node> iid) {
182         return iid.firstKeyOf(Node.class).getNodeId().getValue();
183     }
184
185     @Override
186     protected HwvtepNodeDataListener<T> getDataTreeChangeListener() {
187         return HwvtepNodeDataListener.this;
188     }
189
190     protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier identifier) {
191         InstanceIdentifier<Node> parent = identifier.firstIdentifierOf(Node.class);
192         return hwvtepNodeHACache.getChildrenForHANode(parent);
193     }
194
195     protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier identifier) {
196         InstanceIdentifier<Node> child = identifier.firstIdentifierOf(Node.class);
197         return hwvtepNodeHACache.getParent(child);
198     }
199 }