* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.ovsdb.hwvtepsouthbound;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Timer;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.ovsdb.utils.mdsal.utils.Scheduler;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HwvtepOperGlobalListener implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(HwvtepOperGlobalListener.class);
+ private static final Map<InstanceIdentifier<Node>, ConnectionInfo> NODE_CONNECTION_INFO = new ConcurrentHashMap<>();
+ private static final Map<InstanceIdentifier<Node>, ScheduledFuture> TIMEOUT_FTS = new ConcurrentHashMap<>();
- private final Timer timer = new Timer();
private ListenerRegistration<HwvtepOperGlobalListener> registration;
private final HwvtepConnectionManager hcm;
private final DataBroker db;
- private final Map<YangInstanceIdentifier, Node> connectedNodes = new ConcurrentHashMap<>();
+ private static final Map<InstanceIdentifier<Node>, List<Callable<Void>>> NODE_DELET_WAITING_JOBS
+ = new ConcurrentHashMap<>();
+ private static final Map<InstanceIdentifier<Node>, Node> CONNECTED_NODES = new ConcurrentHashMap<>();
+
- HwvtepOperGlobalListener(DataBroker db, HwvtepConnectionManager hcm) {
+ HwvtepOperGlobalListener(final DataBroker db, HwvtepConnectionManager hcm) {
LOG.info("Registering HwvtepOperGlobalListener");
this.db = db;
this.hcm = hcm;
private void registerListener() {
final DataTreeIdentifier<Node> treeId =
- new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, getWildcardPath());
+ DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, getWildcardPath());
registration = db.registerDataTreeChangeListener(treeId, HwvtepOperGlobalListener.this);
}
}
@Override
- public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
- changes.forEach(change -> {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
+ LOG.trace("onDataTreeChanged: ");
+ try {
+ connect(changes);
+ updated(changes);
+ disconnect(changes);
+ } catch (Exception e) {
+ LOG.error("Failed to handle dcn event ", e);
+ }
+ }
+
+ public static void runAfterTimeoutIfNodeNotCreated(InstanceIdentifier<Node> iid, Runnable job) {
+ ScheduledFuture<?> ft = TIMEOUT_FTS.get(iid);
+ if (ft != null) {
+ ft.cancel(false);
+ }
+ ft = Scheduler.getScheduledExecutorService().schedule(() -> {
+ TIMEOUT_FTS.remove(iid);
+ if (!NODE_CONNECTION_INFO.containsKey(iid)) {
+ job.run();
+ }
+ }, HwvtepSouthboundConstants.EOS_TIMEOUT, TimeUnit.SECONDS);
+ TIMEOUT_FTS.put(iid, ft);
+ }
+
+ public void runAfterNodeDeleted(InstanceIdentifier<Node> iid, Callable<Void> job) throws Exception {
+ synchronized (HwvtepOperGlobalListener.class) {
+ if (NODE_DELET_WAITING_JOBS.containsKey(iid)) {
+ LOG.error("Node present in the cache {} adding to delete queue", iid);
+ NODE_DELET_WAITING_JOBS.get(iid).add(job);
+ //Also delete the node so that reconciliation kicks in
+ deleteTheNodeOfOldConnection(iid, getNodeConnectionInfo(iid));
+ HwvtepSouthboundUtil.getScheduledExecutorService().schedule(() -> {
+ runPendingJobs(iid);
+ }, HwvtepSouthboundConstants.HWVTEP_REGISTER_CALLBACKS_WAIT_TIMEOUT, TimeUnit.SECONDS);
+ } else {
+ LOG.info("Node not present in the cache {} running the job now", iid);
+ job.call();
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private synchronized void runPendingJobs(InstanceIdentifier<Node> iid) {
+ List<Callable<Void>> jobs = NODE_DELET_WAITING_JOBS.remove(iid);
+ if (jobs != null && !jobs.isEmpty()) {
+ jobs.forEach((job) -> {
+ try {
+ LOG.error("Node disconnected job found {} running it now ", iid);
+ job.call();
+ } catch (Exception e) {
+ LOG.error("Failed to run callable ", e);
+ }
+ });
+ jobs.clear();
+ }
+ }
+
+ private void connect(Collection<DataTreeModification<Node>> changes) {
+ changes.forEach((change) -> {
InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
DataObjectModification<Node> mod = change.getRootNode();
- InstanceIdentifier<Node> nodeIid = change.getRootPath().getRootIdentifier();
- YangInstanceIdentifier entityId =
- HwvtepSouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(nodeIid);
Node node = getCreated(mod);
- if (node != null) {
- connectedNodes.put(entityId, node);
+ if (node == null) {
+ return;
+ }
+ CONNECTED_NODES.put(key, node);
+ ScheduledFuture ft = TIMEOUT_FTS.remove(key);
+ if (ft != null) {
+ ft.cancel(false);
+ }
+ HwvtepGlobalAugmentation globalAugmentation = node.augmentation(HwvtepGlobalAugmentation.class);
+ if (globalAugmentation != null) {
+ ConnectionInfo connectionInfo = globalAugmentation.getConnectionInfo();
+ if (connectionInfo != null) {
+ NODE_CONNECTION_INFO.put(key, connectionInfo);
+ }
}
- node = getRemoved(mod);
if (node != null) {
- connectedNodes.remove(entityId);
- HwvtepConnectionInstance connectionInstance = hcm.getConnectionInstanceFromNodeIid(nodeIid);
- if (Objects.equals(connectionInstance.getConnectionInfo().getRemotePort(),
- HwvtepSouthboundUtil.getRemotePort(node))) {
- //Oops some one deleted the node held by me This should never happen
- try {
- connectionInstance.refreshOperNode();
- } catch (ExecutionException | InterruptedException e) {
- LOG.error("Failed to refresh operational nodes ", e);
- }
+ synchronized (HwvtepOperGlobalListener.class) {
+ NODE_DELET_WAITING_JOBS.putIfAbsent(key, new ArrayList<>());
}
+ }
+ });
+ }
+ private void updated(Collection<DataTreeModification<Node>> changes) {
+ changes.forEach((change) -> {
+ InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+ DataObjectModification<Node> mod = change.getRootNode();
+ Node node = getUpdated(mod);
+ if (node != null) {
+ CONNECTED_NODES.put(key, node);
}
});
}
- private Node getCreated(DataObjectModification<Node> mod) {
+ public static Node getNode(final InstanceIdentifier<Node> key) {
+ return CONNECTED_NODES.get(key);
+ }
+
+ private void disconnect(Collection<DataTreeModification<Node>> changes) {
+ changes.forEach((change) -> {
+ InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+ DataObjectModification<Node> mod = change.getRootNode();
+ Node node = getRemoved(mod);
+ if (node != null) {
+ CONNECTED_NODES.remove(key);
+ NODE_CONNECTION_INFO.remove(key);
+ synchronized (HwvtepOperGlobalListener.class) {
+ runPendingJobs(key);
+ }
+ }
+ });
+ }
+
+ private static String getNodeId(InstanceIdentifier<Node> iid) {
+ return iid.firstKeyOf(Node.class).getNodeId().getValue();
+ }
+
+ public void scheduleOldConnectionNodeDelete(InstanceIdentifier<Node> iid) {
+ ConnectionInfo oldConnectionInfo = getNodeConnectionInfo(iid);
+ HwvtepSouthboundUtil.getScheduledExecutorService().schedule(() -> {
+ deleteTheNodeOfOldConnection(iid, oldConnectionInfo);
+ }, HwvtepSouthboundConstants.STALE_HWVTEP_CLEANUP_DELAY_SECS, TimeUnit.SECONDS);
+ }
+
+ private void deleteTheNodeOfOldConnection(InstanceIdentifier<Node> iid,
+ ConnectionInfo oldConnectionInfo) {
+ if (oldConnectionInfo == null) {
+ return;
+ }
+ ConnectionInfo latestConnectionInfo = getNodeConnectionInfo(iid);
+ if (Objects.equals(latestConnectionInfo, oldConnectionInfo)) {
+ //Still old connection node is not deleted
+ LOG.debug("Delete Node {} from oper ", getNodeId(iid));
+ hcm.cleanupOperationalNode(iid);
+ }
+ }
+
+ private static ConnectionInfo getNodeConnectionInfo(InstanceIdentifier<Node> iid) {
+ return NODE_CONNECTION_INFO.get(iid);
+ }
+
+ private static Node getCreated(final DataObjectModification<Node> mod) {
if (mod.getModificationType() == ModificationType.WRITE && mod.getDataBefore() == null) {
return mod.getDataAfter();
}
return null;
}
- private Node getRemoved(DataObjectModification<Node> mod) {
+ private static Node getRemoved(final DataObjectModification<Node> mod) {
if (mod.getModificationType() == ModificationType.DELETE) {
return mod.getDataBefore();
}
return null;
}
- public Map<YangInstanceIdentifier, Node> getConnectedNodes() {
- return Collections.unmodifiableMap(connectedNodes);
+ private static InstanceIdentifier<Node> getWildcardPath() {
+ return InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
+ .child(Node.class);
}
- private InstanceIdentifier<Node> getWildcardPath() {
- InstanceIdentifier<Node> path = InstanceIdentifier
- .create(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
- .child(Node.class);
- return path;
+ private Node getUpdated(DataObjectModification<Node> mod) {
+ Node node = null;
+ switch (mod.getModificationType()) {
+ case SUBTREE_MODIFIED:
+ node = mod.getDataAfter();
+ break;
+ case WRITE:
+ if (mod.getDataBefore() != null) {
+ node = mod.getDataAfter();
+ }
+ break;
+ default:
+ break;
+ }
+ return node;
}
+
+
}