0c99a36d4d268f8d4be58d00938639e6dd9bf6f5
[lispflowmapping.git] / mappingservice / neutron / src / main / java / org / opendaylight / lispflowmapping / neutron / intenthandler / listener / VppEndpointListener.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.lispflowmapping.neutron.intenthandler.listener;
9
10 import com.google.common.collect.ArrayListMultimap;
11 import com.google.common.collect.Multimap;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
23 import org.opendaylight.lispflowmapping.neutron.intenthandler.IntentHandlerAsyncExecutorProvider;
24 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNetconfConnectionProbe;
25 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNodeReader;
26 import org.opendaylight.lispflowmapping.neutron.mappingmanager.HostInformationManager;
27 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.DataObjectModification;
30 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
31 import org.opendaylight.mdsal.binding.api.DataTreeModification;
32 import org.opendaylight.mdsal.binding.api.MountPointService;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.node.attributes.SupportingNode;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Created by Shakib Ahmed on 1/11/17.
51  */
52 public class VppEndpointListener implements AutoCloseable, ClusteredDataTreeChangeListener<Topology> {
53     private static final Logger LOG = LoggerFactory.getLogger(VppEndpointListener.class);
54
55     private final DataBroker dataBroker;
56     private final MountPointService mountService;
57     private final ListenerRegistration<?> reg;
58
59     private final VppNodeReader vppNodeReader;
60     private final HostInformationManager hostInformationManager;
61
62     private final Multimap<NodeId, KeyedInstanceIdentifier<Node, NodeKey>> nodeIdToKeyedInstanceIdentifierMap =
63             ArrayListMultimap.create();
64
65     private final ListeningExecutorService executorService;
66
67     public VppEndpointListener(final DataBroker dataBroker,
68                                final MountPointService mountPointService,
69                                KeyedInstanceIdentifier<Topology, TopologyKey> topologyII) {
70
71         this.dataBroker = dataBroker;
72         this.mountService = mountPointService;
73
74         vppNodeReader = new VppNodeReader(this.dataBroker, this.mountService);
75
76         hostInformationManager = HostInformationManager.getInstance();
77
78         reg = dataBroker.registerDataTreeChangeListener(
79                 DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, topologyII), this);
80
81         executorService = IntentHandlerAsyncExecutorProvider.getInstace().getExecutor();
82     }
83
84     @Override
85     public void onDataTreeChanged(Collection<DataTreeModification<Topology>> changes) {
86         for (DataTreeModification<Topology> change : changes) {
87             final DataObjectModification<Topology> modification = change.getRootNode();
88             ListenableFuture<Void> modificationTaskHandler;
89             switch (modification.getModificationType()) {
90                 case WRITE:
91                     modificationTaskHandler = handleChange(modification);
92                     break;
93                 case SUBTREE_MODIFIED:
94                     modificationTaskHandler = handleChange(modification);
95                     break;
96                 case DELETE:
97                     modificationTaskHandler = handleDeleteOnTopology();
98                     break;
99                 default:
100                     LOG.warn("Ignored topology modification {}", modification);
101                     modificationTaskHandler = Futures.immediateFuture(null);
102                     break;
103             }
104             Futures.addCallback(modificationTaskHandler, new FutureCallback<Void>() {
105                 @Override
106                 public void onSuccess(Void vd) {
107                     LOG.debug("VppEndpoint modification handled successfully!");
108                 }
109
110                 @Override
111                 public void onFailure(Throwable throwable) {
112                     LOG.debug("Failed to handle VppEndpoint modifications!");
113                 }
114             }, MoreExecutors.directExecutor());
115         }
116     }
117
118     private ListenableFuture<Void> handleChange(DataObjectModification modification) {
119         Collection<DataObjectModification<? extends DataObject>> modifiedChildren = modification.getModifiedChildren();
120         List<ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>>> processingTasks = new ArrayList<>();
121         for (DataObjectModification modifiedNode : modifiedChildren) {
122             final Node newOrModifiedNode = (Node) modifiedNode.getDataAfter();
123             ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processingTask = processNode(newOrModifiedNode);
124             Futures.addCallback(processingTask, new FutureCallback<KeyedInstanceIdentifier<Node, NodeKey>>() {
125                 @Override
126                 public void onSuccess(KeyedInstanceIdentifier<Node, NodeKey> kiiToNode) {
127                     hostInformationManager.addHostRelatedInfo(newOrModifiedNode.getNodeId().getValue(),
128                             LispAddressUtil.toRloc(vppNodeReader.rlocIpOfNode(kiiToNode)));
129                 }
130
131                 @Override
132                 public void onFailure(Throwable throwable) {
133                     LOG.debug("Couldn't process {}", newOrModifiedNode.getNodeId().getValue());
134                 }
135             }, MoreExecutors.directExecutor());
136             processingTasks.add(processNode(newOrModifiedNode));
137         }
138         return Futures.immediateFuture(null);
139     }
140
141     private ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processNode(final Node newOrModifiedNode) {
142         ListenableFuture<Void> probeVppNodeForConnection = executorService
143                 .submit(() -> {
144                     processNodeOnConnection(newOrModifiedNode);
145                     return null;
146                 });
147
148         return Futures.transform(probeVppNodeForConnection,
149                 vd -> nodeIdToKeyedInstanceIdentifierMap.get(newOrModifiedNode.getNodeId()).iterator().next(),
150                 MoreExecutors.directExecutor());
151     }
152
153     private void processNodeOnConnection(final Node newOrModifiedNode) {
154         for (SupportingNode supportingNode : newOrModifiedNode.getSupportingNode()) {
155             final NodeId nodeMount = supportingNode.getNodeRef();
156             final VppNetconfConnectionProbe probe = new VppNetconfConnectionProbe(supportingNode.getNodeRef(),
157                     dataBroker);
158
159             try {
160                 // Verify netconf connection
161                 boolean connectionReady = probe.startProbing();
162                 if (connectionReady) {
163                     LOG.debug("Node {} is connected, creating ...", supportingNode.getNodeRef());
164                     final TopologyId topologyMount = supportingNode.getTopologyRef();
165                     final KeyedInstanceIdentifier<Node, NodeKey> iiToVpp =
166                             InstanceIdentifier.create(NetworkTopology.class)
167                             .child(Topology.class, new TopologyKey(topologyMount))
168                             .child(Node.class, new NodeKey(nodeMount));
169                     nodeIdToKeyedInstanceIdentifierMap.put(newOrModifiedNode.getNodeId(), iiToVpp);
170                 } else {
171                     LOG.debug("Failed while connecting to node {}", supportingNode.getNodeRef());
172                 }
173             } catch (InterruptedException | ExecutionException e) {
174                 LOG.warn("Exception while processing node {} ... ", supportingNode.getNodeRef(), e);
175             } catch (TimeoutException e) {
176                 LOG.warn("Node {} was not connected within {} seconds. "
177                                 + "Check node configuration and connectivity to proceed",
178                         supportingNode.getNodeRef(), VppNetconfConnectionProbe.NODE_CONNECTION_TIMER);
179             }
180         }
181     }
182
183     private ListenableFuture<Void> handleDeleteOnTopology() {
184         //TODO
185         return Futures.immediateFuture(null);
186     }
187
188     @Override
189     public void close() {
190         reg.close();
191     }
192 }