/*
 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.netvirt.elan.l2gw.ha.listeners;

import com.google.common.base.Optional;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
import org.opendaylight.genius.infra.Datastore;
import org.opendaylight.genius.infra.Datastore.Operational;
import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.infra.TypedReadWriteTransaction;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listens for updates to node child data and propagates them between HA child and parent nodes.
 * When operational data of an HA child node is updated, it is copied to the parent node.
 * When config data of an HA parent node is updated, it is copied to all of its child nodes.
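 *
 * <p>A minimal sketch of a concrete subclass, for illustration only ({@code LocalUcastMacsListener}
 * and the {@code localUcastMacsPath()} helper are hypothetical names, not classes in this module):
 * <pre>{@code
 * class LocalUcastMacsListener extends HwvtepNodeDataListener<Operational, LocalUcastMacs> {
 *     LocalUcastMacsListener(DataBroker broker, HwvtepNodeHACache haCache,
 *                            MergeCommand<LocalUcastMacs, ?, ?> cmd) {
 *         // eventClazz needs a raw cast because the subclass literal does not match
 *         // Class<HwvtepNodeDataListener<D, T>> exactly
 *         super(broker, haCache, LocalUcastMacs.class,
 *               (Class) LocalUcastMacsListener.class, cmd, Operational.class);
 *     }
 *
 *     protected InstanceIdentifier<LocalUcastMacs> getWildCardPath() {
 *         // wildcarded path to local-ucast-macs under the hwvtep topology nodes
 *         return localUcastMacsPath();
 *     }
 * }
 * }</pre>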
 */
public abstract class HwvtepNodeDataListener<D extends Datastore, T extends DataObject>
        extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<D, T>> {

    private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeDataListener.class);

    private final ManagedNewTransactionRunner txRunner;
    private final SingleTransactionDataBroker singleTxBroker;
    private final MergeCommand<T, ?, ?> mergeCommand;
    private final Class<D> datastoreType;
    private final BiConsumer<InstanceIdentifier<T>, T> addOperation;
    private final BiConsumer<InstanceIdentifier<T>, T> removeOperation;
    private final HwvtepNodeHACache hwvtepNodeHACache;

    public HwvtepNodeDataListener(DataBroker broker,
                                  HwvtepNodeHACache hwvtepNodeHACache,
                                  Class<T> clazz,
                                  Class<HwvtepNodeDataListener<D, T>> eventClazz,
                                  MergeCommand<T, ?, ?> mergeCommand,
                                  Class<D> datastoreType) {
        super(clazz, eventClazz);
        this.hwvtepNodeHACache = hwvtepNodeHACache;
        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
        this.singleTxBroker = new SingleTransactionDataBroker(broker);
        this.mergeCommand = mergeCommand;
        this.datastoreType = datastoreType;
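        // Operational-side listeners propagate child data up to the HA parent;
        // config-side listeners propagate parent data down to the HA children.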
        if (Operational.class.equals(datastoreType)) {
            this.addOperation = this::copyToParent;
            this.removeOperation = this::deleteFromParent;
        } else {
            this.addOperation = this::copyToChildren;
            this.removeOperation = this::deleteFromChildren;
        }
    }

    @Override
    protected abstract InstanceIdentifier<T> getWildCardPath();

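    // Additions are handed off to the HA job scheduler and applied asynchronously
    // via the add/remove operation selected in the constructor.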
    @Override
    protected void add(InstanceIdentifier<T> identifier, T dataAdded) {
        HAJobScheduler.getInstance().submitJob(() -> addOperation.accept(identifier, dataAdded));
    }

    @Override
    protected void update(InstanceIdentifier<T> key, T before, T after) {
        HAJobScheduler.getInstance().submitJob(() -> {
            if (Objects.equals(before, after)) {
                // In case of cluster reboots, tx.put would rewrite the same data and fire unnecessary updates
                return;
            }
            add(key, after);
        });
    }

    @Override
    protected void remove(InstanceIdentifier<T> identifier, T dataRemoved) {
        HAJobScheduler.getInstance().submitJob(() -> removeOperation.accept(identifier, dataRemoved));
    }

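    // Returns true if the hwvtep Node owning this data is still present in the
    // operational datastore, i.e. the switch is currently connected.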
    private boolean isNodeConnected(InstanceIdentifier<T> identifier)
            throws ReadFailedException {
        return singleTxBroker.syncReadOptional(LogicalDatastoreType.OPERATIONAL,
            identifier.firstIdentifierOf(Node.class)).isPresent();
    }

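    // Returns true when no data exists yet at the target path or the existing data
    // differs from the new data, so unchanged entries are not rewritten.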
    private static <T extends DataObject> boolean isDataUpdated(Optional<T> existingDataOptional, T newData) {
        return !existingDataOptional.isPresent() || !Objects.equals(existingDataOptional.get(), newData);
    }

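    // Copies operational data added on an HA child node to its HA parent node.
    // RemoteUcastMacs are intentionally not propagated to the parent.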
    private void copyToParent(InstanceIdentifier<T> identifier, T data) {
        if (clazz == RemoteUcastMacs.class) {
            LOG.trace("Skipping remote ucast macs to parent");
            return;
        }
        InstanceIdentifier<Node> parent = getHAParent(identifier);
        if (parent != null) {
            LOG.trace("Copy child op data {} to parent {}", mergeCommand.getDescription(), getNodeId(parent));
            T parentData = mergeCommand.transform(parent, data);
            InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType,
                tx -> writeToMdsal(tx, parentData, parentIdentifier)), LOG, "Error copying to parent");
        }
    }

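    // Removes the corresponding data from the HA parent node when it is deleted on a child.
    // The delete is only issued while the child node is still present in the operational datastore.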
    private void deleteFromParent(InstanceIdentifier<T> identifier, T data) {
        if (clazz == RemoteUcastMacs.class) {
            LOG.trace("Skipping remote ucast macs delete from parent");
            return;
        }
        InstanceIdentifier<Node> parent = getHAParent(identifier);
        if (parent != null) {
            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
                if (isNodeConnected(identifier)) {
                    LOG.trace("Delete child op data {} from parent {}", mergeCommand.getDescription(),
                            getNodeId(parent));
                    T parentData = mergeCommand.transform(parent, data);
                    InstanceIdentifier<T> parentIdentifier = mergeCommand.generateId(parent, parentData);
                    deleteFromMdsal(tx, parentIdentifier);
                }
            }), LOG, "Error deleting from parent");
        }
    }

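    // Copies config data written on the HA parent node to each of its HA child nodes.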
    private void copyToChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
        Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
        if (children != null) {
            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
                for (InstanceIdentifier<Node> child : children) {
                    LOG.trace("Copy parent config data {} to child {}", mergeCommand.getDescription(),
                            getNodeId(child));
                    final T childData = mergeCommand.transform(child, parentData);
                    final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
                    writeToMdsal(tx, childData, identifier);
                }
            }), LOG, "Error copying to children");
        }
    }

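    // Removes the corresponding data from every HA child node when it is deleted on the HA parent.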
    private void deleteFromChildren(final InstanceIdentifier<T> parentIdentifier, final T parentData) {
        Set<InstanceIdentifier<Node>> children = getChildrenForHANode(parentIdentifier);
        if (children != null) {
            ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
                for (InstanceIdentifier<Node> child : children) {
                    LOG.trace("Delete parent config data {} from child {}", mergeCommand.getDescription(),
                            getNodeId(child));
                    final T childData = mergeCommand.transform(child, parentData);
                    final InstanceIdentifier<T> identifier = mergeCommand.generateId(child, childData);
                    deleteFromMdsal(tx, identifier);
                }
            }), LOG, "Error deleting from children");
        }
    }

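    // Writes the data only when it differs from what is already stored, avoiding
    // redundant puts that would fire further change notifications.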
    private void writeToMdsal(final TypedReadWriteTransaction<D> tx, final T data,
            final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
        if (isDataUpdated(tx.read(identifier).get(), data)) {
            tx.put(identifier, data);
        }
    }

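    // Deletes the data only if it currently exists, keeping deletes idempotent.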
    private void deleteFromMdsal(final TypedReadWriteTransaction<D> tx,
            final InstanceIdentifier<T> identifier) throws ExecutionException, InterruptedException {
        if (tx.read(identifier).get().isPresent()) {
            tx.delete(identifier);
        }
    }

    private String getNodeId(InstanceIdentifier<Node> iid) {
        return iid.firstKeyOf(Node.class).getNodeId().getValue();
    }

    @Override
    protected HwvtepNodeDataListener<D, T> getDataTreeChangeListener() {
        return HwvtepNodeDataListener.this;
    }

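    // Looks up, via the HA cache, the child nodes of the HA parent node that owns this data.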
    protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier<T> identifier) {
        InstanceIdentifier<Node> parent = identifier.firstIdentifierOf(Node.class);
        return hwvtepNodeHACache.getChildrenForHANode(parent);
    }

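    // Looks up, via the HA cache, the HA parent node of the child node that owns this data.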
    protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier<T> identifier) {
        InstanceIdentifier<Node> child = identifier.firstIdentifierOf(Node.class);
        return hwvtepNodeHACache.getParent(child);
    }
}