2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.programming.impl;
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
14 import java.math.BigInteger;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Deque;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
27 import javax.annotation.concurrent.GuardedBy;
29 import org.opendaylight.bgpcep.programming.NanotimeUtil;
30 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
31 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
32 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
33 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
34 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
62 final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService {
63 private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
65 private final Map<InstructionId, Instruction> insns = new HashMap<>();
68 private final Deque<Instruction> readyQueue = new ArrayDeque<>();
70 private final NotificationProviderService notifs;
71 private final ExecutorService executor;
72 private final Timer timer;
73 private java.util.concurrent.Future<Void> thread;
74 private ExecutorService exec;
76 ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
77 final Timer timer, final InstructionScheduler registry) {
78 this.notifs = Preconditions.checkNotNull(notifs);
79 this.executor = Preconditions.checkNotNull(executor);
80 this.timer = Preconditions.checkNotNull(timer);
84 public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
85 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
87 public RpcResult<CancelInstructionOutput> call() {
88 return realCancelInstruction(input);
93 private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
94 final Instruction i = insns.get(input.getId());
96 LOG.debug("Instruction {} not present in the graph", input.getId());
98 final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
99 return SuccessfulRpcResult.create(out);
102 switch (i.getStatus()) {
108 LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId());
109 return SuccessfulRpcResult.create(
110 new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
116 cancelInstruction(i, null);
117 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
121 public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
122 final InstructionId id = input.getId();
123 if (insns.get(id) != null) {
124 LOG.info("Instruction ID {} already present", id);
125 return new FailureBuilder().setType(DuplicateInstructionId.class).build();
128 // First things first: check the deadline
129 final Nanotime now = NanotimeUtil.currentTime();
130 final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
132 if (left.compareTo(BigInteger.ZERO) <= 0) {
133 LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
134 return new FailureBuilder().setType(DeadOnArrival.class).build();
137 // Resolve dependencies
138 final List<Instruction> dependencies = new ArrayList<>();
139 for (final InstructionId pid : input.getPreconditions()) {
140 final Instruction i = insns.get(pid);
142 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
143 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
149 // Check if all dependencies are non-failed
150 final List<InstructionId> unmet = new ArrayList<>();
151 for (final Instruction d : dependencies) {
152 switch (d.getStatus()) {
156 unmet.add(d.getId());
167 * Some dependencies have failed, declare the request dead-on-arrival
168 * and fail the operation.
170 if (!unmet.isEmpty()) {
171 return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
175 * All pre-flight checks done are at this point, the following
176 * steps can only fail in catastrophic scenarios (OOM and the
180 // Schedule a timeout for the instruction
181 final Timeout t = timer.newTimeout(new TimerTask() {
183 public void run(final Timeout timeout) throws Exception {
184 timeoutInstruction(input.getId());
186 }, left.longValue(), TimeUnit.NANOSECONDS);
188 // Put it into the instruction list
189 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
192 // Attach it into its dependencies
193 for (final Instruction d : dependencies) {
198 * All done. The next part is checking whether the instruction can
199 * run, which we can figure out after sending out the acknowledgement.
200 * This task should be ingress-weighed, so we reinsert it into the
201 * same execution service.
203 this.executor.submit(new Runnable() {
206 tryScheduleInstruction(i);
214 private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
218 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
220 // Send out a notification
221 notifs.publish(new InstructionStatusChangedBuilder().
222 setId(v.getId()).setStatus(status).setDetails(details).build());
226 private void cancelSingle(final Instruction i, final Details details) {
230 // Set the new status and send out notification
231 transitionInstruction(i, InstructionStatus.Cancelled, details);
235 private void cancelDependants(final Instruction v) {
236 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
237 for (final Instruction d : v.getDependants()) {
238 switch (d.getStatus()) {
247 cancelSingle(d, details);
254 private synchronized void cancelInstruction(final Instruction i, final Details details) {
255 readyQueue.remove(i);
256 cancelSingle(i, details);
260 private synchronized void timeoutInstruction(final InstructionId id) {
261 final Instruction i = insns.get(id);
263 LOG.warn("Instruction {} timed out, but not found in the queue", id);
267 switch (i.getStatus()) {
271 LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
274 LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
277 LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
278 transitionInstruction(i, InstructionStatus.Unknown, null);
282 LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
284 final List<InstructionId> ids = new ArrayList<>();
285 for (final Instruction d : i.getDependencies()) {
286 if (d.getStatus() != InstructionStatus.Successful) {
291 cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
294 LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
295 // FIXME: we should provide details why it timed out while scheduled
296 cancelInstruction(i, null);
302 private synchronized void tryScheduleInstruction(final Instruction i) {
303 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
306 * Check all vertices we depend on. We start off as ready for
307 * scheduling. If we encounter a cancelled/failed/unknown
308 * dependency, we cancel this instruction (and cascade). If we
309 * encounter an executing/queued/scheduled dependency, we hold
310 * of scheduling this one.
312 boolean ready = true;
314 final List<InstructionId> unmet = new ArrayList<>();
315 for (final Instruction d : i.getDependencies()) {
316 switch (d.getStatus()) {
320 unmet.add(d.getId());
333 if (!unmet.isEmpty()) {
334 LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
335 cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
341 LOG.debug("Instruction {} is ready for execution", i.getId());
342 transitionInstruction(i, InstructionStatus.Scheduled, null);
349 private synchronized void executionFailed(final Instruction i, final Throwable cause) {
350 LOG.error("Instruction {} failed to execute", i.getId(), cause);
351 transitionInstruction(i, InstructionStatus.Failed, null);
355 private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
358 transitionInstruction(i, res.getStatus(), res.getDetails());
360 // Walk all dependants and try to schedule them
361 for (final Instruction d : i.getDependants()) {
362 tryScheduleInstruction(d);
366 private synchronized void processQueues() throws InterruptedException {
368 * This method is only ever interrupted by InterruptedException
371 while (!readyQueue.isEmpty()) {
372 final Instruction i = readyQueue.poll();
374 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
376 transitionInstruction(i, InstructionStatus.Executing, null);
377 Futures.addCallback(i.execute(), new FutureCallback<ExecutionResult<Details>>() {
380 public void onSuccess(final ExecutionResult<Details> result) {
381 executionSuccessful(i, result);
385 public void onFailure(final Throwable t) {
386 executionFailed(i, t);
395 synchronized void start(final ThreadFactory threadFactory) {
396 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
398 exec = Executors.newSingleThreadExecutor(threadFactory);
399 thread = exec.submit(new Callable<Void>() {
401 public Void call() throws Exception {
404 } catch (Exception ex) {
405 if (!(ex instanceof InterruptedException)) {
406 LOG.error("Programming service dispatch thread died", ex);
416 synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
417 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
420 exec.awaitTermination(timeout, unit);