2 * Copyright (c) 2017 Cisco Systems, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.lispflowmapping.neutron.intenthandler.listener;
10 import com.google.common.base.Function;
11 import com.google.common.collect.ArrayListMultimap;
12 import com.google.common.collect.Multimap;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeoutException;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
31 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
32 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
35 import org.opendaylight.lispflowmapping.neutron.intenthandler.IntentHandlerAsyncExecutorProvider;
36 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNetconfConnectionProbe;
37 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNodeReader;
38 import org.opendaylight.lispflowmapping.neutron.mappingmanager.HostInformationManager;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.node.attributes.SupportingNode;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Created by Shakib Ahmed on 1/11/17.
57 public class VppEndpointListener implements AutoCloseable, ClusteredDataTreeChangeListener<Topology> {
// Class-wide SLF4J logger.
58 private static final Logger LOG = LoggerFactory.getLogger(VppEndpointListener.class);
// Broker used by the constructor to register this listener; also handed to the VppNodeReader.
60 private final DataBroker dataBroker;
// Mount point service handed to the VppNodeReader.
61 private final MountPointService mountService;
// Handle returned by registerDataTreeChangeListener for this listener.
62 private final ListenerRegistration<?> reg;
// Reader used to look up the RLOC IP of a node once its instance identifier is known.
64 private final VppNodeReader vppNodeReader;
// Shared HostInformationManager instance that receives the host-to-RLOC information.
65 private final HostInformationManager hostInformationManager;
// Maps a topology node id to the instance identifiers built from its supporting nodes;
// filled by processNodeOnConnection and read by processNode.
67 private final Multimap<NodeId, KeyedInstanceIdentifier<Node, NodeKey>> nodeIdToKeyedInstanceIdentifierMap =
68 ArrayListMultimap.create();
// Executor obtained from IntentHandlerAsyncExecutorProvider; processNode submits its probing work to it.
70 private final ListeningExecutorService executorService;
/**
 * Creates the listener and subscribes it to CONFIGURATION datastore changes of the given topology.
 *
 * <p>The data tree change registration is deliberately the last step: registering hands
 * {@code this} to the data broker, and a change notification may arrive before the constructor
 * returns. Every collaborator, including the executor used by {@code processNode}, is therefore
 * assigned before the listener becomes reachable.
 *
 * @param dataBroker        broker used to register this listener; also passed to the node reader
 * @param mountPointService mount point service passed to the node reader
 * @param topologyII        identifier of the topology whose changes are observed
 */
public VppEndpointListener(final DataBroker dataBroker,
                           final MountPointService mountPointService,
                           KeyedInstanceIdentifier<Topology, TopologyKey> topologyII) {
    this.dataBroker = dataBroker;
    this.mountService = mountPointService;

    vppNodeReader = new VppNodeReader(this.dataBroker, this.mountService);
    hostInformationManager = HostInformationManager.getInstance();
    executorService = IntentHandlerAsyncExecutorProvider.getInstace().getExecutor();

    // Must stay last: after this call onDataTreeChanged may be invoked at any time.
    reg = dataBroker.registerDataTreeChangeListener(
            new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, topologyII), this);
}
90 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Topology>> changes) {
91 for (DataTreeModification<Topology> change : changes) {
92 final DataObjectModification<Topology> modification = change.getRootNode();
93 ListenableFuture<Void> modificationTaskHandler;
94 switch (modification.getModificationType()) {
96 modificationTaskHandler = handleChange(modification);
98 case SUBTREE_MODIFIED:
99 modificationTaskHandler = handleChange(modification);
102 modificationTaskHandler = handleDeleteOnTopology();
105 LOG.warn("Ignored topology modification {}", modification);
106 modificationTaskHandler = Futures.immediateFuture(null);
109 Futures.addCallback(modificationTaskHandler, new FutureCallback<Void>() {
111 public void onSuccess(@Nullable Void vd) {
112 LOG.debug("VppEndpoint modification handled successfully!");
116 public void onFailure(Throwable throwable) {
117 LOG.debug("Failed to handle VppEndpoint modifications!");
119 }, MoreExecutors.directExecutor());
123 private ListenableFuture<Void> handleChange(DataObjectModification modification) {
124 Collection<DataObjectModification<? extends DataObject>> modifiedChildren = modification.getModifiedChildren();
125 List<ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>>> processingTasks = new ArrayList<>();
126 for (DataObjectModification modifiedNode : modifiedChildren) {
127 final Node newOrModifiedNode = (Node) modifiedNode.getDataAfter();
128 ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processingTask = processNode(newOrModifiedNode);
129 Futures.addCallback(processingTask, new FutureCallback<KeyedInstanceIdentifier<Node, NodeKey>>() {
131 public void onSuccess(@Nullable KeyedInstanceIdentifier<Node, NodeKey> kiiToNode) {
132 hostInformationManager.addHostRelatedInfo(newOrModifiedNode.getNodeId().getValue(),
133 LispAddressUtil.toRloc(vppNodeReader.rlocIpOfNode(kiiToNode)));
137 public void onFailure(Throwable throwable) {
138 LOG.debug("Couldn't process {}", newOrModifiedNode.getNodeId().getValue());
140 }, MoreExecutors.directExecutor());
141 processingTasks.add(processNode(newOrModifiedNode));
143 return Futures.immediateFuture(null);
146 private ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processNode(final Node newOrModifiedNode) {
147 ListenableFuture<Void> probeVppNodeForConnection = executorService
149 processNodeOnConnection(newOrModifiedNode);
153 return Futures.transform(probeVppNodeForConnection,
154 new Function<Void, KeyedInstanceIdentifier<Node, NodeKey>>() {
157 public KeyedInstanceIdentifier<Node, NodeKey> apply(@Nullable Void vd) {
158 return nodeIdToKeyedInstanceIdentifierMap.get(newOrModifiedNode.getNodeId()).iterator().next();
163 private void processNodeOnConnection(final Node newOrModifiedNode) {
164 for (SupportingNode supportingNode : newOrModifiedNode.getSupportingNode()) {
165 final NodeId nodeMount = supportingNode.getNodeRef();
166 final VppNetconfConnectionProbe probe = new VppNetconfConnectionProbe(supportingNode.getNodeRef(),
170 // Verify netconf connection
171 boolean connectionReady = probe.startProbing();
172 if (connectionReady) {
173 LOG.debug("Node {} is connected, creating ...", supportingNode.getNodeRef());
174 final TopologyId topologyMount = supportingNode.getTopologyRef();
175 final KeyedInstanceIdentifier<Node, NodeKey> iiToVpp =
176 InstanceIdentifier.create(NetworkTopology.class)
177 .child(Topology.class, new TopologyKey(topologyMount))
178 .child(Node.class, new NodeKey(nodeMount));
179 nodeIdToKeyedInstanceIdentifierMap.put(newOrModifiedNode.getNodeId(), iiToVpp);
181 LOG.debug("Failed while connecting to node {}", supportingNode.getNodeRef());
183 } catch (InterruptedException | ExecutionException e) {
184 LOG.warn("Exception while processing node {} ... ", supportingNode.getNodeRef(), e);
185 } catch (TimeoutException e) {
186 LOG.warn("Node {} was not connected within {} seconds. "
187 + "Check node configuration and connectivity to proceed",
188 supportingNode.getNodeRef(), VppNetconfConnectionProbe.NODE_CONNECTION_TIMER);
193 private ListenableFuture<Void> handleDeleteOnTopology() {
195 return Futures.immediateFuture(null);
199 public void close() {