MRI version bump for Aluminium
[genius.git] / cloudscaler / impl / src / main / java / org / opendaylight / genius / cloudscaler / rpcservice / CloudscalerRpcServiceImpl.java
1 /*
2  * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.cloudscaler.rpcservice;
9
10 import com.google.common.cache.CacheBuilder;
11 import com.google.common.cache.CacheLoader;
12 import com.google.common.cache.LoadingCache;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
23 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
24 import org.opendaylight.infrautils.utils.concurrent.Executors;
25 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
26 import org.opendaylight.mdsal.binding.api.DataBroker;
27 import org.opendaylight.mdsal.binding.api.ReadTransaction;
28 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
30 import org.opendaylight.mdsal.common.api.ReadFailedException;
31 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.CloudscalerRpcService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNodeBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.TransportZones;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.TransportZone;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.transport.zone.Vteps;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcError;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
59 import org.opendaylight.yangtools.yang.common.Uint64;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 @Singleton
64 public class CloudscalerRpcServiceImpl implements CloudscalerRpcService {
65
66     private static final Logger LOG = LoggerFactory.getLogger("GeniusEventLogger");
67
68     private static final Integer DELETE_DELAY = Integer.getInteger(
69             "scale.in.end.delay.inventory.delete.in.secs", 120);
70     private static ScaleinComputesEndOutput IN_PROGRESS = new ScaleinComputesEndOutputBuilder()
71             .setStatus("INPROGRESS")
72             .build();
73
74     private static ScaleinComputesEndOutput DONE = new ScaleinComputesEndOutputBuilder()
75             .setStatus("DONE")
76             .build();
77
78     private static  RpcResult<ScaleinComputesEndOutput> IN_PROGRESS_RPC_RESPONSE = RpcResultBuilder
79             .<ScaleinComputesEndOutput>success().withResult(IN_PROGRESS).build();
80
81     private static RpcResult<ScaleinComputesEndOutput> DONE_RPC_RESPONSE = RpcResultBuilder
82             .<ScaleinComputesEndOutput>success().withResult(DONE).build();
83
84     private final DataBroker dataBroker;
85     private final ComputeNodeManager computeNodeManager;
86     private final ManagedNewTransactionRunner txRunner;
87     //private final ItmTepClusteredListener itmTepClusteredListener;
88
89     //The following timestamp is not persisted across reboots
90     //upon reboot the timestamp will have a default value of that system timestamp
91     //this way scalein end that is triggered after cluster reboot will still honour the 2 min delay
92     private final LoadingCache<Uint64, Long> tepDeleteTimeStamp = CacheBuilder.newBuilder()
93             .expireAfterWrite(60, TimeUnit.MINUTES)
94             .build(new CacheLoader<Uint64, Long>() {
95                 @Override
96                 public Long load(Uint64 dpnId) {
97                     return System.currentTimeMillis();
98                 }
99             });
100
101     public static final FutureCallback<Void> DEFAULT_CALLBACK = new FutureCallback<>() {
102         @Override
103         public void onSuccess(Void result) {
104             LOG.debug("Success in Datastore operation");
105         }
106
107         @Override
108         public void onFailure(Throwable error) {
109             LOG.error("Error in Datastore operation", error);
110         }
111     };
112
113     @Inject
114     public CloudscalerRpcServiceImpl(DataBroker dataBroker,
115                                      ComputeNodeManager computeNodeManager) {
116         this.dataBroker = dataBroker;
117         this.computeNodeManager = computeNodeManager;
118         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
119         //this.itmTepClusteredListener = new ItmTepClusteredListener(dataBroker);
120     }
121
122     @Override
123     public ListenableFuture<RpcResult<ScaleinComputesStartOutput>> scaleinComputesStart(
124             ScaleinComputesStartInput input) {
125         ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
126         SettableFuture<RpcResult<ScaleinComputesStartOutput>> ft = SettableFuture.create();
127         input.getScaleinComputeNames().forEach(s -> tombstoneTheNode(s, tx, true));
128         input.getScaleinComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-start {}", s));
129         try {
130             tx.commit().get();
131         } catch (InterruptedException | ExecutionException e) {
132             LOG.error("Failed to tombstone all the nodes ", e);
133             ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>failed().withError(RpcError.ErrorType.APPLICATION,
134                             "Failed to tombstone all the nodes " + e.getMessage()).build());
135             return ft;
136         }
137         ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>success().build());
138         return ft;
139     }
140
141     @Override
142     public ListenableFuture<RpcResult<ScaleinComputesRecoverOutput>> scaleinComputesRecover(
143             ScaleinComputesRecoverInput input) {
144         ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
145         SettableFuture<RpcResult<ScaleinComputesRecoverOutput>> ft = SettableFuture.create();
146         input.getRecoverComputeNames().forEach(s -> tombstoneTheNode(s, tx, false));
147         input.getRecoverComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-recover {}", s));
148         try {
149             tx.commit().get();
150         } catch (InterruptedException | ExecutionException e) {
151             LOG.error("Failed to recover all the nodes ", e);
152             ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>failed().withError(RpcError.ErrorType.APPLICATION,
153                             "Failed to recover all the nodes " + e.getMessage()).build());
154             return ft;
155         }
156         //LOG.info("Recovered the nodes {}", input);
157         ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>success().build());
158         return ft;
159     }
160
161     @Override
162     @SuppressWarnings("checkstyle:IllegalCatch")
163     public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd(ScaleinComputesEndInput input) {
164         LOG.error("Cloudscaler scalein-end {}", input);
165         try {
166             for (String computeName : input.getScaleinComputeNames()) {
167                 ComputeNode computeNode = computeNodeManager.getComputeNodeFromName(computeName);
168                 if (computeNode == null) {
169                     LOG.warn("Cloudscaler Failed to find the compute {} for scale in end ", computeName);
170                     return Futures.immediateFuture(DONE_RPC_RESPONSE);
171                 }
172                 Long tepDeletedTimeStamp = tepDeleteTimeStamp.get(computeNode.getDpnid());
173                 Long currentTime = System.currentTimeMillis();
174                 if (currentTime - tepDeletedTimeStamp > DELETE_DELAY * 1000L) {
175                     scaleinComputesEnd2(input);
176                 } else {
177                     return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
178                 }
179             }
180         } catch (Exception e) {
181             LOG.error("Cloudscaler Failed scalein-end ", e);
182             return Futures.immediateFuture(
183                     RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
184                             "Failed to read the compute node " + e.getMessage()).build());
185         }
186         return Futures.immediateFuture(DONE_RPC_RESPONSE);
187     }
188
189     @SuppressWarnings("checkstyle:IllegalCatch")
190     public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd2(ScaleinComputesEndInput input) {
191         try {
192             for (String computeName : input.getScaleinComputeNames()) {
193                 ComputeNode computeNode;
194                 try {
195                     computeNode = computeNodeManager.getComputeNodeFromName(computeName);
196                     if (computeNode == null) {
197                         LOG.error("Cloudscaler Failed to find the compute {} for scale in end ",
198                                 computeName);
199                         return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
200                     }
201                 } catch (ReadFailedException e) {
202                     LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
203                     return Futures.immediateFuture(
204                             RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(
205                                     RpcError.ErrorType.APPLICATION,
206                                     "Failed to read the compute node " + e.getMessage()).build());
207                 }
208                 LOG.info("Cloudscaler Deleting compute node details {}", computeNode);
209                 LOG.info("Cloudscaler Deleting compute node details {}",
210                         buildOpenflowNodeIid(computeNode));
211                 LOG.info("Cloudscaler Deleting compute node details {}", buildOvsdbNodeId(computeNode));
212                 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
213                     computeNodeManager.deleteComputeNode(tx, computeNode);
214                 }), LOG, "Cloudscaler Failed to delete the compute node");
215                 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
216                     tx.delete(LogicalDatastoreType.CONFIGURATION, buildOpenflowNodeIid(computeNode));
217                 }), LOG, "Cloudscaler Failed to delete the config inventory");
218                 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
219                     tx.delete(LogicalDatastoreType.CONFIGURATION, buildOvsdbNodeId(computeNode));
220                 }), LOG, "Cloudscaler Failed to delete the config topology");
221             }
222         } catch (Throwable e) {
223             LOG.error("Cloudscaler Failed to do scale in end {} ", input, e);
224             return Futures.immediateFuture(
225                     RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
226                             "Failed to read the transport zone " + e.getMessage()).build());
227         }
228         return Futures.immediateFuture(DONE_RPC_RESPONSE);
229     }
230
231     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network
232             .topology.rev131021.network.topology.topology.Node> buildOvsdbNodeId(ComputeNode computeNode) {
233         return InstanceIdentifier
234                 .create(NetworkTopology.class)
235                 .child(Topology.class, new TopologyKey(new TopologyId("ovsdb:1")))
236                 .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network
237                                 .topology.topology.Node.class,
238                         new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021
239                                 .network.topology.topology.NodeKey(new NodeId(computeNode.getNodeid())));
240     }
241
242     private InstanceIdentifier<Node> buildOpenflowNodeIid(ComputeNode computeNode) {
243         return InstanceIdentifier.builder(Nodes.class)
244                 .child(Node.class, new NodeKey(
245                         new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId(
246                                 "openflow:" + computeNode.getDpnid().toString()))).build();
247     }
248
249     private void tombstoneTheNode(String computeName, ReadWriteTransaction tx, Boolean tombstone) {
250         ComputeNode computeNode = null;
251         try {
252             computeNode = computeNodeManager.getComputeNodeFromName(computeName);
253             if (computeNode == null) { //TODO throw error to rpc
254                 LOG.error("Cloudscaler Node not present to {} {}",
255                         computeName, tombstone ? "tombstone" : "recover");
256                 return;
257             }
258         } catch (ReadFailedException e) {
259             LOG.error("Cloudscaler Failed to {} the compute {} read failed",
260                     tombstone ? "tombstone" : "recover", computeName);
261             return;
262         }
263         ComputeNodeBuilder builder = new ComputeNodeBuilder(computeNode);
264         builder.setTombstoned(tombstone);
265         tx.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
266                 computeNodeManager.buildComputeNodeIid(computeName), builder.build());
267     }
268
269     @Override
270     @SuppressWarnings("checkstyle:IllegalCatch")
271     public ListenableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> scaleinComputesTepDelete(
272             ScaleinComputesTepDeleteInput input) {
273         ReadTransaction readTx = this.dataBroker.newReadOnlyTransaction();
274         SettableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> ft = SettableFuture.create();
275         Optional<TransportZones> tz;
276         try {
277             tz = readTx.read(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(TransportZones.class))
278                     .get();
279         } catch (InterruptedException | ExecutionException e) {
280             LOG.error("Cloudscaler Failed to read the transport zone {}", e.getMessage());
281             ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
282                     "Failed to read the transport zone " + e.getMessage()).build());
283             return ft;
284         } finally {
285             readTx.close();
286         }
287         try {
288             for (String computeName : input.getScaleinComputeNames()) {
289                 ComputeNode computeNode = null;
290                 try {
291                     computeNode = computeNodeManager.getComputeNodeFromName(computeName);
292                     if (computeNode == null) {
293                         LOG.warn("Cloudscaler Could not find the compute for tep delete {}", computeName);
294                         ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
295                         return ft;
296                     }
297                 } catch (ReadFailedException e) {
298                     LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
299                     ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed()
300                             .withError(RpcError.ErrorType.APPLICATION, "Failed to read the compute node "
301                                     + e.getMessage()).build());
302                     return ft;
303                 }
304                 if (tz.isPresent() && tz.get().getTransportZone() != null) {
305                     for (TransportZone zone : tz.get().getTransportZone()) {
306                         if (zone.getVteps() == null) {
307                             continue;
308                         }
309                         for (Vteps vteps : zone.getVteps().values()) {
310                             if (vteps.getDpnId().equals(computeNode.getDpnid())) {
311                                 InstanceIdentifier<Vteps> dpnVtepIid = InstanceIdentifier
312                                         .create(TransportZones.class)
313                                         .child(TransportZone.class, zone.key())
314                                         .child(Vteps.class, vteps.key());
315                                 LOG.error("Cloudscaler deleting dpn {}", vteps);
316                                 ListenableFutures.addErrorLogging(
317                                         txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
318                                             tx.delete(LogicalDatastoreType.CONFIGURATION, dpnVtepIid);
319                                         }), LOG, "Cloudscaler Failed to delete the itm tep");
320                             }
321                         }
322                     }
323                 }
324             }
325             InstanceIdentifier.create(TransportZones.class)
326                     .child(TransportZone.class)
327                     .child(Vteps.class);
328         } catch (Throwable e) {
329             LOG.error("Failed to read the transport zone ", e);
330             ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
331                     "Failed to read the transport zone " + e.getMessage()).build());
332             return ft;
333         }
334         ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
335         return ft;
336     }
337
338     class ItmTepClusteredListener extends AbstractClusteredAsyncDataTreeChangeListener<Vteps> {
339
340         @Inject
341         ItmTepClusteredListener(DataBroker dataBroker) {
342             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(TransportZones.class)
343                     .child(TransportZone.class).child(Vteps.class),
344                     Executors.newSingleThreadExecutor("ItmTepClusteredListener", LOG));
345         }
346
347         @Override
348         public void remove(InstanceIdentifier<Vteps> instanceIdentifier, Vteps tep) {
349             tepDeleteTimeStamp.put(tep.getDpnId(), System.currentTimeMillis());
350         }
351
352         @Override
353         public void update(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps, Vteps t1) {
354         }
355
356         @Override
357         public void add(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps) {
358         }
359     }
360 }