2 * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.cloudscaler.rpcservice;
10 import com.google.common.cache.CacheBuilder;
11 import com.google.common.cache.CacheLoader;
12 import com.google.common.cache.LoadingCache;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
23 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
24 import org.opendaylight.infrautils.utils.concurrent.Executors;
25 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
26 import org.opendaylight.mdsal.binding.api.DataBroker;
27 import org.opendaylight.mdsal.binding.api.ReadTransaction;
28 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
30 import org.opendaylight.mdsal.common.api.ReadFailedException;
31 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.CloudscalerRpcService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNodeBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.TransportZones;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.TransportZone;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.transport.zone.Vteps;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcError;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
59 import org.opendaylight.yangtools.yang.common.Uint64;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
64 public class CloudscalerRpcServiceImpl implements CloudscalerRpcService {
// Logger obtained by the fixed name "GeniusEventLogger" rather than by this class's name.
private static final Logger LOG = LoggerFactory.getLogger("GeniusEventLogger");
// Delay in seconds, read from the system property below; Integer.getInteger falls back to 120
// when the property is unset or not a valid integer. scaleinComputesEnd compares it (times 1000)
// against the time elapsed since the cached TEP-delete time stamp.
private static final Integer DELETE_DELAY = Integer.getInteger(
        "scale.in.end.delay.inventory.delete.in.secs", 120);
70 private static ScaleinComputesEndOutput IN_PROGRESS = new ScaleinComputesEndOutputBuilder()
71 .setStatus("INPROGRESS")
74 private static ScaleinComputesEndOutput DONE = new ScaleinComputesEndOutputBuilder()
78 private static RpcResult<ScaleinComputesEndOutput> IN_PROGRESS_RPC_RESPONSE = RpcResultBuilder
79 .<ScaleinComputesEndOutput>success().withResult(IN_PROGRESS).build();
81 private static RpcResult<ScaleinComputesEndOutput> DONE_RPC_RESPONSE = RpcResultBuilder
82 .<ScaleinComputesEndOutput>success().withResult(DONE).build();
84 private final DataBroker dataBroker;
85 private final ComputeNodeManager computeNodeManager;
86 private final ManagedNewTransactionRunner txRunner;
87 //private final ItmTepClusteredListener itmTepClusteredListener;
89 //The following timestamp is not persisted across reboots
90 //upon reboot the timestamp will have a default value of that system timestamp
91 //this way scalein end that is triggered after cluster reboot will still honour the 2 min delay
92 private final LoadingCache<Uint64, Long> tepDeleteTimeStamp = CacheBuilder.newBuilder()
93 .expireAfterWrite(60, TimeUnit.MINUTES)
94 .build(new CacheLoader<Uint64, Long>() {
96 public Long load(Uint64 dpnId) {
97 return System.currentTimeMillis();
101 public static final FutureCallback<Void> DEFAULT_CALLBACK = new FutureCallback<>() {
103 public void onSuccess(Void result) {
104 LOG.debug("Success in Datastore operation");
108 public void onFailure(Throwable error) {
109 LOG.error("Error in Datastore operation", error);
/**
 * Creates the RPC service.
 *
 * @param dataBroker broker used for the transactions opened by this service; it also backs
 *                   the managed transaction runner created here
 * @param computeNodeManager used to look up, address and delete compute nodes
 */
public CloudscalerRpcServiceImpl(DataBroker dataBroker,
                                 ComputeNodeManager computeNodeManager) {
    this.computeNodeManager = computeNodeManager;
    this.dataBroker = dataBroker;
    this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
    //this.itmTepClusteredListener = new ItmTepClusteredListener(dataBroker);
}
123 public ListenableFuture<RpcResult<ScaleinComputesStartOutput>> scaleinComputesStart(
124 ScaleinComputesStartInput input) {
125 ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
126 SettableFuture<RpcResult<ScaleinComputesStartOutput>> ft = SettableFuture.create();
127 input.getScaleinComputeNames().forEach(s -> tombstoneTheNode(s, tx, true));
128 input.getScaleinComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-start {}", s));
131 } catch (InterruptedException | ExecutionException e) {
132 LOG.error("Failed to tombstone all the nodes ", e);
133 ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>failed().withError(RpcError.ErrorType.APPLICATION,
134 "Failed to tombstone all the nodes " + e.getMessage()).build());
137 ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>success().build());
142 public ListenableFuture<RpcResult<ScaleinComputesRecoverOutput>> scaleinComputesRecover(
143 ScaleinComputesRecoverInput input) {
144 ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
145 SettableFuture<RpcResult<ScaleinComputesRecoverOutput>> ft = SettableFuture.create();
146 input.getRecoverComputeNames().forEach(s -> tombstoneTheNode(s, tx, false));
147 input.getRecoverComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-recover {}", s));
150 } catch (InterruptedException | ExecutionException e) {
151 LOG.error("Failed to recover all the nodes ", e);
152 ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>failed().withError(RpcError.ErrorType.APPLICATION,
153 "Failed to recover all the nodes " + e.getMessage()).build());
156 //LOG.info("Recovered the nodes {}", input);
157 ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>success().build());
162 @SuppressWarnings("checkstyle:IllegalCatch")
163 public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd(ScaleinComputesEndInput input) {
164 LOG.error("Cloudscaler scalein-end {}", input);
166 for (String computeName : input.getScaleinComputeNames()) {
167 ComputeNode computeNode = computeNodeManager.getComputeNodeFromName(computeName);
168 if (computeNode == null) {
169 LOG.warn("Cloudscaler Failed to find the compute {} for scale in end ", computeName);
170 return Futures.immediateFuture(DONE_RPC_RESPONSE);
172 Long tepDeletedTimeStamp = tepDeleteTimeStamp.get(computeNode.getDpnid());
173 Long currentTime = System.currentTimeMillis();
174 if (currentTime - tepDeletedTimeStamp > DELETE_DELAY * 1000L) {
175 scaleinComputesEnd2(input);
177 return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
180 } catch (Exception e) {
181 LOG.error("Cloudscaler Failed scalein-end ", e);
182 return Futures.immediateFuture(
183 RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
184 "Failed to read the compute node " + e.getMessage()).build());
186 return Futures.immediateFuture(DONE_RPC_RESPONSE);
189 @SuppressWarnings("checkstyle:IllegalCatch")
190 public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd2(ScaleinComputesEndInput input) {
192 for (String computeName : input.getScaleinComputeNames()) {
193 ComputeNode computeNode;
195 computeNode = computeNodeManager.getComputeNodeFromName(computeName);
196 if (computeNode == null) {
197 LOG.error("Cloudscaler Failed to find the compute {} for scale in end ",
199 return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
201 } catch (ReadFailedException e) {
202 LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
203 return Futures.immediateFuture(
204 RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(
205 RpcError.ErrorType.APPLICATION,
206 "Failed to read the compute node " + e.getMessage()).build());
208 LOG.info("Cloudscaler Deleting compute node details {}", computeNode);
209 LOG.info("Cloudscaler Deleting compute node details {}",
210 buildOpenflowNodeIid(computeNode));
211 LOG.info("Cloudscaler Deleting compute node details {}", buildOvsdbNodeId(computeNode));
212 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
213 computeNodeManager.deleteComputeNode(tx, computeNode);
214 }), LOG, "Cloudscaler Failed to delete the compute node");
215 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
216 tx.delete(LogicalDatastoreType.CONFIGURATION, buildOpenflowNodeIid(computeNode));
217 }), LOG, "Cloudscaler Failed to delete the config inventory");
218 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
219 tx.delete(LogicalDatastoreType.CONFIGURATION, buildOvsdbNodeId(computeNode));
220 }), LOG, "Cloudscaler Failed to delete the config topology");
222 } catch (Throwable e) {
223 LOG.error("Cloudscaler Failed to do scale in end {} ", input, e);
224 return Futures.immediateFuture(
225 RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
226 "Failed to read the transport zone " + e.getMessage()).build());
228 return Futures.immediateFuture(DONE_RPC_RESPONSE);
231 private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network
232 .topology.rev131021.network.topology.topology.Node> buildOvsdbNodeId(ComputeNode computeNode) {
233 return InstanceIdentifier
234 .create(NetworkTopology.class)
235 .child(Topology.class, new TopologyKey(new TopologyId("ovsdb:1")))
236 .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network
237 .topology.topology.Node.class,
238 new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021
239 .network.topology.topology.NodeKey(new NodeId(computeNode.getNodeid())));
242 private InstanceIdentifier<Node> buildOpenflowNodeIid(ComputeNode computeNode) {
243 return InstanceIdentifier.builder(Nodes.class)
244 .child(Node.class, new NodeKey(
245 new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId(
246 "openflow:" + computeNode.getDpnid().toString()))).build();
249 private void tombstoneTheNode(String computeName, ReadWriteTransaction tx, Boolean tombstone) {
250 ComputeNode computeNode = null;
252 computeNode = computeNodeManager.getComputeNodeFromName(computeName);
253 if (computeNode == null) { //TODO throw error to rpc
254 LOG.error("Cloudscaler Node not present to {} {}",
255 computeName, tombstone ? "tombstone" : "recover");
258 } catch (ReadFailedException e) {
259 LOG.error("Cloudscaler Failed to {} the compute {} read failed",
260 tombstone ? "tombstone" : "recover", computeName);
263 ComputeNodeBuilder builder = new ComputeNodeBuilder(computeNode);
264 builder.setTombstoned(tombstone);
265 tx.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
266 computeNodeManager.buildComputeNodeIid(computeName), builder.build());
270 @SuppressWarnings("checkstyle:IllegalCatch")
271 public ListenableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> scaleinComputesTepDelete(
272 ScaleinComputesTepDeleteInput input) {
273 ReadTransaction readTx = this.dataBroker.newReadOnlyTransaction();
274 SettableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> ft = SettableFuture.create();
275 Optional<TransportZones> tz;
277 tz = readTx.read(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(TransportZones.class))
279 } catch (InterruptedException | ExecutionException e) {
280 LOG.error("Cloudscaler Failed to read the transport zone {}", e.getMessage());
281 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
282 "Failed to read the transport zone " + e.getMessage()).build());
288 for (String computeName : input.getScaleinComputeNames()) {
289 ComputeNode computeNode = null;
291 computeNode = computeNodeManager.getComputeNodeFromName(computeName);
292 if (computeNode == null) {
293 LOG.warn("Cloudscaler Could not find the compute for tep delete {}", computeName);
294 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
297 } catch (ReadFailedException e) {
298 LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
299 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed()
300 .withError(RpcError.ErrorType.APPLICATION, "Failed to read the compute node "
301 + e.getMessage()).build());
304 if (tz.isPresent() && tz.get().getTransportZone() != null) {
305 for (TransportZone zone : tz.get().getTransportZone()) {
306 if (zone.getVteps() == null) {
309 for (Vteps vteps : zone.getVteps().values()) {
310 if (vteps.getDpnId().equals(computeNode.getDpnid())) {
311 InstanceIdentifier<Vteps> dpnVtepIid = InstanceIdentifier
312 .create(TransportZones.class)
313 .child(TransportZone.class, zone.key())
314 .child(Vteps.class, vteps.key());
315 LOG.error("Cloudscaler deleting dpn {}", vteps);
316 ListenableFutures.addErrorLogging(
317 txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
318 tx.delete(LogicalDatastoreType.CONFIGURATION, dpnVtepIid);
319 }), LOG, "Cloudscaler Failed to delete the itm tep");
325 InstanceIdentifier.create(TransportZones.class)
326 .child(TransportZone.class)
328 } catch (Throwable e) {
329 LOG.error("Failed to read the transport zone ", e);
330 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
331 "Failed to read the transport zone " + e.getMessage()).build());
334 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
338 class ItmTepClusteredListener extends AbstractClusteredAsyncDataTreeChangeListener<Vteps> {
341 ItmTepClusteredListener(DataBroker dataBroker) {
342 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(TransportZones.class)
343 .child(TransportZone.class).child(Vteps.class),
344 Executors.newSingleThreadExecutor("ItmTepClusteredListener", LOG));
348 public void remove(InstanceIdentifier<Vteps> instanceIdentifier, Vteps tep) {
349 tepDeleteTimeStamp.put(tep.getDpnId(), System.currentTimeMillis());
353 public void update(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps, Vteps t1) {
357 public void add(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps) {