2 * Copyright (c) 2017 Cisco Systems, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.lispflowmapping.neutron.intenthandler.listener;
10 import com.google.common.base.Function;
11 import com.google.common.collect.ArrayListMultimap;
12 import com.google.common.collect.Multimap;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeoutException;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
25 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
26 import org.opendaylight.lispflowmapping.neutron.intenthandler.IntentHandlerAsyncExecutorProvider;
27 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNetconfConnectionProbe;
28 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNodeReader;
29 import org.opendaylight.lispflowmapping.neutron.mappingmanager.HostInformationManager;
30 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.DataObjectModification;
33 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
34 import org.opendaylight.mdsal.binding.api.DataTreeModification;
35 import org.opendaylight.mdsal.binding.api.MountPointService;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.node.attributes.SupportingNode;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * Created by Shakib Ahmed on 1/11/17.
55 public class VppEndpointListener implements AutoCloseable, ClusteredDataTreeChangeListener<Topology> {
56 private static final Logger LOG = LoggerFactory.getLogger(VppEndpointListener.class);
58 private final DataBroker dataBroker;
59 private final MountPointService mountService;
60 private final ListenerRegistration<?> reg;
62 private final VppNodeReader vppNodeReader;
63 private final HostInformationManager hostInformationManager;
65 private final Multimap<NodeId, KeyedInstanceIdentifier<Node, NodeKey>> nodeIdToKeyedInstanceIdentifierMap =
66 ArrayListMultimap.create();
68 private final ListeningExecutorService executorService;
/**
 * Creates the listener and registers it for changes of the given topology in the
 * CONFIGURATION datastore.
 *
 * @param dataBroker        broker used for the registration and handed to the VPP helpers
 * @param mountPointService mount point service handed to the {@link VppNodeReader}
 * @param topologyII        identifier of the topology whose changes are listened to
 */
public VppEndpointListener(final DataBroker dataBroker,
                           final MountPointService mountPointService,
                           KeyedInstanceIdentifier<Topology, TopologyKey> topologyII) {
    this.dataBroker = dataBroker;
    this.mountService = mountPointService;

    vppNodeReader = new VppNodeReader(this.dataBroker, this.mountService);
    hostInformationManager = HostInformationManager.getInstance();
    executorService = IntentHandlerAsyncExecutorProvider.getInstace().getExecutor();

    // Register last: registering hands 'this' to the broker, which may start delivering
    // notifications right away. Every field used by onDataTreeChanged() (in particular
    // executorService) must therefore be assigned before this call.
    reg = dataBroker.registerDataTreeChangeListener(
            DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, topologyII), this);
}
88 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Topology>> changes) {
89 for (DataTreeModification<Topology> change : changes) {
90 final DataObjectModification<Topology> modification = change.getRootNode();
91 ListenableFuture<Void> modificationTaskHandler;
92 switch (modification.getModificationType()) {
94 modificationTaskHandler = handleChange(modification);
96 case SUBTREE_MODIFIED:
97 modificationTaskHandler = handleChange(modification);
100 modificationTaskHandler = handleDeleteOnTopology();
103 LOG.warn("Ignored topology modification {}", modification);
104 modificationTaskHandler = Futures.immediateFuture(null);
107 Futures.addCallback(modificationTaskHandler, new FutureCallback<Void>() {
109 public void onSuccess(@Nullable Void vd) {
110 LOG.debug("VppEndpoint modification handled successfully!");
114 public void onFailure(Throwable throwable) {
115 LOG.debug("Failed to handle VppEndpoint modifications!");
117 }, MoreExecutors.directExecutor());
121 private ListenableFuture<Void> handleChange(DataObjectModification modification) {
122 Collection<DataObjectModification<? extends DataObject>> modifiedChildren = modification.getModifiedChildren();
123 List<ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>>> processingTasks = new ArrayList<>();
124 for (DataObjectModification modifiedNode : modifiedChildren) {
125 final Node newOrModifiedNode = (Node) modifiedNode.getDataAfter();
126 ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processingTask = processNode(newOrModifiedNode);
127 Futures.addCallback(processingTask, new FutureCallback<KeyedInstanceIdentifier<Node, NodeKey>>() {
129 public void onSuccess(@Nullable KeyedInstanceIdentifier<Node, NodeKey> kiiToNode) {
130 hostInformationManager.addHostRelatedInfo(newOrModifiedNode.getNodeId().getValue(),
131 LispAddressUtil.toRloc(vppNodeReader.rlocIpOfNode(kiiToNode)));
135 public void onFailure(Throwable throwable) {
136 LOG.debug("Couldn't process {}", newOrModifiedNode.getNodeId().getValue());
138 }, MoreExecutors.directExecutor());
139 processingTasks.add(processNode(newOrModifiedNode));
141 return Futures.immediateFuture(null);
144 private ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processNode(final Node newOrModifiedNode) {
145 ListenableFuture<Void> probeVppNodeForConnection = executorService
147 processNodeOnConnection(newOrModifiedNode);
151 return Futures.transform(probeVppNodeForConnection,
152 new Function<Void, KeyedInstanceIdentifier<Node, NodeKey>>() {
155 public KeyedInstanceIdentifier<Node, NodeKey> apply(@Nullable Void vd) {
156 return nodeIdToKeyedInstanceIdentifierMap.get(newOrModifiedNode.getNodeId()).iterator().next();
158 }, MoreExecutors.directExecutor());
161 private void processNodeOnConnection(final Node newOrModifiedNode) {
162 for (SupportingNode supportingNode : newOrModifiedNode.getSupportingNode()) {
163 final NodeId nodeMount = supportingNode.getNodeRef();
164 final VppNetconfConnectionProbe probe = new VppNetconfConnectionProbe(supportingNode.getNodeRef(),
168 // Verify netconf connection
169 boolean connectionReady = probe.startProbing();
170 if (connectionReady) {
171 LOG.debug("Node {} is connected, creating ...", supportingNode.getNodeRef());
172 final TopologyId topologyMount = supportingNode.getTopologyRef();
173 final KeyedInstanceIdentifier<Node, NodeKey> iiToVpp =
174 InstanceIdentifier.create(NetworkTopology.class)
175 .child(Topology.class, new TopologyKey(topologyMount))
176 .child(Node.class, new NodeKey(nodeMount));
177 nodeIdToKeyedInstanceIdentifierMap.put(newOrModifiedNode.getNodeId(), iiToVpp);
179 LOG.debug("Failed while connecting to node {}", supportingNode.getNodeRef());
181 } catch (InterruptedException | ExecutionException e) {
182 LOG.warn("Exception while processing node {} ... ", supportingNode.getNodeRef(), e);
183 } catch (TimeoutException e) {
184 LOG.warn("Node {} was not connected within {} seconds. "
185 + "Check node configuration and connectivity to proceed",
186 supportingNode.getNodeRef(), VppNetconfConnectionProbe.NODE_CONNECTION_TIMER);
191 private ListenableFuture<Void> handleDeleteOnTopology() {
193 return Futures.immediateFuture(null);
197 public void close() {