Migrate elanmanager to use LoggingFutures
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HwvtepNodeBaseListener.java
1 /*
2  * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
9
10 import com.google.common.collect.ImmutableMap;
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.concurrent.ExecutionException;
14 import java.util.function.Function;
15 import javax.annotation.PreDestroy;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
18 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
20 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
21 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
22 import org.opendaylight.genius.datastoreutils.TaskRetryLooper;
23 import org.opendaylight.genius.infra.Datastore;
24 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
25 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
26 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
27 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
28 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
29 import org.opendaylight.infrautils.metrics.Labeled;
30 import org.opendaylight.infrautils.metrics.Meter;
31 import org.opendaylight.infrautils.metrics.MetricDescriptor;
32 import org.opendaylight.infrautils.metrics.MetricProvider;
33 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
34 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LogicalSwitches;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public abstract class HwvtepNodeBaseListener<D extends Datastore>
52     implements DataTreeChangeListener<Node>, AutoCloseable {
53
54     private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeBaseListener.class);
55     private static final int STARTUP_LOOP_TICK = 500;
56     private static final int STARTUP_LOOP_MAX_RETRIES = 8;
57
58     private final ListenerRegistration<HwvtepNodeBaseListener> registration;
59     private final DataBroker dataBroker;
60     final ManagedNewTransactionRunner txRunner;
61     private final HwvtepNodeHACache hwvtepNodeHACache;
62     private final Class<D> datastoreType;
63     private final Function<DataObject, String> noLogicalSwitch = (data) -> "No_Ls";
64
65     private final Labeled<Labeled<Labeled<Labeled<Labeled<Meter>>>>> childModCounter;
66     private final Labeled<Labeled<Labeled<Meter>>> nodeModCounter;
67     private final boolean updateMetrics;
68
69     private static final ImmutableMap<Class, Function<DataObject, String>> LOGICAL_SWITCH_EXTRACTOR =
70         new ImmutableMap.Builder<Class, Function<DataObject, String>>()
71             .put(LogicalSwitches.class, data -> ((LogicalSwitches) data).getHwvtepNodeName().getValue())
72             .put(RemoteMcastMacs.class,
73                 data -> logicalSwitchNameFromIid(((RemoteMcastMacs) data).key().getLogicalSwitchRef().getValue()))
74             .put(RemoteUcastMacs.class, data -> logicalSwitchNameFromIid(
75                 ((RemoteUcastMacs) data).key().getLogicalSwitchRef().getValue())).build();
76
77
78     public HwvtepNodeBaseListener(Class<D> datastoreType, DataBroker dataBroker,
79                                   HwvtepNodeHACache hwvtepNodeHACache, MetricProvider metricProvider,
80                                   boolean updateMetrics) throws Exception {
81         this.dataBroker = dataBroker;
82         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
83         this.datastoreType = datastoreType;
84         this.hwvtepNodeHACache = hwvtepNodeHACache;
85         this.updateMetrics = updateMetrics;
86         this.childModCounter = metricProvider.newMeter(
87                 MetricDescriptor.builder().anchor(this).project("netvirt").module("l2gw").id("child").build(),
88                 "datastore", "modification", "class", "nodeid", "logicalswitch");
89         this.nodeModCounter = metricProvider.newMeter(
90                 MetricDescriptor.builder().anchor(this).project("netvirt").module("l2gw").id("node").build(),
91                 "datastore", "modification", "nodeid");
92         final DataTreeIdentifier<Node> treeId =
93             new DataTreeIdentifier<>(Datastore.toType(datastoreType), getWildcardPath());
94         TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
95         registration = looper.loopUntilNoException(() ->
96                 dataBroker.registerDataTreeChangeListener(treeId, HwvtepNodeBaseListener.this));
97     }
98
99     protected DataBroker getDataBroker() {
100         return dataBroker;
101     }
102
103     protected HwvtepNodeHACache getHwvtepNodeHACache() {
104         return hwvtepNodeHACache;
105     }
106
107     /**
108      * If Normal non-ha node changes to HA node , its added to HA cache.
109      *
110      * @param childPath HA child path which got converted to HA node
111      * @param updatedChildNode updated Child node
112      * @param beforeChildNode non-ha node before updated to HA node
113      */
114     protected void addToHACacheIfBecameHAChild(InstanceIdentifier<Node> childPath, Node updatedChildNode,
115                                                Node beforeChildNode) {
116         HwvtepGlobalAugmentation updatedAugmentaion = updatedChildNode.augmentation(HwvtepGlobalAugmentation.class);
117         HwvtepGlobalAugmentation beforeAugmentaion = null;
118         if (beforeChildNode != null) {
119             beforeAugmentaion = beforeChildNode.augmentation(HwvtepGlobalAugmentation.class);
120         }
121         List<Managers> up = null;
122         List<Managers> be = null;
123         if (updatedAugmentaion != null) {
124             up = updatedAugmentaion.getManagers();
125         }
126         if (beforeAugmentaion != null) {
127             be = beforeAugmentaion.getManagers();
128         }
129
130         if (up != null && be != null && up.size() > 0 && be.size() > 0) {
131             Managers m1 = up.get(0);
132             Managers m2 = be.get(0);
133             if (!m1.equals(m2)) {
134                 LOG.trace("Manager entry updated for node {} ", updatedChildNode.getNodeId().getValue());
135                 HwvtepHAUtil.addToCacheIfHAChildNode(childPath, updatedChildNode, hwvtepNodeHACache);
136             }
137         }
138     }
139
140     @Override
141     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
142         HAJobScheduler.getInstance().submitJob(() -> LoggingFutures.addErrorLogging(
143             txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
144                 processConnectedNodes(changes, tx);
145                 processUpdatedNodes(changes, tx);
146                 processDisconnectedNodes(changes, tx);
147             }), LOG, "Error processing data-tree changes"));
148     }
149
150     private void processUpdatedNodes(Collection<DataTreeModification<Node>> changes,
151                                      TypedReadWriteTransaction<D> tx)
152             throws ReadFailedException, ExecutionException, InterruptedException {
153         for (DataTreeModification<Node> change : changes) {
154             final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
155             final DataObjectModification<Node> mod = change.getRootNode();
156             String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
157             Node updated = HwvtepHAUtil.getUpdated(mod);
158             Node original = HwvtepHAUtil.getOriginal(mod);
159             updateCounters(nodeId, mod.getModifiedChildren());
160             if (updated != null && original != null) {
161                 DataObjectModification subMod;
162                 if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
163                     onGlobalNodeUpdate(key, updated, original, mod, tx);
164                     subMod = change.getRootNode().getModifiedAugmentation(HwvtepGlobalAugmentation.class);
165                 } else {
166                     onPsNodeUpdate(updated, mod, tx);
167                     subMod = change.getRootNode().getModifiedAugmentation(PhysicalSwitchAugmentation.class);
168                 }
169                 if (subMod != null) {
170                     updateCounters(nodeId, subMod.getModifiedChildren());
171                 }
172             }
173         }
174     }
175
176     private String logicalSwitchNameFromChildMod(DataObjectModification<? extends DataObject> childMod) {
177         DataObject dataAfter = childMod.getDataAfter();
178         DataObject data = dataAfter != null ? dataAfter : childMod.getDataBefore();
179         return LOGICAL_SWITCH_EXTRACTOR.getOrDefault(childMod.getModificationType().getClass(), noLogicalSwitch)
180                 .apply(data);
181     }
182
183     private static String logicalSwitchNameFromIid(InstanceIdentifier<?> input) {
184         InstanceIdentifier<LogicalSwitches> iid = (InstanceIdentifier<LogicalSwitches>)input;
185         return iid.firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
186     }
187
188     private void updateCounters(String nodeId,
189                                 Collection<? extends DataObjectModification<? extends DataObject>> childModCollection) {
190         if (childModCollection == null || !updateMetrics) {
191             return;
192         }
193         childModCollection.forEach(childMod -> {
194             String childClsName = childMod.getDataType().getClass().getSimpleName();
195             String modificationType = childMod.getModificationType().toString();
196             String logicalSwitchName = logicalSwitchNameFromChildMod(childMod);
197             childModCounter.label(Datastore.toType(datastoreType).name())
198                     .label(modificationType)
199                     .label(childClsName)
200                     .label(nodeId)
201                     .label(logicalSwitchName).mark();
202         });
203     }
204
205     private void processDisconnectedNodes(Collection<DataTreeModification<Node>> changes,
206                                           TypedReadWriteTransaction<D> tx)
207             throws InterruptedException, ExecutionException, ReadFailedException {
208         for (DataTreeModification<Node> change : changes) {
209             final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
210             final DataObjectModification<Node> mod = change.getRootNode();
211             Node deleted = HwvtepHAUtil.getRemoved(mod);
212             String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
213             if (deleted != null) {
214                 if (updateMetrics) {
215                     nodeModCounter.label(Datastore.toType(datastoreType).name())
216                             .label(DataObjectModification.ModificationType.DELETE.name()).label(nodeId).mark();
217                 }
218                 if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
219                     LOG.trace("Handle global node delete {}", deleted.getNodeId().getValue());
220                     onGlobalNodeDelete(key, deleted, tx);
221                 } else {
222                     LOG.trace("Handle ps node node delete {}", deleted.getNodeId().getValue());
223                     onPsNodeDelete(key, deleted, tx);
224                 }
225             }
226         }
227     }
228
229     void processConnectedNodes(Collection<DataTreeModification<Node>> changes,
230                                TypedReadWriteTransaction<D> tx)
231             throws ExecutionException, InterruptedException {
232         for (DataTreeModification<Node> change : changes) {
233             InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
234             DataObjectModification<Node> mod = change.getRootNode();
235             Node node = HwvtepHAUtil.getCreated(mod);
236             String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
237             if (node != null) {
238                 if (updateMetrics) {
239                     nodeModCounter.label(Datastore.toType(datastoreType).name())
240                             .label(DataObjectModification.ModificationType.WRITE.name()).label(nodeId).mark();
241                 }
242                 if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
243                     LOG.trace("Handle global node add {}", node.getNodeId().getValue());
244                     onGlobalNodeAdd(key, node, tx);
245                 } else {
246                     LOG.trace("Handle ps node add {}", node.getNodeId().getValue());
247                     onPsNodeAdd(key, node, tx);
248                 }
249             }
250         }
251     }
252
253     private InstanceIdentifier<Node> getWildcardPath() {
254         InstanceIdentifier<Node> path = InstanceIdentifier
255                 .create(NetworkTopology.class)
256                 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
257                 .child(Node.class);
258         return path;
259     }
260
261     @Override
262     @PreDestroy
263     public void close() {
264         if (registration != null) {
265             registration.close();
266         }
267     }
268
269     //default methods
270     void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx)
271         throws ReadFailedException, ExecutionException, InterruptedException {
272     }
273
274     void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
275         throws ReadFailedException, ExecutionException, InterruptedException {
276
277     }
278
279     void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx) {
280
281     }
282
283     void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
284             throws InterruptedException, ExecutionException {
285
286     }
287
288     void onGlobalNodeUpdate(InstanceIdentifier<Node> key, Node updated, Node original,
289                             DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx)
290             throws ReadFailedException, InterruptedException, ExecutionException {
291
292     }
293
294     void onPsNodeUpdate(Node updated,
295                         DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx)
296             throws ReadFailedException, InterruptedException, ExecutionException {
297
298     }
299
300 }