Replace deprecated Futures.addCallback by the newer version
[lispflowmapping.git] / mappingservice / neutron / src / main / java / org / opendaylight / lispflowmapping / neutron / intenthandler / listener / VppEndpointListener.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.lispflowmapping.neutron.intenthandler.listener;
9
10 import com.google.common.base.Function;
11 import com.google.common.collect.ArrayListMultimap;
12 import com.google.common.collect.Multimap;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeoutException;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26
27 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
31 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
32 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
35 import org.opendaylight.lispflowmapping.neutron.intenthandler.IntentHandlerAsyncExecutorProvider;
36 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNetconfConnectionProbe;
37 import org.opendaylight.lispflowmapping.neutron.intenthandler.util.VppNodeReader;
38 import org.opendaylight.lispflowmapping.neutron.mappingmanager.HostInformationManager;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.node.attributes.SupportingNode;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Created by Shakib Ahmed on 1/11/17.
56  */
57 public class VppEndpointListener implements AutoCloseable, ClusteredDataTreeChangeListener<Topology> {
58     private static final Logger LOG = LoggerFactory.getLogger(VppEndpointListener.class);
59
60     private final DataBroker dataBroker;
61     private final MountPointService mountService;
62     private final ListenerRegistration<?> reg;
63
64     private final VppNodeReader vppNodeReader;
65     private final HostInformationManager hostInformationManager;
66
67     private final Multimap<NodeId, KeyedInstanceIdentifier<Node, NodeKey>> nodeIdToKeyedInstanceIdentifierMap =
68             ArrayListMultimap.create();
69
70     private final ListeningExecutorService executorService;
71
72     public VppEndpointListener(final DataBroker dataBroker,
73                                final MountPointService mountPointService,
74                                KeyedInstanceIdentifier<Topology, TopologyKey> topologyII) {
75
76         this.dataBroker = dataBroker;
77         this.mountService = mountPointService;
78
79         vppNodeReader = new VppNodeReader(this.dataBroker, this.mountService);
80
81         hostInformationManager = HostInformationManager.getInstance();
82
83         reg = dataBroker.registerDataTreeChangeListener(
84                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, topologyII), this);
85
86         executorService = IntentHandlerAsyncExecutorProvider.getInstace().getExecutor();
87     }
88
89     @Override
90     public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Topology>> changes) {
91         for (DataTreeModification<Topology> change : changes) {
92             final DataObjectModification<Topology> modification = change.getRootNode();
93             ListenableFuture<Void> modificationTaskHandler;
94             switch (modification.getModificationType()) {
95                 case WRITE:
96                     modificationTaskHandler = handleChange(modification);
97                     break;
98                 case SUBTREE_MODIFIED:
99                     modificationTaskHandler = handleChange(modification);
100                     break;
101                 case DELETE:
102                     modificationTaskHandler = handleDeleteOnTopology();
103                     break;
104                 default:
105                     LOG.warn("Ignored topology modification {}", modification);
106                     modificationTaskHandler = Futures.immediateFuture(null);
107                     break;
108             }
109             Futures.addCallback(modificationTaskHandler, new FutureCallback<Void>() {
110                 @Override
111                 public void onSuccess(@Nullable Void vd) {
112                     LOG.debug("VppEndpoint modification handled successfully!");
113                 }
114
115                 @Override
116                 public void onFailure(Throwable throwable) {
117                     LOG.debug("Failed to handle VppEndpoint modifications!");
118                 }
119             }, MoreExecutors.directExecutor());
120         }
121     }
122
123     private ListenableFuture<Void> handleChange(DataObjectModification modification) {
124         Collection<DataObjectModification<? extends DataObject>> modifiedChildren = modification.getModifiedChildren();
125         List<ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>>> processingTasks = new ArrayList<>();
126         for (DataObjectModification modifiedNode : modifiedChildren) {
127             final Node newOrModifiedNode = (Node) modifiedNode.getDataAfter();
128             ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processingTask = processNode(newOrModifiedNode);
129             Futures.addCallback(processingTask, new FutureCallback<KeyedInstanceIdentifier<Node, NodeKey>>() {
130                 @Override
131                 public void onSuccess(@Nullable KeyedInstanceIdentifier<Node, NodeKey> kiiToNode) {
132                     hostInformationManager.addHostRelatedInfo(newOrModifiedNode.getNodeId().getValue(),
133                             LispAddressUtil.toRloc(vppNodeReader.rlocIpOfNode(kiiToNode)));
134                 }
135
136                 @Override
137                 public void onFailure(Throwable throwable) {
138                     LOG.debug("Couldn't process {}", newOrModifiedNode.getNodeId().getValue());
139                 }
140             }, MoreExecutors.directExecutor());
141             processingTasks.add(processNode(newOrModifiedNode));
142         }
143         return Futures.immediateFuture(null);
144     }
145
146     private ListenableFuture<KeyedInstanceIdentifier<Node, NodeKey>> processNode(final Node newOrModifiedNode) {
147         ListenableFuture<Void> probeVppNodeForConnection = executorService
148                 .submit(() -> {
149                     processNodeOnConnection(newOrModifiedNode);
150                     return null;
151                 });
152
153         return Futures.transform(probeVppNodeForConnection,
154                 new Function<Void, KeyedInstanceIdentifier<Node, NodeKey>>() {
155                     @Nullable
156                     @Override
157                     public KeyedInstanceIdentifier<Node, NodeKey> apply(@Nullable Void vd) {
158                         return nodeIdToKeyedInstanceIdentifierMap.get(newOrModifiedNode.getNodeId()).iterator().next();
159                     }
160                 });
161     }
162
163     private void processNodeOnConnection(final Node newOrModifiedNode) {
164         for (SupportingNode supportingNode : newOrModifiedNode.getSupportingNode()) {
165             final NodeId nodeMount = supportingNode.getNodeRef();
166             final VppNetconfConnectionProbe probe = new VppNetconfConnectionProbe(supportingNode.getNodeRef(),
167                     dataBroker);
168
169             try {
170                 // Verify netconf connection
171                 boolean connectionReady = probe.startProbing();
172                 if (connectionReady) {
173                     LOG.debug("Node {} is connected, creating ...", supportingNode.getNodeRef());
174                     final TopologyId topologyMount = supportingNode.getTopologyRef();
175                     final KeyedInstanceIdentifier<Node, NodeKey> iiToVpp =
176                             InstanceIdentifier.create(NetworkTopology.class)
177                             .child(Topology.class, new TopologyKey(topologyMount))
178                             .child(Node.class, new NodeKey(nodeMount));
179                     nodeIdToKeyedInstanceIdentifierMap.put(newOrModifiedNode.getNodeId(), iiToVpp);
180                 } else {
181                     LOG.debug("Failed while connecting to node {}", supportingNode.getNodeRef());
182                 }
183             } catch (InterruptedException | ExecutionException e) {
184                 LOG.warn("Exception while processing node {} ... ", supportingNode.getNodeRef(), e);
185             } catch (TimeoutException e) {
186                 LOG.warn("Node {} was not connected within {} seconds. "
187                                 + "Check node configuration and connectivity to proceed",
188                         supportingNode.getNodeRef(), VppNetconfConnectionProbe.NODE_CONNECTION_TIMER);
189             }
190         }
191     }
192
193     private ListenableFuture<Void> handleDeleteOnTopology() {
194         //TODO
195         return Futures.immediateFuture(null);
196     }
197
198     @Override
199     public void close() {
200         reg.close();
201     }
202 }