2 * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
10 import java.util.Collection;
11 import java.util.concurrent.ExecutionException;
12 import javax.annotation.PreDestroy;
13 import org.opendaylight.genius.datastoreutils.TaskRetryLooper;
14 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
15 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
16 import org.opendaylight.mdsal.binding.api.DataBroker;
17 import org.opendaylight.mdsal.binding.api.DataObjectModification;
18 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
19 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
20 import org.opendaylight.mdsal.binding.api.DataTreeModification;
21 import org.opendaylight.mdsal.binding.util.Datastore;
22 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
23 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
24 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
25 import org.opendaylight.mdsal.common.api.ReadFailedException;
26 import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
27 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
28 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
29 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
30 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 public abstract class HwvtepNodeBaseListener<D extends Datastore> implements
38 DataTreeChangeListener<Node>, AutoCloseable {
40 private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeBaseListener.class);
41 private static final int STARTUP_LOOP_TICK = 500;
42 private static final int STARTUP_LOOP_MAX_RETRIES = 8;
44 static HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
46 private ListenerRegistration<HwvtepNodeBaseListener> registration;
47 private final DataBroker dataBroker;
48 private final Class<D> datastoreType;
49 protected final ManagedNewTransactionRunner txRunner;
52 public HwvtepNodeBaseListener(Class<D> datastoreType, DataBroker dataBroker) throws Exception {
53 this.dataBroker = dataBroker;
54 this.datastoreType = datastoreType;
55 this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
56 registerListener(datastoreType, dataBroker);
59 protected void registerListener(Class<D> dsType, DataBroker broker) throws Exception {
60 final DataTreeIdentifier<Node> treeId = DataTreeIdentifier.create(Datastore.toType(dsType),
62 TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
63 registration = looper.loopUntilNoException(() ->
64 broker.registerDataTreeChangeListener(treeId, HwvtepNodeBaseListener.this));
67 protected DataBroker getDataBroker() {
72 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
73 // Batch Transaction used to internally submit to ResourceBatching Manager here
74 HAJobScheduler.getInstance().submitJob(() -> {
75 TypedReadWriteTransaction tx = getTx();
77 processConnectedNodes(changes, tx);
78 processUpdatedNodes(changes, tx);
79 processDisconnectedNodes(changes, tx);
81 } catch (InterruptedException | ExecutionException | ReadFailedException e) {
82 LOG.error("Error processing data-tree changes", e);
87 @SuppressWarnings("illegalcatch")
88 private void processUpdatedNodes(Collection<DataTreeModification<Node>> changes,
89 TypedReadWriteTransaction<D> tx)
90 throws ReadFailedException, ExecutionException, InterruptedException {
91 for (DataTreeModification<Node> change : changes) {
92 final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
93 final DataObjectModification<Node> mod = change.getRootNode();
94 String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
95 Node updated = HwvtepHAUtil.getUpdated(mod);
96 Node original = HwvtepHAUtil.getOriginal(mod);
98 if (updated != null && original != null) {
99 if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
100 onGlobalNodeUpdate(key, updated, original, mod, tx);
102 onPsNodeUpdate(updated, mod, tx);
105 } catch (Exception e) {
106 LOG.error("Exception during Processing Updated Node {}", nodeId, e);
111 @SuppressWarnings("checkstyle:IllegalCatch")
112 private void processDisconnectedNodes(Collection<DataTreeModification<Node>> changes,
113 TypedReadWriteTransaction<D> tx)
114 throws InterruptedException, ExecutionException, ReadFailedException {
116 for (DataTreeModification<Node> change : changes) {
117 final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
118 final DataObjectModification<Node> mod = change.getRootNode();
119 Node deleted = HwvtepHAUtil.getRemoved(mod);
120 String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
121 if (deleted != null) {
122 if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
123 LOG.trace("Handle global node delete {}", deleted.getNodeId().getValue());
124 onGlobalNodeDelete(key, deleted, tx);
126 LOG.trace("Handle ps node node delete {}", deleted.getNodeId().getValue());
127 onPsNodeDelete(key, deleted, tx);
133 @SuppressWarnings("checkstyle:IllegalCatch")
134 void processConnectedNodes(Collection<DataTreeModification<Node>> changes,
135 TypedReadWriteTransaction<D> tx) {
136 for (DataTreeModification<Node> change : changes) {
138 InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
139 DataObjectModification<Node> mod = change.getRootNode();
140 Node node = HwvtepHAUtil.getCreated(mod);
141 String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
144 if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
145 LOG.trace("Handle global node add {}", node.getNodeId().getValue());
146 onGlobalNodeAdd(key, node, tx);
148 LOG.trace("Handle ps node add {}", node.getNodeId().getValue());
149 onPsNodeAdd(key, node, tx);
152 } catch (ExecutionException | InterruptedException e) {
153 LOG.error("Exception during Processing Connected Node {}", nodeId, e);
158 private InstanceIdentifier<Node> getWildcardPath() {
159 InstanceIdentifier<Node> path = InstanceIdentifier
160 .create(NetworkTopology.class)
161 .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
168 public void close() {
169 if (registration != null) {
170 registration.close();
174 TypedReadWriteTransaction<D> getTx() {
175 return new BatchedTransaction(datastoreType);
179 void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx)
180 throws ExecutionException, InterruptedException {
183 void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
184 throws ExecutionException, InterruptedException {
187 void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx) {
190 void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
191 throws InterruptedException, ExecutionException {
194 void onGlobalNodeUpdate(InstanceIdentifier<Node> key, Node updated, Node original,
195 DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx) {
198 void onPsNodeUpdate(Node updated,
199 DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx) {