aa259826eeafba3214ed7ae30608be48a3afb27a
[lispflowmapping.git] / mappingservice / neutron / src / main / java / org / opendaylight / lispflowmapping / neutron / intenthandler / listener / VppEndpointListener.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.lispflowmapping.neutron.intenthandler.listener;
9
10 import com.google.common.base.Function;
11 import com.google.common.collect.ArrayListMultimap;
12 import com.google.common.collect.Multimap;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeoutException;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
25 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
26 import org.opendaylight.lispflowmapping.neutron.intenthandler.IntentHandlerAsyncExecutorProvider;
27 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNetconfConnectionProbe;
28 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNodeReader;
29 import org.opendaylight.lispflowmapping.neutron.mappingmanager.HostInformationManager;
30 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.DataObjectModification;
33 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
34 import org.opendaylight.mdsal.binding.api.DataTreeModification;
35 import org.opendaylight.mdsal.binding.api.MountPointService;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.node.attributes.SupportingNode;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Created by Shakib Ahmed on 1/11/17.
54  */
55 public class VppEndpointListener implements AutoCloseable, ClusteredDataTreeChangeListener<Topology> {
56     private static final Logger LOG = LoggerFactory.getLogger(VppEndpointListener.class);
57
58     private final DataBroker dataBroker;
59     private final MountPointService mountService;
60     private final ListenerRegistration<?> reg;
61
62     private final VppNodeReader vppNodeReader;
63     private final HostInformationManager hostInformationManager;
64
65     private final Multimap<NodeId, KeyedInstanceIdentifier<Node, NodeKey>> nodeIdToKeyedInstanceIdentifierMap =
66             ArrayListMultimap.create();
67
68     private final ListeningExecutorService executorService;
69
70     public VppEndpointListener(final DataBroker dataBroker,
71                                final MountPointService mountPointService,
72                                KeyedInstanceIdentifier<Topology, TopologyKey> topologyII) {
73
74         this.dataBroker = dataBroker;
75         this.mountService = mountPointService;
76
77         vppNodeReader = new VppNodeReader(this.dataBroker, this.mountService);
78
79         hostInformationManager = HostInformationManager.getInstance();
80
81         reg = dataBroker.registerDataTreeChangeListener(
82                 DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, topologyII), this);
83
84         executorService = IntentHandlerAsyncExecutorProvider.getInstace().getExecutor();
85     }
86
87     @Override
88     public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Topology>> changes) {
89         for (DataTreeModification<Topology> change : changes) {
90             final DataObjectModification<Topology> modification = change.getRootNode();
91             ListenableFuture<Void> modificationTaskHandler;
92             switch (modification.getModificationType()) {
93                 case WRITE:
94                     modificationTaskHandler = handleChange(modification);
95                     break;
96                 case SUBTREE_MODIFIED:
97                     modificationTaskHandler = handleChange(modification);
98                     break;
99                 case DELETE:
100                     modificationTaskHandler = handleDeleteOnTopology();
101                     break;
102                 default:
103                     LOG.warn("Ignored topology modification {}", modification);
104                     modificationTaskHandler = Futures.immediateFuture(null);
105                     break;
106             }
107             Futures.addCallback(modificationTaskHandler, new FutureCallback<Void>() {
108                 @Override
109                 public void onSuccess(@Nullable Void vd) {
110                     LOG.debug("VppEndpoint modification handled successfully!");
111                 }
112
113                 @Override
114                 public void onFailure(Throwable throwable) {
115                     LOG.debug("Failed to handle VppEndpoint modifications!");
116                 }
117             }, MoreExecutors.directExecutor());
118         }
119     }
120
121     private ListenableFuture<Void> handleChange(DataObjectModification modification) {
122         Collection<DataObjectModification<? extends DataObject>> modifiedChildren = modification.getModifiedChildren();
123         List<ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>>> processingTasks = new ArrayList<>();
124         for (DataObjectModification modifiedNode : modifiedChildren) {
125             final Node newOrModifiedNode = (Node) modifiedNode.getDataAfter();
126             ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processingTask = processNode(newOrModifiedNode);
127             Futures.addCallback(processingTask, new FutureCallback<KeyedInstanceIdentifier<Node, NodeKey>>() {
128                 @Override
129                 public void onSuccess(@Nullable KeyedInstanceIdentifier<Node, NodeKey> kiiToNode) {
130                     hostInformationManager.addHostRelatedInfo(newOrModifiedNode.getNodeId().getValue(),
131                             LispAddressUtil.toRloc(vppNodeReader.rlocIpOfNode(kiiToNode)));
132                 }
133
134                 @Override
135                 public void onFailure(Throwable throwable) {
136                     LOG.debug("Couldn't process {}", newOrModifiedNode.getNodeId().getValue());
137                 }
138             }, MoreExecutors.directExecutor());
139             processingTasks.add(processNode(newOrModifiedNode));
140         }
141         return Futures.immediateFuture(null);
142     }
143
144     private ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processNode(final Node newOrModifiedNode) {
145         ListenableFuture<Void> probeVppNodeForConnection = executorService
146                 .submit(() -> {
147                     processNodeOnConnection(newOrModifiedNode);
148                     return null;
149                 });
150
151         return Futures.transform(probeVppNodeForConnection,
152                 new Function<Void, KeyedInstanceIdentifier<Node, NodeKey>>() {
153                     @Nullable
154                     @Override
155                     public KeyedInstanceIdentifier<Node, NodeKey> apply(@Nullable Void vd) {
156                         return nodeIdToKeyedInstanceIdentifierMap.get(newOrModifiedNode.getNodeId()).iterator().next();
157                     }
158                 }, MoreExecutors.directExecutor());
159     }
160
161     private void processNodeOnConnection(final Node newOrModifiedNode) {
162         for (SupportingNode supportingNode : newOrModifiedNode.getSupportingNode()) {
163             final NodeId nodeMount = supportingNode.getNodeRef();
164             final VppNetconfConnectionProbe probe = new VppNetconfConnectionProbe(supportingNode.getNodeRef(),
165                     dataBroker);
166
167             try {
168                 // Verify netconf connection
169                 boolean connectionReady = probe.startProbing();
170                 if (connectionReady) {
171                     LOG.debug("Node {} is connected, creating ...", supportingNode.getNodeRef());
172                     final TopologyId topologyMount = supportingNode.getTopologyRef();
173                     final KeyedInstanceIdentifier<Node, NodeKey> iiToVpp =
174                             InstanceIdentifier.create(NetworkTopology.class)
175                             .child(Topology.class, new TopologyKey(topologyMount))
176                             .child(Node.class, new NodeKey(nodeMount));
177                     nodeIdToKeyedInstanceIdentifierMap.put(newOrModifiedNode.getNodeId(), iiToVpp);
178                 } else {
179                     LOG.debug("Failed while connecting to node {}", supportingNode.getNodeRef());
180                 }
181             } catch (InterruptedException | ExecutionException e) {
182                 LOG.warn("Exception while processing node {} ... ", supportingNode.getNodeRef(), e);
183             } catch (TimeoutException e) {
184                 LOG.warn("Node {} was not connected within {} seconds. "
185                                 + "Check node configuration and connectivity to proceed",
186                         supportingNode.getNodeRef(), VppNetconfConnectionProbe.NODE_CONNECTION_TIMER);
187             }
188         }
189     }
190
191     private ListenableFuture<Void> handleDeleteOnTopology() {
192         //TODO
193         return Futures.immediateFuture(null);
194     }
195
196     @Override
197     public void close() {
198         reg.close();
199     }
200 }