2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.programming.impl;
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
14 import java.math.BigInteger;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Deque;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.TimeUnit;
28 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.bgpcep.programming.NanotimeUtil;
31 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
32 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
33 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
34 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
35 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsOutputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.FailureBuilder;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 import com.google.common.base.Preconditions;
62 import com.google.common.collect.ImmutableList;
63 import com.google.common.util.concurrent.FutureCallback;
64 import com.google.common.util.concurrent.Futures;
65 import com.google.common.util.concurrent.ListenableFuture;
66 import com.google.common.util.concurrent.ListeningExecutorService;
/**
 * Instruction scheduler and programming RPC service. Tracks submitted
 * instructions by id, keeps a queue of instructions that are ready to run and
 * publishes a notification on each instruction status transition.
 */
68 public final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService, AutoCloseable {
69 private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
71 // Default stop timeout, in seconds
72 private static final long CLOSE_TIMEOUT = 5;
// All known instructions, keyed by their instruction id.
74 private final Map<InstructionId, Instruction> insns = new HashMap<>();
// Instructions that reached Scheduled state; filled by tryScheduleInstruction()
// and drained by processQueues().
77 private final Deque<Instruction> readyQueue = new ArrayDeque<>();
// Sink for InstructionStatusChanged notifications.
79 private final NotificationProviderService notifs;
// Executor on which RPC processing and post-submit scheduling attempts run.
80 private final ListeningExecutorService executor;
// Timer used to arm a per-instruction deadline timeout.
81 private final Timer timer;
// Handle of the dispatch task submitted by start(); cancelled by stop().
82 private Future<Void> thread;
// Single-thread executor created by start() to host the dispatch task.
83 private ExecutorService exec;
/**
 * Creates the service from its collaborators.
 *
 * @param notifs service used to publish instruction status notifications
 * @param executor executor to which RPC work and scheduling attempts are submitted
 * @param timer timer used to schedule instruction timeouts
 * @throws NullPointerException if any argument is null
 */
85 public ProgrammingServiceImpl(final NotificationProviderService notifs, final ListeningExecutorService executor, final Timer timer) {
86 this.notifs = Preconditions.checkNotNull(notifs);
87 this.executor = Preconditions.checkNotNull(executor);
88 this.timer = Preconditions.checkNotNull(timer);
/**
 * Asynchronous entry point for instruction cancellation. The request is
 * submitted to the executor; the returned future completes with the result of
 * realCancelInstruction(input).
 *
 * @param input cancellation request
 * @return future holding the cancellation result
 */
92 public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
93 return this.executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
95 public RpcResult<CancelInstructionOutput> call() {
96 return realCancelInstruction(input);
/**
 * Asynchronous entry point for cleaning instructions. The request is
 * submitted to the executor; the returned future completes with the result of
 * realCleanInstructions(input).
 *
 * @param input clean request
 * @return future holding the clean result
 */
102 public ListenableFuture<RpcResult<CleanInstructionsOutput>> cleanInstructions(final CleanInstructionsInput input) {
103 return this.executor.submit(new Callable<RpcResult<CleanInstructionsOutput>>() {
105 public RpcResult<CleanInstructionsOutput> call() throws Exception {
106 return realCleanInstructions(input);
/**
 * Performs the cancellation requested through cancelInstruction(CancelInstructionInput)
 * while holding this object's monitor.
 *
 * The result is always a successful RPC result; its output carries
 * UnknownInstruction when the id is not tracked, UncancellableInstruction when
 * the instruction's status no longer permits cancellation, and no failure
 * when the instruction was cancelled.
 *
 * NOTE(review): the null guard and the case labels of the switch are not
 * visible in this listing.
 *
 * @param input cancellation request
 * @return RPC result wrapping the cancellation output
 */
111 private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
112 final Instruction i = this.insns.get(input.getId());
114 LOG.debug("Instruction {} not present in the graph", input.getId());
116 final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
117 return SuccessfulRpcResult.create(out);
120 switch (i.getStatus()) {
// The message has two placeholders: the id and the blocking status.
126 LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId(), i.getStatus());
127 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
// No details are attached for a user-requested cancellation.
133 cancelInstruction(i, null);
134 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
/**
 * Performs the clean-up requested through cleanInstructions(CleanInstructionsInput)
 * while holding this object's monitor.
 *
 * Each requested id is looked up individually. Ids that cannot be cleaned are
 * collected and, when there are any, reported back through the output's
 * "unflushed" list.
 *
 * NOTE(review): the null guard, the case labels of the switch and the
 * statements that unlink and remove a cleaned instruction are not visible in
 * this listing.
 *
 * @param input clean request carrying the ids to clean
 * @return RPC result wrapping the clean output
 */
138 private synchronized RpcResult<CleanInstructionsOutput> realCleanInstructions(final CleanInstructionsInput input) {
139 final List<InstructionId> failed = new ArrayList<>();
141 for (final InstructionId id : input.getId()) {
142 // Find the instruction
// Look up the single id being processed; input.getId() is the whole
// collection of ids and can never match a key of the instruction map.
143 final Instruction i = this.insns.get(id);
145 LOG.debug("Instruction {} not present in the graph", id);
151 switch (i.getStatus()) {
160 LOG.debug("Instruction {} cannot be cleaned because of it's in state {}", id, i.getStatus());
165 // The instruction is in a terminal state, we need to just unlink
166 // it from its dependencies and dependants
170 LOG.debug("Instruction {} cleaned successfully", id);
173 final CleanInstructionsOutputBuilder ob = new CleanInstructionsOutputBuilder();
// Only set the list when something actually failed.
174 if (!failed.isEmpty()) {
175 ob.setUnflushed(failed);
178 return SuccessfulRpcResult.create(ob.build());
/**
 * Validates and registers a new instruction.
 *
 * Visible checks, in order: the id must not already be tracked
 * (DuplicateInstructionId), the deadline must still be in the future
 * (DeadOnArrival), every precondition id must be known
 * (UnknownPreconditionId) and the preconditions must not be in an unmet state
 * (DeadOnArrival with the failed preconditions listed). Once the checks pass,
 * a timeout is armed for the remaining time, the instruction is stored and a
 * scheduling attempt is handed to the executor.
 *
 * The method is synchronized because it reads and modifies the instruction
 * map, which all other accessors touch while holding this object's monitor.
 *
 * NOTE(review): several guards, case labels and the final return statement
 * are not visible in this listing.
 *
 * @param input submission request (id, deadline, preconditions)
 * @param executor executor which will carry out the instruction
 * @return a Failure describing why the submission was rejected; the value
 *         returned on acceptance is not visible here
 */
182 public synchronized Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
183 final InstructionId id = input.getId();
184 if (this.insns.get(id) != null) {
185 LOG.info("Instruction ID {} already present", id);
186 return new FailureBuilder().setType(DuplicateInstructionId.class).build();
189 // First things first: check the deadline
190 final Nanotime now = NanotimeUtil.currentTime();
191 final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
193 if (left.compareTo(BigInteger.ZERO) <= 0) {
// 'left' is non-positive here; negate it so the overshoot is reported as a
// positive number of nanoseconds.
194 LOG.debug("Instruction {} deadline has already passed by {}ns", id, left.negate());
195 return new FailureBuilder().setType(DeadOnArrival.class).build();
198 // Resolve dependencies
199 final List<Instruction> dependencies = new ArrayList<>();
200 for (final InstructionId pid : input.getPreconditions()) {
201 final Instruction i = this.insns.get(pid);
203 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
204 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
210 // Check if all dependencies are non-failed
211 final List<InstructionId> unmet = new ArrayList<>();
212 for (final Instruction d : dependencies) {
213 switch (d.getStatus()) {
217 unmet.add(d.getId());
228 * Some dependencies have failed, declare the request dead-on-arrival
229 * and fail the operation.
231 if (!unmet.isEmpty()) {
232 return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
236 * All pre-flight checks done are at this point, the following
237 * steps can only fail in catastrophic scenarios (OOM and the
241 // Schedule a timeout for the instruction
242 final Timeout t = this.timer.newTimeout(new TimerTask() {
244 public void run(final Timeout timeout) throws Exception {
245 timeoutInstruction(input.getId());
247 }, left.longValue(), TimeUnit.NANOSECONDS);
249 // Put it into the instruction list
250 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
251 this.insns.put(id, i);
253 // Attach it into its dependencies
254 for (final Instruction d : dependencies) {
259 * All done. The next part is checking whether the instruction can
260 * run, which we can figure out after sending out the acknowledgement.
261 * This task should be ingress-weighed, so we reinsert it into the
262 * same execution service.
264 this.executor.submit(new Runnable() {
267 tryScheduleInstruction(i);
/**
 * Reports an instruction's move to a new status: logs the transition and
 * publishes an InstructionStatusChanged notification carrying the
 * instruction id, the new status and the supplied details.
 *
 * NOTE(review): the statement that records the status on the instruction
 * itself is not visible in this listing.
 *
 * @param v instruction being transitioned
 * @param status status being entered
 * @param details details attached to the notification; callers in this file also pass null
 */
275 private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
279 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
281 // Send out a notification
282 this.notifs.publish(new InstructionStatusChangedBuilder().setId(v.getId()).setStatus(status).setDetails(details).build());
/**
 * Cancels one instruction by transitioning it to Cancelled with the given details.
 *
 * NOTE(review): any steps taken before the transition are not visible in
 * this listing.
 *
 * @param i instruction to cancel
 * @param details details to attach to the status-change notification
 */
286 private void cancelSingle(final Instruction i, final Details details) {
290 // Set the new status and send out notification
291 transitionInstruction(i, InstructionStatus.Cancelled, details);
/**
 * Walks the dependants of an instruction and cancels them via cancelSingle(),
 * attaching details that name the given instruction as the unmet dependency.
 *
 * NOTE(review): the case labels selecting which dependant statuses are
 * cancelled are not visible in this listing.
 *
 * @param v instruction whose dependants are examined
 */
295 private void cancelDependants(final Instruction v) {
// The same details object is shared by every cancelled dependant.
296 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
297 for (final Instruction d : v.getDependants()) {
298 switch (d.getStatus()) {
307 cancelSingle(d, details);
/**
 * Cancels an instruction while holding this object's monitor: removes it from
 * the ready queue (if present) and cancels it via cancelSingle().
 *
 * NOTE(review): any statement following cancelSingle() is not visible in
 * this listing.
 *
 * @param i instruction to cancel
 * @param details details to attach to the notification; callers in this file also pass null
 */
314 private synchronized void cancelInstruction(final Instruction i, final Details details) {
315 this.readyQueue.remove(i);
316 cancelSingle(i, details);
/**
 * Timer callback invoked when an instruction's deadline expires. Runs while
 * holding this object's monitor and reacts according to the instruction's
 * current status, as indicated by the log messages below:
 * a no-op or a warning for statuses where nothing is done, a transition to
 * Unknown when the instruction timed out while executing, and cancellation
 * when it timed out while Queued or Scheduled.
 *
 * NOTE(review): the null guard and the case labels of the switch are not
 * visible in this listing; the mapping above is read off the log messages.
 *
 * @param id id of the instruction whose timeout fired
 */
320 private synchronized void timeoutInstruction(final InstructionId id) {
321 final Instruction i = this.insns.get(id);
323 LOG.warn("Instruction {} timed out, but not found in the queue", id);
327 switch (i.getStatus()) {
331 LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
334 LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
337 LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
338 transitionInstruction(i, InstructionStatus.Unknown, null);
342 LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
// Dependencies that have not completed successfully are examined so they can
// be reported as the unmet dependencies of the cancellation.
344 final List<InstructionId> ids = new ArrayList<>();
345 for (final Instruction d : i.getDependencies()) {
346 if (d.getStatus() != InstructionStatus.Successful) {
351 cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
354 LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
355 // FIXME: BUG-191: we should provide details why it timed out while scheduled
356 cancelInstruction(i, null);
/**
 * Attempts to move a Queued instruction forward, holding this object's
 * monitor. Dependencies are inspected; if any are recorded as unmet the
 * instruction is cancelled with those dependencies as details, otherwise,
 * once ready, it is transitioned to Scheduled and appended to the ready queue.
 *
 * NOTE(review): the case labels of the switch, the handling of the 'ready'
 * flag and the early returns are not visible in this listing.
 *
 * @param i instruction to examine; must be in Queued status
 * @throws IllegalStateException if the instruction is not Queued
 */
362 private synchronized void tryScheduleInstruction(final Instruction i) {
363 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
366 * Check all vertices we depend on. We start off as ready for
367 * scheduling. If we encounter a cancelled/failed/unknown
368 * dependency, we cancel this instruction (and cascade). If we
369 * encounter an executing/queued/scheduled dependency, we hold
370 * of scheduling this one.
372 boolean ready = true;
374 final List<InstructionId> unmet = new ArrayList<>();
375 for (final Instruction d : i.getDependencies()) {
376 switch (d.getStatus()) {
380 unmet.add(d.getId());
393 if (!unmet.isEmpty()) {
394 LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
395 cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
401 LOG.debug("Instruction {} is ready for execution", i.getId());
402 transitionInstruction(i, InstructionStatus.Scheduled, null);
// Hand the instruction over to processQueues().
404 this.readyQueue.add(i);
/**
 * Callback for an instruction whose execution future failed: logs the cause
 * at error level and transitions the instruction to Failed without details.
 * Runs while holding this object's monitor.
 *
 * NOTE(review): any statement following the transition is not visible in
 * this listing.
 *
 * @param i instruction that failed
 * @param cause failure reported by the execution future
 */
409 private synchronized void executionFailed(final Instruction i, final Throwable cause) {
410 LOG.error("Instruction {} failed to execute", i.getId(), cause);
411 transitionInstruction(i, InstructionStatus.Failed, null);
/**
 * Callback for an instruction whose execution future completed: transitions
 * the instruction to the status and details carried by the result, then
 * attempts to schedule each of its dependants. Runs while holding this
 * object's monitor.
 *
 * NOTE(review): tryScheduleInstruction() asserts that its argument is Queued;
 * no status check on the dependants is visible here, so a dependant in any
 * other state would raise IllegalStateException — confirm the hidden lines
 * or the dependant bookkeeping rule this out.
 *
 * @param i instruction that finished executing
 * @param res execution result supplying the final status and details
 */
415 private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
418 transitionInstruction(i, res.getStatus(), res.getDetails());
420 // Walk all dependants and try to schedule them
421 for (final Instruction d : i.getDependants()) {
422 tryScheduleInstruction(d);
/**
 * Dispatch routine: while holding this object's monitor, drains the ready
 * queue. Each polled instruction must be Scheduled; it is transitioned to
 * Executing and started, with a callback that routes completion to
 * executionSuccessful() and failure to executionFailed().
 *
 * NOTE(review): the enclosing loop/wait logic around the drain loop is not
 * visible in this listing.
 *
 * @throws InterruptedException declared by the signature; the statement that
 *         can raise it is not visible here
 * @throws IllegalStateException if a polled instruction is not Scheduled
 */
426 private synchronized void processQueues() throws InterruptedException {
428 * This method is only ever interrupted by InterruptedException
431 while (!this.readyQueue.isEmpty()) {
432 final Instruction i = this.readyQueue.poll();
434 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
436 transitionInstruction(i, InstructionStatus.Executing, null);
// Completion is reported asynchronously through the future's callback.
437 Futures.addCallback(i.execute(), new FutureCallback<ExecutionResult<Details>>() {
440 public void onSuccess(final ExecutionResult<Details> result) {
441 executionSuccessful(i, result);
445 public void onFailure(final Throwable t) {
446 executionFailed(i, t);
/**
 * Starts the dispatch task on a new single-thread executor built from the
 * given thread factory, then calls shutdown() on that executor so it accepts
 * no further tasks.
 *
 * NOTE(review): the body of the submitted task is not visible in this
 * listing (presumably it runs processQueues() — TODO confirm). The visible
 * InterruptedException handler only logs; confirm whether the interrupt
 * status should be restored.
 *
 * @param threadFactory factory used to create the dispatch thread
 * @throws IllegalStateException if the dispatch executor already exists
 */
455 synchronized void start(final ThreadFactory threadFactory) {
456 Preconditions.checkState(this.exec == null, "Programming service dispatch thread already started");
458 this.exec = Executors.newSingleThreadExecutor(threadFactory);
459 this.thread = this.exec.submit(new Callable<Void>() {
464 } catch (final InterruptedException ex) {
465 LOG.error("Programming service dispatch thread died", ex);
470 this.exec.shutdown();
/**
 * Stops the dispatch task: cancels its future with interruption and waits up
 * to the given timeout for the dispatch executor to terminate.
 *
 * NOTE(review): the boolean result of awaitTermination() is not used on the
 * visible line, so a timeout is not reported to the caller.
 *
 * @param timeout maximum time to wait for termination
 * @param unit unit of the timeout
 * @throws InterruptedException if interrupted while waiting for termination
 * @throws IllegalStateException if the dispatch executor does not exist
 */
473 synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
474 Preconditions.checkState(this.exec != null, "Programming service dispatch thread already stopped");
476 this.thread.cancel(true);
477 this.exec.awaitTermination(timeout, unit);
/**
 * Closes the service by stopping the dispatch task, waiting at most
 * CLOSE_TIMEOUT seconds.
 *
 * @throws InterruptedException if interrupted while waiting for the stop to complete
 */
482 public void close() throws InterruptedException {
483 stop(CLOSE_TIMEOUT, TimeUnit.SECONDS);