Migrate cloudscaler to mdsal-binding-util
[genius.git] / cloudscaler / impl / src / main / java / org / opendaylight / genius / cloudscaler / rpcservice / CloudscalerRpcServiceImpl.java
1 /*
2  * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.cloudscaler.rpcservice;
9
10 import com.google.common.cache.CacheBuilder;
11 import com.google.common.cache.CacheLoader;
12 import com.google.common.cache.LoadingCache;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.opendaylight.infrautils.utils.concurrent.Executors;
23 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
24 import org.opendaylight.mdsal.binding.api.DataBroker;
25 import org.opendaylight.mdsal.binding.api.ReadTransaction;
26 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.mdsal.binding.util.Datastore;
28 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
29 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.mdsal.common.api.ReadFailedException;
32 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.CloudscalerRpcService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutputBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNode;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNodeBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.TransportZones;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.TransportZone;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.transport.zone.Vteps;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.RpcError;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
60 import org.opendaylight.yangtools.yang.common.Uint64;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 @Singleton
65 public class CloudscalerRpcServiceImpl implements CloudscalerRpcService {
66
67     private static final Logger LOG = LoggerFactory.getLogger("GeniusEventLogger");
68
69     private static final Integer DELETE_DELAY = Integer.getInteger(
70             "scale.in.end.delay.inventory.delete.in.secs", 120);
71     private static ScaleinComputesEndOutput IN_PROGRESS = new ScaleinComputesEndOutputBuilder()
72             .setStatus("INPROGRESS")
73             .build();
74
75     private static ScaleinComputesEndOutput DONE = new ScaleinComputesEndOutputBuilder()
76             .setStatus("DONE")
77             .build();
78
79     private static  RpcResult<ScaleinComputesEndOutput> IN_PROGRESS_RPC_RESPONSE = RpcResultBuilder
80             .<ScaleinComputesEndOutput>success().withResult(IN_PROGRESS).build();
81
82     private static RpcResult<ScaleinComputesEndOutput> DONE_RPC_RESPONSE = RpcResultBuilder
83             .<ScaleinComputesEndOutput>success().withResult(DONE).build();
84
85     private final DataBroker dataBroker;
86     private final ComputeNodeManager computeNodeManager;
87     private final ManagedNewTransactionRunner txRunner;
88     //private final ItmTepClusteredListener itmTepClusteredListener;
89
90     //The following timestamp is not persisted across reboots
91     //upon reboot the timestamp will have a default value of that system timestamp
92     //this way scalein end that is triggered after cluster reboot will still honour the 2 min delay
93     private final LoadingCache<Uint64, Long> tepDeleteTimeStamp = CacheBuilder.newBuilder()
94             .expireAfterWrite(60, TimeUnit.MINUTES)
95             .build(new CacheLoader<Uint64, Long>() {
96                 @Override
97                 public Long load(Uint64 dpnId) {
98                     return System.currentTimeMillis();
99                 }
100             });
101
102     public static final FutureCallback<Void> DEFAULT_CALLBACK = new FutureCallback<>() {
103         @Override
104         public void onSuccess(Void result) {
105             LOG.debug("Success in Datastore operation");
106         }
107
108         @Override
109         public void onFailure(Throwable error) {
110             LOG.error("Error in Datastore operation", error);
111         }
112     };
113
114     @Inject
115     public CloudscalerRpcServiceImpl(DataBroker dataBroker,
116                                      ComputeNodeManager computeNodeManager) {
117         this.dataBroker = dataBroker;
118         this.computeNodeManager = computeNodeManager;
119         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
120         //this.itmTepClusteredListener = new ItmTepClusteredListener(dataBroker);
121     }
122
123     @Override
124     public ListenableFuture<RpcResult<ScaleinComputesStartOutput>> scaleinComputesStart(
125             ScaleinComputesStartInput input) {
126         ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
127         SettableFuture<RpcResult<ScaleinComputesStartOutput>> ft = SettableFuture.create();
128         input.getScaleinComputeNames().forEach(s -> tombstoneTheNode(s, tx, true));
129         input.getScaleinComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-start {}", s));
130         try {
131             tx.commit().get();
132         } catch (InterruptedException | ExecutionException e) {
133             LOG.error("Failed to tombstone all the nodes ", e);
134             ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>failed().withError(RpcError.ErrorType.APPLICATION,
135                             "Failed to tombstone all the nodes " + e.getMessage()).build());
136             return ft;
137         }
138         ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>success().build());
139         return ft;
140     }
141
142     @Override
143     public ListenableFuture<RpcResult<ScaleinComputesRecoverOutput>> scaleinComputesRecover(
144             ScaleinComputesRecoverInput input) {
145         ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
146         SettableFuture<RpcResult<ScaleinComputesRecoverOutput>> ft = SettableFuture.create();
147         input.getRecoverComputeNames().forEach(s -> tombstoneTheNode(s, tx, false));
148         input.getRecoverComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-recover {}", s));
149         try {
150             tx.commit().get();
151         } catch (InterruptedException | ExecutionException e) {
152             LOG.error("Failed to recover all the nodes ", e);
153             ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>failed().withError(RpcError.ErrorType.APPLICATION,
154                             "Failed to recover all the nodes " + e.getMessage()).build());
155             return ft;
156         }
157         //LOG.info("Recovered the nodes {}", input);
158         ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>success().build());
159         return ft;
160     }
161
162     @Override
163     @SuppressWarnings("checkstyle:IllegalCatch")
164     public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd(ScaleinComputesEndInput input) {
165         LOG.error("Cloudscaler scalein-end {}", input);
166         try {
167             for (String computeName : input.getScaleinComputeNames()) {
168                 ComputeNode computeNode = computeNodeManager.getComputeNodeFromName(computeName);
169                 if (computeNode == null) {
170                     LOG.warn("Cloudscaler Failed to find the compute {} for scale in end ", computeName);
171                     return Futures.immediateFuture(DONE_RPC_RESPONSE);
172                 }
173                 Long tepDeletedTimeStamp = tepDeleteTimeStamp.get(computeNode.getDpnid());
174                 Long currentTime = System.currentTimeMillis();
175                 if (currentTime - tepDeletedTimeStamp > DELETE_DELAY * 1000L) {
176                     scaleinComputesEnd2(input);
177                 } else {
178                     return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
179                 }
180             }
181         } catch (Exception e) {
182             LOG.error("Cloudscaler Failed scalein-end ", e);
183             return Futures.immediateFuture(
184                     RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
185                             "Failed to read the compute node " + e.getMessage()).build());
186         }
187         return Futures.immediateFuture(DONE_RPC_RESPONSE);
188     }
189
190     @SuppressWarnings("checkstyle:IllegalCatch")
191     public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd2(ScaleinComputesEndInput input) {
192         try {
193             for (String computeName : input.getScaleinComputeNames()) {
194                 ComputeNode computeNode;
195                 try {
196                     computeNode = computeNodeManager.getComputeNodeFromName(computeName);
197                     if (computeNode == null) {
198                         LOG.error("Cloudscaler Failed to find the compute {} for scale in end ",
199                                 computeName);
200                         return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
201                     }
202                 } catch (ReadFailedException e) {
203                     LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
204                     return Futures.immediateFuture(
205                             RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(
206                                     RpcError.ErrorType.APPLICATION,
207                                     "Failed to read the compute node " + e.getMessage()).build());
208                 }
209                 LOG.info("Cloudscaler Deleting compute node details {}", computeNode);
210                 LOG.info("Cloudscaler Deleting compute node details {}",
211                         buildOpenflowNodeIid(computeNode));
212                 LOG.info("Cloudscaler Deleting compute node details {}", buildOvsdbNodeId(computeNode));
213                 // FIXME: why don't we run this as one transaction?!
214                 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
215                     Datastore.CONFIGURATION, tx -> {
216                         computeNodeManager.deleteComputeNode(tx, computeNode);
217                     }), LOG, "Cloudscaler Failed to delete the compute node");
218                 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
219                     Datastore.CONFIGURATION, tx -> {
220                         tx.delete(buildOpenflowNodeIid(computeNode));
221                     }), LOG, "Cloudscaler Failed to delete the config inventory");
222                 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
223                     Datastore.CONFIGURATION, tx -> {
224                         tx.delete(buildOvsdbNodeId(computeNode));
225                     }), LOG, "Cloudscaler Failed to delete the config topology");
226             }
227         } catch (Throwable e) {
228             LOG.error("Cloudscaler Failed to do scale in end {} ", input, e);
229             return Futures.immediateFuture(
230                     RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
231                             "Failed to read the transport zone " + e.getMessage()).build());
232         }
233         return Futures.immediateFuture(DONE_RPC_RESPONSE);
234     }
235
236     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network
237             .topology.rev131021.network.topology.topology.Node> buildOvsdbNodeId(ComputeNode computeNode) {
238         return InstanceIdentifier
239                 .create(NetworkTopology.class)
240                 .child(Topology.class, new TopologyKey(new TopologyId("ovsdb:1")))
241                 .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network
242                                 .topology.topology.Node.class,
243                         new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021
244                                 .network.topology.topology.NodeKey(new NodeId(computeNode.getNodeid())));
245     }
246
247     private InstanceIdentifier<Node> buildOpenflowNodeIid(ComputeNode computeNode) {
248         return InstanceIdentifier.builder(Nodes.class)
249                 .child(Node.class, new NodeKey(
250                         new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId(
251                                 "openflow:" + computeNode.getDpnid().toString()))).build();
252     }
253
254     private void tombstoneTheNode(String computeName, ReadWriteTransaction tx, Boolean tombstone) {
255         ComputeNode computeNode = null;
256         try {
257             computeNode = computeNodeManager.getComputeNodeFromName(computeName);
258             if (computeNode == null) { //TODO throw error to rpc
259                 LOG.error("Cloudscaler Node not present to {} {}",
260                         computeName, tombstone ? "tombstone" : "recover");
261                 return;
262             }
263         } catch (ReadFailedException e) {
264             LOG.error("Cloudscaler Failed to {} the compute {} read failed",
265                     tombstone ? "tombstone" : "recover", computeName);
266             return;
267         }
268         ComputeNodeBuilder builder = new ComputeNodeBuilder(computeNode);
269         builder.setTombstoned(tombstone);
270         tx.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
271                 computeNodeManager.buildComputeNodeIid(computeName), builder.build());
272     }
273
274     @Override
275     @SuppressWarnings("checkstyle:IllegalCatch")
276     public ListenableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> scaleinComputesTepDelete(
277             ScaleinComputesTepDeleteInput input) {
278         ReadTransaction readTx = this.dataBroker.newReadOnlyTransaction();
279         SettableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> ft = SettableFuture.create();
280         Optional<TransportZones> tz;
281         try {
282             tz = readTx.read(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(TransportZones.class))
283                     .get();
284         } catch (InterruptedException | ExecutionException e) {
285             LOG.error("Cloudscaler Failed to read the transport zone {}", e.getMessage());
286             ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
287                     "Failed to read the transport zone " + e.getMessage()).build());
288             return ft;
289         } finally {
290             readTx.close();
291         }
292         try {
293             for (String computeName : input.getScaleinComputeNames()) {
294                 ComputeNode computeNode = null;
295                 try {
296                     computeNode = computeNodeManager.getComputeNodeFromName(computeName);
297                     if (computeNode == null) {
298                         LOG.warn("Cloudscaler Could not find the compute for tep delete {}", computeName);
299                         ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
300                         return ft;
301                     }
302                 } catch (ReadFailedException e) {
303                     LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
304                     ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed()
305                             .withError(RpcError.ErrorType.APPLICATION, "Failed to read the compute node "
306                                     + e.getMessage()).build());
307                     return ft;
308                 }
309                 if (tz.isPresent() && tz.get().getTransportZone() != null) {
310                     for (TransportZone zone : tz.get().getTransportZone()) {
311                         if (zone.getVteps() == null) {
312                             continue;
313                         }
314                         for (Vteps vteps : zone.getVteps().values()) {
315                             if (vteps.getDpnId().equals(computeNode.getDpnid())) {
316                                 InstanceIdentifier<Vteps> dpnVtepIid = InstanceIdentifier
317                                         .create(TransportZones.class)
318                                         .child(TransportZone.class, zone.key())
319                                         .child(Vteps.class, vteps.key());
320                                 LOG.error("Cloudscaler deleting dpn {}", vteps);
321                                 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
322                                     Datastore.CONFIGURATION, tx -> {
323                                         tx.delete(dpnVtepIid);
324                                     }), LOG, "Cloudscaler Failed to delete the itm tep");
325                             }
326                         }
327                     }
328                 }
329             }
330             InstanceIdentifier.create(TransportZones.class)
331                     .child(TransportZone.class)
332                     .child(Vteps.class);
333         } catch (Throwable e) {
334             LOG.error("Failed to read the transport zone ", e);
335             ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
336                     "Failed to read the transport zone " + e.getMessage()).build());
337             return ft;
338         }
339         ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
340         return ft;
341     }
342
343     class ItmTepClusteredListener extends AbstractClusteredAsyncDataTreeChangeListener<Vteps> {
344
345         @Inject
346         ItmTepClusteredListener(DataBroker dataBroker) {
347             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(TransportZones.class)
348                     .child(TransportZone.class).child(Vteps.class),
349                     Executors.newSingleThreadExecutor("ItmTepClusteredListener", LOG));
350         }
351
352         @Override
353         public void remove(InstanceIdentifier<Vteps> instanceIdentifier, Vteps tep) {
354             tepDeleteTimeStamp.put(tep.getDpnId(), System.currentTimeMillis());
355         }
356
357         @Override
358         public void update(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps, Vteps t1) {
359         }
360
361         @Override
362         public void add(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps) {
363         }
364     }
365 }