2 * Copyright (c) 2017 Cisco Systems, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.lispflowmapping.neutron.intenthandler.listener;
10 import com.google.common.collect.ArrayListMultimap;
11 import com.google.common.collect.Multimap;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
23 import org.opendaylight.lispflowmapping.neutron.intenthandler.IntentHandlerAsyncExecutorProvider;
24 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNetconfConnectionProbe;
25 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNodeReader;
26 import org.opendaylight.lispflowmapping.neutron.mappingmanager.HostInformationManager;
27 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.DataObjectModification;
30 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
31 import org.opendaylight.mdsal.binding.api.DataTreeModification;
32 import org.opendaylight.mdsal.binding.api.MountPointService;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.node.attributes.SupportingNode;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Created by Shakib Ahmed on 1/11/17.
52 public class VppEndpointListener implements AutoCloseable, ClusteredDataTreeChangeListener<Topology> {
/** Logger shared by all instances of this listener. */
private static final Logger LOG = LoggerFactory.getLogger(VppEndpointListener.class);
/** Data broker given to the constructor; used to register this listener and handed to the node reader. */
private final DataBroker dataBroker;
/** Mount point service given to the constructor; handed to the node reader. */
private final MountPointService mountService;
/** Registration handle returned when this listener is registered with the data broker. */
private final ListenerRegistration<?> reg;
/** Reader used to look up the RLOC IP address of a VPP node. */
private final VppNodeReader vppNodeReader;
/** Shared manager instance that receives the host id / RLOC pairs discovered by this listener. */
private final HostInformationManager hostInformationManager;
/**
 * Maps the id of a topology node to the instance identifiers built from its supporting nodes.
 * NOTE(review): ArrayListMultimap is not synchronised, yet entries appear to be added from
 * tasks handed to the executor - confirm the executor's threading model.
 */
private final Multimap<NodeId, KeyedInstanceIdentifier<Node, NodeKey>> nodeIdToKeyedInstanceIdentifierMap =
        ArrayListMultimap.create();
/** Executor obtained from IntentHandlerAsyncExecutorProvider; used for node processing. */
private final ListeningExecutorService executorService;
/**
 * Creates the listener and subscribes it to configuration changes of the given topology.
 *
 * <p>Every field is assigned before the listener is registered. Registration publishes
 * {@code this} to the data broker, so a change notification may arrive before the
 * constructor returns; registering last guarantees such a notification never sees an
 * unassigned executor or any other half-initialised state.
 *
 * @param dataBroker data broker used for the registration and handed to the VPP node reader
 * @param mountPointService mount point service handed to the VPP node reader
 * @param topologyII identifier of the topology whose configuration changes are observed
 */
public VppEndpointListener(final DataBroker dataBroker,
                           final MountPointService mountPointService,
                           KeyedInstanceIdentifier<Topology, TopologyKey> topologyII) {
    this.dataBroker = dataBroker;
    this.mountService = mountPointService;

    vppNodeReader = new VppNodeReader(this.dataBroker, this.mountService);
    hostInformationManager = HostInformationManager.getInstance();
    executorService = IntentHandlerAsyncExecutorProvider.getInstace().getExecutor();

    // Must stay the last statement: from here on the broker may call back into this object.
    reg = dataBroker.registerDataTreeChangeListener(
            DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, topologyII), this);
}
85 public void onDataTreeChanged(Collection<DataTreeModification<Topology>> changes) {
86 for (DataTreeModification<Topology> change : changes) {
87 final DataObjectModification<Topology> modification = change.getRootNode();
88 ListenableFuture<Void> modificationTaskHandler;
89 switch (modification.getModificationType()) {
91 modificationTaskHandler = handleChange(modification);
93 case SUBTREE_MODIFIED:
94 modificationTaskHandler = handleChange(modification);
97 modificationTaskHandler = handleDeleteOnTopology();
100 LOG.warn("Ignored topology modification {}", modification);
101 modificationTaskHandler = Futures.immediateFuture(null);
104 Futures.addCallback(modificationTaskHandler, new FutureCallback<Void>() {
106 public void onSuccess(Void vd) {
107 LOG.debug("VppEndpoint modification handled successfully!");
111 public void onFailure(Throwable throwable) {
112 LOG.debug("Failed to handle VppEndpoint modifications!");
114 }, MoreExecutors.directExecutor());
118 private ListenableFuture<Void> handleChange(DataObjectModification modification) {
119 Collection<DataObjectModification<? extends DataObject>> modifiedChildren = modification.getModifiedChildren();
120 List<ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>>> processingTasks = new ArrayList<>();
121 for (DataObjectModification modifiedNode : modifiedChildren) {
122 final Node newOrModifiedNode = (Node) modifiedNode.getDataAfter();
123 ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processingTask = processNode(newOrModifiedNode);
124 Futures.addCallback(processingTask, new FutureCallback<KeyedInstanceIdentifier<Node, NodeKey>>() {
126 public void onSuccess(KeyedInstanceIdentifier<Node, NodeKey> kiiToNode) {
127 hostInformationManager.addHostRelatedInfo(newOrModifiedNode.getNodeId().getValue(),
128 LispAddressUtil.toRloc(vppNodeReader.rlocIpOfNode(kiiToNode)));
132 public void onFailure(Throwable throwable) {
133 LOG.debug("Couldn't process {}", newOrModifiedNode.getNodeId().getValue());
135 }, MoreExecutors.directExecutor());
136 processingTasks.add(processNode(newOrModifiedNode));
138 return Futures.immediateFuture(null);
141 private ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processNode(final Node newOrModifiedNode) {
142 ListenableFuture<Void> probeVppNodeForConnection = executorService
144 processNodeOnConnection(newOrModifiedNode);
148 return Futures.transform(probeVppNodeForConnection,
149 vd -> nodeIdToKeyedInstanceIdentifierMap.get(newOrModifiedNode.getNodeId()).iterator().next(),
150 MoreExecutors.directExecutor());
153 private void processNodeOnConnection(final Node newOrModifiedNode) {
154 for (SupportingNode supportingNode : newOrModifiedNode.getSupportingNode()) {
155 final NodeId nodeMount = supportingNode.getNodeRef();
156 final VppNetconfConnectionProbe probe = new VppNetconfConnectionProbe(supportingNode.getNodeRef(),
160 // Verify netconf connection
161 boolean connectionReady = probe.startProbing();
162 if (connectionReady) {
163 LOG.debug("Node {} is connected, creating ...", supportingNode.getNodeRef());
164 final TopologyId topologyMount = supportingNode.getTopologyRef();
165 final KeyedInstanceIdentifier<Node, NodeKey> iiToVpp =
166 InstanceIdentifier.create(NetworkTopology.class)
167 .child(Topology.class, new TopologyKey(topologyMount))
168 .child(Node.class, new NodeKey(nodeMount));
169 nodeIdToKeyedInstanceIdentifierMap.put(newOrModifiedNode.getNodeId(), iiToVpp);
171 LOG.debug("Failed while connecting to node {}", supportingNode.getNodeRef());
173 } catch (InterruptedException | ExecutionException e) {
174 LOG.warn("Exception while processing node {} ... ", supportingNode.getNodeRef(), e);
175 } catch (TimeoutException e) {
176 LOG.warn("Node {} was not connected within {} seconds. "
177 + "Check node configuration and connectivity to proceed",
178 supportingNode.getNodeRef(), VppNetconfConnectionProbe.NODE_CONNECTION_TIMER);
183 private ListenableFuture<Void> handleDeleteOnTopology() {
185 return Futures.immediateFuture(null);
189 public void close() {