Make sure the OVSDB connected node appears in the operational datastore
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / OvsdbOperGlobalListener.java
1 /*
2  * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.southbound;
10
11 import java.util.Collection;
12 import java.util.Map;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.ConcurrentMap;
15
16 import java.util.concurrent.ScheduledFuture;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
20 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
21 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
22 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
25 import org.opendaylight.ovsdb.utils.mdsal.utils.Scheduler;
26 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
27 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
28 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
29 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class OvsdbOperGlobalListener implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
36
37     private static final Logger LOG = LoggerFactory.getLogger(OvsdbOperGlobalListener.class);
38     private ListenerRegistration<OvsdbOperGlobalListener> registration;
39     private DataBroker db;
40     public static final ConcurrentMap<InstanceIdentifier<Node>, Node> OPER_NODE_CACHE = new ConcurrentHashMap<>();
41     private final OvsdbConnectionManager ovsdbConnectionManager;
42     private final TransactionInvoker txInvoker;
43
44
45     OvsdbOperGlobalListener(DataBroker db, OvsdbConnectionManager ovsdbConnectionManager,
46                             TransactionInvoker txInvoker) {
47         LOG.info("Registering OvsdbOperGlobalListener");
48         this.db = db;
49         this.ovsdbConnectionManager = ovsdbConnectionManager;
50         this.txInvoker = txInvoker;
51         registerListener();
52     }
53
54     public void registerListener() {
55         DataTreeIdentifier<Node> treeId =
56                 new DataTreeIdentifier<Node>(LogicalDatastoreType.OPERATIONAL, getWildcardPath());
57         registration = db.registerDataTreeChangeListener(treeId, this);
58     }
59
60     @Override
61     public void close() {
62         if (registration != null) {
63             registration.close();
64             LOG.info("OVSDB Oper Node listener has been closed.");
65         }
66     }
67
68     @Override
69     @SuppressWarnings("checkstyle:IllegalCatch")
70     public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
71         changes.forEach((change) -> {
72             try {
73                 InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
74                 DataObjectModification<Node> mod = change.getRootNode();
75                 Node addNode = getCreated(mod);
76                 if (addNode != null) {
77                     OPER_NODE_CACHE.put(key, addNode);
78                     LOG.info("Node added to oper {}", SouthboundUtil.getOvsdbNodeId(key));
79                 }
80                 Node removedNode = getRemoved(mod);
81                 if (removedNode != null) {
82                     OPER_NODE_CACHE.remove(key);
83                     LOG.info("Node deleted from oper {}", SouthboundUtil.getOvsdbNodeId(key));
84
85                     OvsdbConnectionInstance connectionInstance = ovsdbConnectionManager.getConnectionInstance(key);
86                     if (connectionInstance != null && connectionInstance.isActive()
87                             && connectionInstance.getHasDeviceOwnership() != null
88                             && connectionInstance.getHasDeviceOwnership()) {
89                         //Oops some one deleted the node held by me This should never happen.
90                         //put the node back in oper
91                         txInvoker.invoke(transaction -> {
92                             transaction.put(LogicalDatastoreType.OPERATIONAL, key, removedNode);
93                         });
94
95                     }
96                 }
97
98                 Node modifiedNode = getUpdated(mod);
99                 if (modifiedNode != null) {
100                     OPER_NODE_CACHE.put(key, modifiedNode);
101                 }
102             } catch (Exception e) {
103                 LOG.error("Failed to handle oper node ", e);
104             }
105         });
106     }
107
108     private static final Map<InstanceIdentifier<Node>, ScheduledFuture> TIMEOUT_FTS = new ConcurrentHashMap<>();
109
110     public static void runAfterTimeoutIfNodeNotCreated(InstanceIdentifier<Node> iid, Runnable job) {
111         ScheduledFuture<?> ft = TIMEOUT_FTS.get(iid);
112         if (ft != null) {
113             ft.cancel(false);
114         }
115         ft = Scheduler.getScheduledExecutorService().schedule(() -> {
116             TIMEOUT_FTS.remove(iid);
117             if (!OPER_NODE_CACHE.containsKey(iid)) {
118                 job.run();
119             }
120         }, SouthboundConstants.EOS_TIMEOUT, TimeUnit.SECONDS);
121         TIMEOUT_FTS.put(iid, ft);
122     }
123
124     private Node getCreated(DataObjectModification<Node> mod) {
125         if ((mod.getModificationType() == DataObjectModification.ModificationType.WRITE)
126                 && (mod.getDataBefore() == null)) {
127             return mod.getDataAfter();
128         }
129         return null;
130     }
131
132     private Node getRemoved(DataObjectModification<Node> mod) {
133         if (mod.getModificationType() == DataObjectModification.ModificationType.DELETE) {
134             return mod.getDataBefore();
135         }
136         return null;
137     }
138
139     private Node getUpdated(DataObjectModification<Node> mod) {
140         Node node = null;
141         switch (mod.getModificationType()) {
142             case SUBTREE_MODIFIED:
143                 node = mod.getDataAfter();
144                 break;
145             case WRITE:
146                 if (mod.getDataBefore() !=  null) {
147                     node = mod.getDataAfter();
148                 }
149                 break;
150             default:
151                 break;
152         }
153         return node;
154     }
155
156     private InstanceIdentifier<Node> getWildcardPath() {
157         InstanceIdentifier<Node> path = InstanceIdentifier
158                 .create(NetworkTopology.class)
159                 .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
160                 .child(Node.class);
161         return path;
162     }
163
164 }