Use ListenableFutures when submitting RPCs
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13
14 import java.math.BigInteger;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Deque;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.TimeUnit;
27
28 import javax.annotation.concurrent.GuardedBy;
29
30 import org.opendaylight.bgpcep.programming.NanotimeUtil;
31 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
32 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
33 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
34 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
35 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsOutputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.FailureBuilder;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 import com.google.common.base.Preconditions;
62 import com.google.common.collect.ImmutableList;
63 import com.google.common.util.concurrent.FutureCallback;
64 import com.google.common.util.concurrent.Futures;
65 import com.google.common.util.concurrent.ListenableFuture;
66 import com.google.common.util.concurrent.ListeningExecutorService;
67
68 public final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService, AutoCloseable {
69         private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
70
71         // Default stop timeout, in seconds
72         private static final long CLOSE_TIMEOUT = 5;
73
74         private final Map<InstructionId, Instruction> insns = new HashMap<>();
75
76         @GuardedBy("this")
77         private final Deque<Instruction> readyQueue = new ArrayDeque<>();
78
79         private final NotificationProviderService notifs;
80         private final ListeningExecutorService executor;
81         private final Timer timer;
82         private Future<Void> thread;
83         private ExecutorService exec;
84
85         public ProgrammingServiceImpl(final NotificationProviderService notifs, final ListeningExecutorService executor, final Timer timer) {
86                 this.notifs = Preconditions.checkNotNull(notifs);
87                 this.executor = Preconditions.checkNotNull(executor);
88                 this.timer = Preconditions.checkNotNull(timer);
89         }
90
91         @Override
92         public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
93                 return this.executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
94                         @Override
95                         public RpcResult<CancelInstructionOutput> call() {
96                                 return realCancelInstruction(input);
97                         }
98                 });
99         }
100
101         @Override
102         public ListenableFuture<RpcResult<CleanInstructionsOutput>> cleanInstructions(final CleanInstructionsInput input) {
103                 return this.executor.submit(new Callable<RpcResult<CleanInstructionsOutput>>() {
104                         @Override
105                         public RpcResult<CleanInstructionsOutput> call() throws Exception {
106                                 return realCleanInstructions(input);
107                         }
108                 });
109         }
110
111         private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
112                 final Instruction i = this.insns.get(input.getId());
113                 if (i == null) {
114                         LOG.debug("Instruction {} not present in the graph", input.getId());
115
116                         final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
117                         return SuccessfulRpcResult.create(out);
118                 }
119
120                 switch (i.getStatus()) {
121                 case Cancelled:
122                 case Executing:
123                 case Failed:
124                 case Successful:
125                 case Unknown:
126                         LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId());
127                         return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
128                 case Queued:
129                 case Scheduled:
130                         break;
131                 }
132
133                 cancelInstruction(i, null);
134                 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
135         }
136
137
138         private synchronized RpcResult<CleanInstructionsOutput> realCleanInstructions(final CleanInstructionsInput input) {
139                 final List<InstructionId> failed = new ArrayList<>();
140
141                 for (final InstructionId id : input.getId()) {
142                         // Find the instruction
143                         final Instruction i = this.insns.get(input.getId());
144                         if (i == null) {
145                                 LOG.debug("Instruction {} not present in the graph", input.getId());
146                                 failed.add(id);
147                                 continue;
148                         }
149
150                         // Check its status
151                         switch (i.getStatus()) {
152                         case Cancelled:
153                         case Failed:
154                         case Successful:
155                                 break;
156                         case Executing:
157                         case Queued:
158                         case Scheduled:
159                         case Unknown:
160                                 LOG.debug("Instruction {} cannot be cleaned because of it's in state {}", id, i.getStatus());
161                                 failed.add(id);
162                                 continue;
163                         }
164
165                         // The instruction is in a terminal state, we need to just unlink
166                         // it from its dependencies and dependants
167                         i.clean();
168
169                         insns.remove(i);
170                         LOG.debug("Instruction {} cleaned successfully", id);
171                 }
172
173                 final CleanInstructionsOutputBuilder ob = new CleanInstructionsOutputBuilder();
174                 if (!failed.isEmpty()) {
175                         ob.setUnflushed(failed);
176                 }
177
178                 return SuccessfulRpcResult.create(ob.build());
179         }
180
181         @Override
182         public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
183                 final InstructionId id = input.getId();
184                 if (this.insns.get(id) != null) {
185                         LOG.info("Instruction ID {} already present", id);
186                         return new FailureBuilder().setType(DuplicateInstructionId.class).build();
187                 }
188
189                 // First things first: check the deadline
190                 final Nanotime now = NanotimeUtil.currentTime();
191                 final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
192
193                 if (left.compareTo(BigInteger.ZERO) <= 0) {
194                         LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
195                         return new FailureBuilder().setType(DeadOnArrival.class).build();
196                 }
197
198                 // Resolve dependencies
199                 final List<Instruction> dependencies = new ArrayList<>();
200                 for (final InstructionId pid : input.getPreconditions()) {
201                         final Instruction i = this.insns.get(pid);
202                         if (i == null) {
203                                 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
204                                 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
205                         }
206
207                         dependencies.add(i);
208                 }
209
210                 // Check if all dependencies are non-failed
211                 final List<InstructionId> unmet = new ArrayList<>();
212                 for (final Instruction d : dependencies) {
213                         switch (d.getStatus()) {
214                         case Cancelled:
215                         case Failed:
216                         case Unknown:
217                                 unmet.add(d.getId());
218                                 break;
219                         case Executing:
220                         case Queued:
221                         case Scheduled:
222                         case Successful:
223                                 break;
224                         }
225                 }
226
227                 /*
228                  *  Some dependencies have failed, declare the request dead-on-arrival
229                  *  and fail the operation.
230                  */
231                 if (!unmet.isEmpty()) {
232                         return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
233                 }
234
235                 /*
236                  * All pre-flight checks done are at this point, the following
237                  * steps can only fail in catastrophic scenarios (OOM and the
238                  * like).
239                  */
240
241                 // Schedule a timeout for the instruction
242                 final Timeout t = this.timer.newTimeout(new TimerTask() {
243                         @Override
244                         public void run(final Timeout timeout) throws Exception {
245                                 timeoutInstruction(input.getId());
246                         }
247                 }, left.longValue(), TimeUnit.NANOSECONDS);
248
249                 // Put it into the instruction list
250                 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
251                 this.insns.put(id, i);
252
253                 // Attach it into its dependencies
254                 for (final Instruction d : dependencies) {
255                         d.addDependant(i);
256                 }
257
258                 /*
259                  * All done. The next part is checking whether the instruction can
260                  * run, which we can figure out after sending out the acknowledgement.
261                  * This task should be ingress-weighed, so we reinsert it into the
262                  * same execution service.
263                  */
264                 this.executor.submit(new Runnable() {
265                         @Override
266                         public void run() {
267                                 tryScheduleInstruction(i);
268                         }
269                 });
270
271                 return null;
272         }
273
274         @GuardedBy("this")
275         private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
276                 // Set the status
277                 v.setStatus(status);
278
279                 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
280
281                 // Send out a notification
282                 this.notifs.publish(new InstructionStatusChangedBuilder().setId(v.getId()).setStatus(status).setDetails(details).build());
283         }
284
285         @GuardedBy("this")
286         private void cancelSingle(final Instruction i, final Details details) {
287                 // Stop the timeout
288                 i.cancel();
289
290                 // Set the new status and send out notification
291                 transitionInstruction(i, InstructionStatus.Cancelled, details);
292         }
293
294         @GuardedBy("this")
295         private void cancelDependants(final Instruction v) {
296                 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
297                 for (final Instruction d : v.getDependants()) {
298                         switch (d.getStatus()) {
299                         case Cancelled:
300                         case Executing:
301                         case Failed:
302                         case Successful:
303                         case Unknown:
304                                 break;
305                         case Queued:
306                         case Scheduled:
307                                 cancelSingle(d, details);
308                                 cancelDependants(d);
309                                 break;
310                         }
311                 }
312         }
313
314         private synchronized void cancelInstruction(final Instruction i, final Details details) {
315                 this.readyQueue.remove(i);
316                 cancelSingle(i, details);
317                 cancelDependants(i);
318         }
319
320         private synchronized void timeoutInstruction(final InstructionId id) {
321                 final Instruction i = this.insns.get(id);
322                 if (i == null) {
323                         LOG.warn("Instruction {} timed out, but not found in the queue", id);
324                         return;
325                 }
326
327                 switch (i.getStatus()) {
328                 case Cancelled:
329                 case Failed:
330                 case Successful:
331                         LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
332                         break;
333                 case Unknown:
334                         LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
335                         break;
336                 case Executing:
337                         LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
338                         transitionInstruction(i, InstructionStatus.Unknown, null);
339                         cancelDependants(i);
340                         break;
341                 case Queued:
342                         LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
343
344                         final List<InstructionId> ids = new ArrayList<>();
345                         for (final Instruction d : i.getDependencies()) {
346                                 if (d.getStatus() != InstructionStatus.Successful) {
347                                         ids.add(d.getId());
348                                 }
349                         }
350
351                         cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
352                         break;
353                 case Scheduled:
354                         LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
355                         // FIXME: BUG-191: we should provide details why it timed out while scheduled
356                         cancelInstruction(i, null);
357                         break;
358                 }
359         }
360
361         @GuardedBy("this")
362         private synchronized void tryScheduleInstruction(final Instruction i) {
363                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
364
365                 /*
366                  * Check all vertices we depend on. We start off as ready for
367                  * scheduling. If we encounter a cancelled/failed/unknown
368                  * dependency, we cancel this instruction (and cascade). If we
369                  * encounter an executing/queued/scheduled dependency, we hold
370                  * of scheduling this one.
371                  */
372                 boolean ready = true;
373
374                 final List<InstructionId> unmet = new ArrayList<>();
375                 for (final Instruction d : i.getDependencies()) {
376                         switch (d.getStatus()) {
377                         case Cancelled:
378                         case Failed:
379                         case Unknown:
380                                 unmet.add(d.getId());
381                                 break;
382                         case Executing:
383                         case Queued:
384                         case Scheduled:
385                                 ready = false;
386                                 break;
387                         case Successful:
388                                 // No-op
389                                 break;
390                         }
391                 }
392
393                 if (!unmet.isEmpty()) {
394                         LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
395                         cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
396                         cancelDependants(i);
397                         return;
398                 }
399
400                 if (ready) {
401                         LOG.debug("Instruction {} is ready for execution", i.getId());
402                         transitionInstruction(i, InstructionStatus.Scheduled, null);
403
404                         this.readyQueue.add(i);
405                         notify();
406                 }
407         }
408
409         private synchronized void executionFailed(final Instruction i, final Throwable cause) {
410                 LOG.error("Instruction {} failed to execute", i.getId(), cause);
411                 transitionInstruction(i, InstructionStatus.Failed, null);
412                 cancelDependants(i);
413         }
414
415         private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
416                 i.cancel();
417
418                 transitionInstruction(i, res.getStatus(), res.getDetails());
419
420                 // Walk all dependants and try to schedule them
421                 for (final Instruction d : i.getDependants()) {
422                         tryScheduleInstruction(d);
423                 }
424         }
425
426         private synchronized void processQueues() throws InterruptedException {
427                 /*
428                  * This method is only ever interrupted by InterruptedException
429                  */
430                 while (true) {
431                         while (!this.readyQueue.isEmpty()) {
432                                 final Instruction i = this.readyQueue.poll();
433
434                                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
435
436                                 transitionInstruction(i, InstructionStatus.Executing, null);
437                                 Futures.addCallback(i.execute(), new FutureCallback<ExecutionResult<Details>>() {
438
439                                         @Override
440                                         public void onSuccess(final ExecutionResult<Details> result) {
441                                                 executionSuccessful(i, result);
442                                         }
443
444                                         @Override
445                                         public void onFailure(final Throwable t) {
446                                                 executionFailed(i, t);
447                                         }
448                                 });
449                         }
450
451                         wait();
452                 }
453         }
454
455         synchronized void start(final ThreadFactory threadFactory) {
456                 Preconditions.checkState(this.exec == null, "Programming service dispatch thread already started");
457
458                 this.exec = Executors.newSingleThreadExecutor(threadFactory);
459                 this.thread = this.exec.submit(new Callable<Void>() {
460                         @Override
461                         public Void call() {
462                                 try {
463                                         processQueues();
464                                 } catch (final InterruptedException ex) {
465                                         LOG.error("Programming service dispatch thread died", ex);
466                                 }
467                                 return null;
468                         }
469                 });
470                 this.exec.shutdown();
471         }
472
473         synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
474                 Preconditions.checkState(this.exec != null, "Programming service dispatch thread already stopped");
475
476                 this.thread.cancel(true);
477                 this.exec.awaitTermination(timeout, unit);
478                 this.exec = null;
479         }
480
481         @Override
482         public void close() throws InterruptedException {
483                 stop(CLOSE_TIMEOUT, TimeUnit.SECONDS);
484         }
485 }