/*
 * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.cloudscaler.rpcservice;
10 import com.google.common.cache.CacheBuilder;
11 import com.google.common.cache.CacheLoader;
12 import com.google.common.cache.LoadingCache;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.opendaylight.infrautils.utils.concurrent.Executors;
23 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
24 import org.opendaylight.mdsal.binding.api.DataBroker;
25 import org.opendaylight.mdsal.binding.api.ReadTransaction;
26 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.mdsal.binding.util.Datastore;
28 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
29 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.mdsal.common.api.ReadFailedException;
32 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.CloudscalerRpcService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesEndOutputBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesRecoverOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesStartOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.ScaleinComputesTepDeleteOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNode;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.cloudscaler.rpcs.rev171220.compute.nodes.ComputeNodeBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.TransportZones;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.TransportZone;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transport.zones.transport.zone.Vteps;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.RpcError;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
60 import org.opendaylight.yangtools.yang.common.Uint64;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
65 public class CloudscalerRpcServiceImpl implements CloudscalerRpcService {
67 private static final Logger LOG = LoggerFactory.getLogger("GeniusEventLogger");
69 private static final Integer DELETE_DELAY = Integer.getInteger(
70 "scale.in.end.delay.inventory.delete.in.secs", 120);
71 private static ScaleinComputesEndOutput IN_PROGRESS = new ScaleinComputesEndOutputBuilder()
72 .setStatus("INPROGRESS")
75 private static ScaleinComputesEndOutput DONE = new ScaleinComputesEndOutputBuilder()
79 private static RpcResult<ScaleinComputesEndOutput> IN_PROGRESS_RPC_RESPONSE = RpcResultBuilder
80 .<ScaleinComputesEndOutput>success().withResult(IN_PROGRESS).build();
82 private static RpcResult<ScaleinComputesEndOutput> DONE_RPC_RESPONSE = RpcResultBuilder
83 .<ScaleinComputesEndOutput>success().withResult(DONE).build();
85 private final DataBroker dataBroker;
86 private final ComputeNodeManager computeNodeManager;
87 private final ManagedNewTransactionRunner txRunner;
88 //private final ItmTepClusteredListener itmTepClusteredListener;
90 //The following timestamp is not persisted across reboots
91 //upon reboot the timestamp will have a default value of that system timestamp
92 //this way scalein end that is triggered after cluster reboot will still honour the 2 min delay
93 private final LoadingCache<Uint64, Long> tepDeleteTimeStamp = CacheBuilder.newBuilder()
94 .expireAfterWrite(60, TimeUnit.MINUTES)
95 .build(new CacheLoader<Uint64, Long>() {
97 public Long load(Uint64 dpnId) {
98 return System.currentTimeMillis();
102 public static final FutureCallback<Void> DEFAULT_CALLBACK = new FutureCallback<>() {
104 public void onSuccess(Void result) {
105 LOG.debug("Success in Datastore operation");
109 public void onFailure(Throwable error) {
110 LOG.error("Error in Datastore operation", error);
/**
 * Creates the RPC service.
 *
 * @param dataBroker broker used for direct transactions and to build the managed transaction runner
 * @param computeNodeManager lookup/delete helper for compute nodes
 */
public CloudscalerRpcServiceImpl(DataBroker dataBroker, ComputeNodeManager computeNodeManager) {
    this.computeNodeManager = computeNodeManager;
    this.dataBroker = dataBroker;
    this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
    //this.itmTepClusteredListener = new ItmTepClusteredListener(dataBroker);
}
124 public ListenableFuture<RpcResult<ScaleinComputesStartOutput>> scaleinComputesStart(
125 ScaleinComputesStartInput input) {
126 ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
127 SettableFuture<RpcResult<ScaleinComputesStartOutput>> ft = SettableFuture.create();
128 input.getScaleinComputeNames().forEach(s -> tombstoneTheNode(s, tx, true));
129 input.getScaleinComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-start {}", s));
132 } catch (InterruptedException | ExecutionException e) {
133 LOG.error("Failed to tombstone all the nodes ", e);
134 ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>failed().withError(RpcError.ErrorType.APPLICATION,
135 "Failed to tombstone all the nodes " + e.getMessage()).build());
138 ft.set(RpcResultBuilder.<ScaleinComputesStartOutput>success().build());
143 public ListenableFuture<RpcResult<ScaleinComputesRecoverOutput>> scaleinComputesRecover(
144 ScaleinComputesRecoverInput input) {
145 ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
146 SettableFuture<RpcResult<ScaleinComputesRecoverOutput>> ft = SettableFuture.create();
147 input.getRecoverComputeNames().forEach(s -> tombstoneTheNode(s, tx, false));
148 input.getRecoverComputeNames().forEach(s -> LOG.info("Cloudscaler scalein-recover {}", s));
151 } catch (InterruptedException | ExecutionException e) {
152 LOG.error("Failed to recover all the nodes ", e);
153 ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>failed().withError(RpcError.ErrorType.APPLICATION,
154 "Failed to recover all the nodes " + e.getMessage()).build());
157 //LOG.info("Recovered the nodes {}", input);
158 ft.set(RpcResultBuilder.<ScaleinComputesRecoverOutput>success().build());
163 @SuppressWarnings("checkstyle:IllegalCatch")
164 public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd(ScaleinComputesEndInput input) {
165 LOG.error("Cloudscaler scalein-end {}", input);
167 for (String computeName : input.getScaleinComputeNames()) {
168 ComputeNode computeNode = computeNodeManager.getComputeNodeFromName(computeName);
169 if (computeNode == null) {
170 LOG.warn("Cloudscaler Failed to find the compute {} for scale in end ", computeName);
171 return Futures.immediateFuture(DONE_RPC_RESPONSE);
173 Long tepDeletedTimeStamp = tepDeleteTimeStamp.get(computeNode.getDpnid());
174 Long currentTime = System.currentTimeMillis();
175 if (currentTime - tepDeletedTimeStamp > DELETE_DELAY * 1000L) {
176 scaleinComputesEnd2(input);
178 return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
181 } catch (Exception e) {
182 LOG.error("Cloudscaler Failed scalein-end ", e);
183 return Futures.immediateFuture(
184 RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
185 "Failed to read the compute node " + e.getMessage()).build());
187 return Futures.immediateFuture(DONE_RPC_RESPONSE);
190 @SuppressWarnings("checkstyle:IllegalCatch")
191 public ListenableFuture<RpcResult<ScaleinComputesEndOutput>> scaleinComputesEnd2(ScaleinComputesEndInput input) {
193 for (String computeName : input.getScaleinComputeNames()) {
194 ComputeNode computeNode;
196 computeNode = computeNodeManager.getComputeNodeFromName(computeName);
197 if (computeNode == null) {
198 LOG.error("Cloudscaler Failed to find the compute {} for scale in end ",
200 return Futures.immediateFuture(IN_PROGRESS_RPC_RESPONSE);
202 } catch (ReadFailedException e) {
203 LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
204 return Futures.immediateFuture(
205 RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(
206 RpcError.ErrorType.APPLICATION,
207 "Failed to read the compute node " + e.getMessage()).build());
209 LOG.info("Cloudscaler Deleting compute node details {}", computeNode);
210 LOG.info("Cloudscaler Deleting compute node details {}",
211 buildOpenflowNodeIid(computeNode));
212 LOG.info("Cloudscaler Deleting compute node details {}", buildOvsdbNodeId(computeNode));
213 // FIXME: why don't we run this as one transaction?!
214 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
215 Datastore.CONFIGURATION, tx -> {
216 computeNodeManager.deleteComputeNode(tx, computeNode);
217 }), LOG, "Cloudscaler Failed to delete the compute node");
218 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
219 Datastore.CONFIGURATION, tx -> {
220 tx.delete(buildOpenflowNodeIid(computeNode));
221 }), LOG, "Cloudscaler Failed to delete the config inventory");
222 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
223 Datastore.CONFIGURATION, tx -> {
224 tx.delete(buildOvsdbNodeId(computeNode));
225 }), LOG, "Cloudscaler Failed to delete the config topology");
227 } catch (Throwable e) {
228 LOG.error("Cloudscaler Failed to do scale in end {} ", input, e);
229 return Futures.immediateFuture(
230 RpcResultBuilder.<ScaleinComputesEndOutput>failed().withError(RpcError.ErrorType.APPLICATION,
231 "Failed to read the transport zone " + e.getMessage()).build());
233 return Futures.immediateFuture(DONE_RPC_RESPONSE);
236 private static InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network
237 .topology.rev131021.network.topology.topology.Node> buildOvsdbNodeId(ComputeNode computeNode) {
238 return InstanceIdentifier
239 .create(NetworkTopology.class)
240 .child(Topology.class, new TopologyKey(new TopologyId("ovsdb:1")))
241 .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network
242 .topology.topology.Node.class,
243 new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021
244 .network.topology.topology.NodeKey(new NodeId(computeNode.getNodeid())));
247 private static InstanceIdentifier<Node> buildOpenflowNodeIid(ComputeNode computeNode) {
248 return InstanceIdentifier.builder(Nodes.class)
249 .child(Node.class, new NodeKey(
250 new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId(
251 "openflow:" + computeNode.getDpnid().toString()))).build();
254 private void tombstoneTheNode(String computeName, ReadWriteTransaction tx, Boolean tombstone) {
255 ComputeNode computeNode = null;
257 computeNode = computeNodeManager.getComputeNodeFromName(computeName);
258 if (computeNode == null) { //TODO throw error to rpc
259 LOG.error("Cloudscaler Node not present to {} {}",
260 computeName, tombstone ? "tombstone" : "recover");
263 } catch (ReadFailedException e) {
264 LOG.error("Cloudscaler Failed to {} the compute {} read failed",
265 tombstone ? "tombstone" : "recover", computeName);
268 ComputeNodeBuilder builder = new ComputeNodeBuilder(computeNode);
269 builder.setTombstoned(tombstone);
270 tx.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
271 computeNodeManager.buildComputeNodeIid(computeName), builder.build());
275 @SuppressWarnings("checkstyle:IllegalCatch")
276 public ListenableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> scaleinComputesTepDelete(
277 ScaleinComputesTepDeleteInput input) {
278 ReadTransaction readTx = this.dataBroker.newReadOnlyTransaction();
279 SettableFuture<RpcResult<ScaleinComputesTepDeleteOutput>> ft = SettableFuture.create();
280 Optional<TransportZones> tz;
282 tz = readTx.read(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(TransportZones.class))
284 } catch (InterruptedException | ExecutionException e) {
285 LOG.error("Cloudscaler Failed to read the transport zone {}", e.getMessage());
286 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
287 "Failed to read the transport zone " + e.getMessage()).build());
293 for (String computeName : input.getScaleinComputeNames()) {
294 ComputeNode computeNode = null;
296 computeNode = computeNodeManager.getComputeNodeFromName(computeName);
297 if (computeNode == null) {
298 LOG.warn("Cloudscaler Could not find the compute for tep delete {}", computeName);
299 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
302 } catch (ReadFailedException e) {
303 LOG.error("Cloudscaler Failed to read the compute node {}", e.getMessage());
304 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed()
305 .withError(RpcError.ErrorType.APPLICATION, "Failed to read the compute node "
306 + e.getMessage()).build());
309 if (tz.isPresent() && tz.get().getTransportZone() != null) {
310 for (TransportZone zone : tz.get().getTransportZone()) {
311 if (zone.getVteps() == null) {
314 for (Vteps vteps : zone.getVteps().values()) {
315 if (vteps.getDpnId().equals(computeNode.getDpnid())) {
316 InstanceIdentifier<Vteps> dpnVtepIid = InstanceIdentifier
317 .create(TransportZones.class)
318 .child(TransportZone.class, zone.key())
319 .child(Vteps.class, vteps.key());
320 LOG.error("Cloudscaler deleting dpn {}", vteps);
321 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(
322 Datastore.CONFIGURATION, tx -> {
323 tx.delete(dpnVtepIid);
324 }), LOG, "Cloudscaler Failed to delete the itm tep");
330 InstanceIdentifier.create(TransportZones.class)
331 .child(TransportZone.class)
333 } catch (Throwable e) {
334 LOG.error("Failed to read the transport zone ", e);
335 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>failed().withError(RpcError.ErrorType.APPLICATION,
336 "Failed to read the transport zone " + e.getMessage()).build());
339 ft.set(RpcResultBuilder.<ScaleinComputesTepDeleteOutput>success().build());
343 class ItmTepClusteredListener extends AbstractClusteredAsyncDataTreeChangeListener<Vteps> {
/**
 * Registers this listener for {@code Vteps} under the operational transport zones, with
 * notifications handled on a single-threaded executor named "ItmTepClusteredListener".
 *
 * @param dataBroker broker the listener is registered with
 */
ItmTepClusteredListener(DataBroker dataBroker) {
    super(
            dataBroker,
            LogicalDatastoreType.OPERATIONAL,
            InstanceIdentifier.create(TransportZones.class)
                    .child(TransportZone.class)
                    .child(Vteps.class),
            Executors.newSingleThreadExecutor("ItmTepClusteredListener", LOG));
}
353 public void remove(InstanceIdentifier<Vteps> instanceIdentifier, Vteps tep) {
354 tepDeleteTimeStamp.put(tep.getDpnId(), System.currentTimeMillis());
358 public void update(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps, Vteps t1) {
362 public void add(InstanceIdentifier<Vteps> instanceIdentifier, Vteps vteps) {