2 * Copyright (c) 2017 Cisco Systems, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.lispflowmapping.neutron.intenthandler.listener;
10 import com.google.common.base.Function;
11 import com.google.common.collect.ArrayListMultimap;
12 import com.google.common.collect.Multimap;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeoutException;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
26 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
31 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
34 import org.opendaylight.lispflowmapping.neutron.intenthandler.IntentHandlerAsyncExecutorProvider;
35 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNetconfConnectionProbe;
36 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNodeReader;
37 import org.opendaylight.lispflowmapping.neutron.mappingmanager.HostInformationManager;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.node.attributes.SupportingNode;
46 import org.opendaylight.yangtools.concepts.ListenerRegistration;
47 import org.opendaylight.yangtools.yang.binding.DataObject;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
54 * Created by Shakib Ahmed on 1/11/17.
56 public class VppEndpointListener implements AutoCloseable, ClusteredDataTreeChangeListener<Topology> {
57 private static final Logger LOG = LoggerFactory.getLogger(VppEndpointListener.class);
59 private final DataBroker dataBroker;
60 private final MountPointService mountService;
61 private final ListenerRegistration<?> reg;
63 private final VppNodeReader vppNodeReader;
64 private final HostInformationManager hostInformationManager;
66 private final Multimap<NodeId, KeyedInstanceIdentifier<Node, NodeKey>> nodeIdToKeyedInstanceIdentifierMap =
67 ArrayListMultimap.create();
69 private final ListeningExecutorService executorService;
/**
 * Creates the listener and registers it for changes of the given topology in the
 * CONFIGURATION datastore.
 *
 * @param dataBroker        broker used for the registration and handed to the node reader
 * @param mountPointService mount point service handed to the node reader
 * @param topologyII        identifier of the topology whose changes should be observed
 */
public VppEndpointListener(final DataBroker dataBroker,
                           final MountPointService mountPointService,
                           KeyedInstanceIdentifier<Topology, TopologyKey> topologyII) {
    this.dataBroker = dataBroker;
    this.mountService = mountPointService;

    vppNodeReader = new VppNodeReader(this.dataBroker, this.mountService);
    hostInformationManager = HostInformationManager.getInstance();
    executorService = IntentHandlerAsyncExecutorProvider.getInstace().getExecutor();

    // Register last: registration publishes "this" to the data broker, and a change
    // notification may be delivered as soon as it happens. Every field the callbacks
    // touch (in particular executorService) must therefore be assigned beforehand.
    reg = dataBroker.registerDataTreeChangeListener(
            new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, topologyII), this);
}
89 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Topology>> changes) {
90 for (DataTreeModification<Topology> change : changes) {
91 final DataObjectModification<Topology> modification = change.getRootNode();
92 ListenableFuture<Void> modificationTaskHandler;
93 switch (modification.getModificationType()) {
95 modificationTaskHandler = handleChange(modification);
97 case SUBTREE_MODIFIED:
98 modificationTaskHandler = handleChange(modification);
101 modificationTaskHandler = handleDeleteOnTopology();
104 LOG.warn("Ignored topology modification {}", modification);
105 modificationTaskHandler = Futures.immediateFuture(null);
108 Futures.addCallback(modificationTaskHandler, new FutureCallback<Void>() {
110 public void onSuccess(@Nullable Void vd) {
111 LOG.debug("VppEndpoint modification handled successfully!");
115 public void onFailure(Throwable throwable) {
116 LOG.debug("Failed to handle VppEndpoint modifications!");
122 private ListenableFuture<Void> handleChange(DataObjectModification modification) {
123 Collection<DataObjectModification<? extends DataObject>> modifiedChildren = modification.getModifiedChildren();
124 List<ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>>> processingTasks = new ArrayList<>();
125 for (DataObjectModification modifiedNode : modifiedChildren) {
126 final Node newOrModifiedNode = (Node) modifiedNode.getDataAfter();
127 ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processingTask = processNode(newOrModifiedNode);
128 Futures.addCallback(processingTask, new FutureCallback<KeyedInstanceIdentifier<Node, NodeKey>>() {
130 public void onSuccess(@Nullable KeyedInstanceIdentifier<Node, NodeKey> kiiToNode) {
131 hostInformationManager.addHostRelatedInfo(newOrModifiedNode.getNodeId().getValue(),
132 LispAddressUtil.toRloc(vppNodeReader.rlocIpOfNode(kiiToNode)));
136 public void onFailure(Throwable throwable) {
137 LOG.debug("Couldn't process {}", newOrModifiedNode.getNodeId().getValue());
140 processingTasks.add(processNode(newOrModifiedNode));
142 return Futures.immediateFuture(null);
145 private ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processNode(final Node newOrModifiedNode) {
146 ListenableFuture<Void> probeVppNodeForConnection = executorService
148 processNodeOnConnection(newOrModifiedNode);
152 return Futures.transform(probeVppNodeForConnection,
153 new Function<Void, KeyedInstanceIdentifier<Node, NodeKey>>() {
156 public KeyedInstanceIdentifier<Node, NodeKey> apply(@Nullable Void vd) {
157 return nodeIdToKeyedInstanceIdentifierMap.get(newOrModifiedNode.getNodeId()).iterator().next();
162 private void processNodeOnConnection(final Node newOrModifiedNode) {
163 for (SupportingNode supportingNode : newOrModifiedNode.getSupportingNode()) {
164 final NodeId nodeMount = supportingNode.getNodeRef();
165 final VppNetconfConnectionProbe probe = new VppNetconfConnectionProbe(supportingNode.getNodeRef(),
169 // Verify netconf connection
170 boolean connectionReady = probe.startProbing();
171 if (connectionReady) {
172 LOG.debug("Node {} is connected, creating ...", supportingNode.getNodeRef());
173 final TopologyId topologyMount = supportingNode.getTopologyRef();
174 final KeyedInstanceIdentifier<Node, NodeKey> iiToVpp =
175 InstanceIdentifier.create(NetworkTopology.class)
176 .child(Topology.class, new TopologyKey(topologyMount))
177 .child(Node.class, new NodeKey(nodeMount));
178 nodeIdToKeyedInstanceIdentifierMap.put(newOrModifiedNode.getNodeId(), iiToVpp);
180 LOG.debug("Failed while connecting to node {}", supportingNode.getNodeRef());
182 } catch (InterruptedException | ExecutionException e) {
183 LOG.warn("Exception while processing node {} ... ", supportingNode.getNodeRef(), e);
184 } catch (TimeoutException e) {
185 LOG.warn("Node {} was not connected within {} seconds. "
186 + "Check node configuration and connectivity to proceed",
187 supportingNode.getNodeRef(), VppNetconfConnectionProbe.NODE_CONNECTION_TIMER);
192 private ListenableFuture<Void> handleDeleteOnTopology() {
194 return Futures.immediateFuture(null);
198 public void close() {