2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.programming.impl;
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
14 import java.math.BigInteger;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Deque;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
27 import javax.annotation.concurrent.GuardedBy;
29 import org.opendaylight.bgpcep.programming.NanotimeUtil;
30 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
31 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
32 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
33 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
34 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
/**
 * Instruction scheduler and programming RPC service implementation.
 *
 * Instructions are tracked by their InstructionId in {@code insns}; instructions which have
 * been transitioned to Scheduled are drained from {@code readyQueue} by processQueues().
 * Status changes are published through the NotificationProviderService.
 */
62 public final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService, AutoCloseable {
63 private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
65 // Default stop timeout, in seconds
66 private static final long CLOSE_TIMEOUT = 5;
// All instructions known to this service, keyed by instruction ID
68 private final Map<InstructionId, Instruction> insns = new HashMap<>();
// Instructions waiting to be picked up by processQueues()
71 private final Deque<Instruction> readyQueue = new ArrayDeque<>();
// Sink for InstructionStatusChanged notifications
73 private final NotificationProviderService notifs;
// Runs cancellation RPCs and the deferred scheduling check submitted by submitInstruction()
74 private final ExecutorService executor;
// Schedules the per-instruction deadline timeout
75 private final Timer timer;
// Handle of the dispatch task submitted in start()
76 private java.util.concurrent.Future<Void> thread;
// Single-thread executor created in start(); null until then
77 private ExecutorService exec;
/**
 * Creates the service around its collaborators. Each argument is rejected with a
 * NullPointerException (via Preconditions.checkNotNull) when null.
 *
 * @param notifs notification service used to publish instruction status changes
 * @param executor executor used for RPC processing and deferred scheduling checks
 * @param timer timer used for instruction deadlines (its parameter declaration is on a
 *        line not visible in this excerpt)
 */
79 public ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
81 this.notifs = Preconditions.checkNotNull(notifs);
82 this.executor = Preconditions.checkNotNull(executor);
83 this.timer = Preconditions.checkNotNull(timer);
/**
 * RPC entry point for cancelling an instruction. The request is not processed on the
 * calling thread: it is submitted to {@code executor}, where it is handled by
 * realCancelInstruction().
 *
 * @param input cancellation request
 * @return future completing with the result produced by realCancelInstruction()
 */
87 public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
88 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
90 public RpcResult<CancelInstructionOutput> call() {
91 return realCancelInstruction(input);
/**
 * Processes a cancellation request while holding this object's monitor.
 *
 * The instruction is looked up by {@code input.getId()}. The visible outcomes are:
 * an UnknownInstruction failure when the "not present in the graph" branch is taken,
 * an UncancellableInstruction failure for statuses which can no longer be cancelled,
 * and otherwise a call to cancelInstruction(i, null) followed by an output without a
 * failure. The branch conditions and case labels are on lines not visible in this excerpt.
 *
 * @param input cancellation request
 * @return RPC result wrapping the cancellation output
 */
96 private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
97 final Instruction i = insns.get(input.getId());
99 LOG.debug("Instruction {} not present in the graph", input.getId());
101 final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
102 return SuccessfulRpcResult.create(out);
105 switch (i.getStatus()) {
// The message has two placeholders: supply the status as well as the ID, otherwise
// the second "{}" is emitted literally and the status never reaches the log
111 LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId(), i.getStatus());
112 return SuccessfulRpcResult.create(
113 new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
119 cancelInstruction(i, null);
120 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
124 public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
125 final InstructionId id = input.getId();
126 if (insns.get(id) != null) {
127 LOG.info("Instruction ID {} already present", id);
128 return new FailureBuilder().setType(DuplicateInstructionId.class).build();
131 // First things first: check the deadline
132 final Nanotime now = NanotimeUtil.currentTime();
133 final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
135 if (left.compareTo(BigInteger.ZERO) <= 0) {
136 LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
137 return new FailureBuilder().setType(DeadOnArrival.class).build();
140 // Resolve dependencies
141 final List<Instruction> dependencies = new ArrayList<>();
142 for (final InstructionId pid : input.getPreconditions()) {
143 final Instruction i = insns.get(pid);
145 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
146 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
152 // Check if all dependencies are non-failed
153 final List<InstructionId> unmet = new ArrayList<>();
154 for (final Instruction d : dependencies) {
155 switch (d.getStatus()) {
159 unmet.add(d.getId());
170 * Some dependencies have failed, declare the request dead-on-arrival
171 * and fail the operation.
173 if (!unmet.isEmpty()) {
174 return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
178 * All pre-flight checks are done at this point, the following
179 * steps can only fail in catastrophic scenarios (OOM and the
183 // Schedule a timeout for the instruction
184 final Timeout t = timer.newTimeout(new TimerTask() {
186 public void run(final Timeout timeout) throws Exception {
187 timeoutInstruction(input.getId());
189 }, left.longValue(), TimeUnit.NANOSECONDS);
191 // Put it into the instruction list
192 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
195 // Attach it into its dependencies
196 for (final Instruction d : dependencies) {
201 * All done. The next part is checking whether the instruction can
202 * run, which we can figure out after sending out the acknowledgement.
203 * This task should be ingress-weighed, so we reinsert it into the
204 * same execution service.
206 this.executor.submit(new Runnable() {
209 tryScheduleInstruction(i);
/**
 * Records a status transition of an instruction: logs it and publishes an
 * InstructionStatusChanged notification carrying the instruction ID, the new status and
 * the supplied details.
 *
 * NOTE(review): the lines between the signature and the log statement are not visible in
 * this excerpt; presumably they store the new status on the instruction -- confirm.
 *
 * @param v instruction being transitioned
 * @param status new status
 * @param details details attached to the notification; callers in this class pass null
 *        when there is nothing to report
 */
217 private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
221 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
223 // Send out a notification
224 notifs.publish(new InstructionStatusChangedBuilder().
225 setId(v.getId()).setStatus(status).setDetails(details).build());
/**
 * Cancels one instruction by transitioning it to InstructionStatus.Cancelled, which also
 * publishes the corresponding notification.
 *
 * NOTE(review): lines preceding the transition are not visible in this excerpt, so any
 * additional cleanup performed here cannot be described -- confirm against the full file.
 *
 * @param i instruction to cancel
 * @param details details to attach to the notification, may be null
 */
229 private void cancelSingle(final Instruction i, final Details details) {
233 // Set the new status and send out notification
234 transitionInstruction(i, InstructionStatus.Cancelled, details);
/**
 * Walks the dependants of an instruction and cancels them via cancelSingle(), reporting
 * the given instruction's ID as the unmet dependency.
 *
 * NOTE(review): the case labels of the status switch are not visible in this excerpt, so
 * which dependant statuses lead to cancellation cannot be stated here -- confirm.
 *
 * @param v instruction whose dependants are examined
 */
238 private void cancelDependants(final Instruction v) {
// One Details instance is shared by all cancelled dependants
239 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
240 for (final Instruction d : v.getDependants()) {
241 switch (d.getStatus()) {
250 cancelSingle(d, details);
/**
 * Cancels an instruction while holding this object's monitor: removes it from the ready
 * queue (a no-op if it is not queued there) and then cancels it via cancelSingle().
 *
 * @param i instruction to cancel
 * @param details details to attach to the cancellation notification, may be null
 */
257 private synchronized void cancelInstruction(final Instruction i, final Details details) {
258 readyQueue.remove(i);
259 cancelSingle(i, details);
/**
 * Handles expiry of an instruction's deadline. It is invoked from the TimerTask which
 * submitInstruction() registers with the timer, and runs under this object's monitor.
 *
 * The reaction depends on the instruction's current status. The case labels are not
 * visible in this excerpt; the status names below are taken from the log messages:
 * some statuses make the timeout a no-op, an instruction which times out while executing
 * is transitioned to Unknown, and an instruction which is still Queued or Scheduled is
 * cancelled.
 *
 * @param id ID of the instruction whose deadline expired
 */
263 private synchronized void timeoutInstruction(final InstructionId id) {
264 final Instruction i = insns.get(id);
266 LOG.warn("Instruction {} timed out, but not found in the queue", id);
270 switch (i.getStatus()) {
274 LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
277 LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
280 LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
281 transitionInstruction(i, InstructionStatus.Unknown, null);
285 LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
// Inspect dependencies which have not completed successfully; presumably their IDs are
// collected into "ids" on a line not visible here -- confirm
287 final List<InstructionId> ids = new ArrayList<>();
288 for (final Instruction d : i.getDependencies()) {
289 if (d.getStatus() != InstructionStatus.Successful) {
294 cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
297 LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
298 // FIXME: we should provide details why it timed out while scheduled
299 cancelInstruction(i, null);
305 private synchronized void tryScheduleInstruction(final Instruction i) {
306 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
309 * Check all vertices we depend on. We start off as ready for
310 * scheduling. If we encounter a cancelled/failed/unknown
311 * dependency, we cancel this instruction (and cascade). If we
312 * encounter an executing/queued/scheduled dependency, we hold
313 * off scheduling this one.
315 boolean ready = true;
317 final List<InstructionId> unmet = new ArrayList<>();
318 for (final Instruction d : i.getDependencies()) {
319 switch (d.getStatus()) {
323 unmet.add(d.getId());
336 if (!unmet.isEmpty()) {
337 LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
338 cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
344 LOG.debug("Instruction {} is ready for execution", i.getId());
345 transitionInstruction(i, InstructionStatus.Scheduled, null);
/**
 * Failure callback for an instruction's execution future (see processQueues()). Logs the
 * cause at error level and transitions the instruction to InstructionStatus.Failed
 * without details. Runs under this object's monitor.
 *
 * @param i instruction whose execution failed
 * @param cause failure reported by the execution future
 */
352 private synchronized void executionFailed(final Instruction i, final Throwable cause) {
353 LOG.error("Instruction {} failed to execute", i.getId(), cause);
354 transitionInstruction(i, InstructionStatus.Failed, null);
/**
 * Success callback for an instruction's execution future (see processQueues()).
 * Transitions the instruction to the status and details carried by the execution result,
 * then re-evaluates every dependant for scheduling. Runs under this object's monitor.
 *
 * NOTE(review): tryScheduleInstruction() asserts that its argument is Queued via
 * Preconditions.checkState, and no status filter is visible in the loop below. A dependant
 * which has left the Queued state in the meantime (for example one cancelled on timeout)
 * would therefore raise IllegalStateException here -- confirm against the full file.
 *
 * @param i instruction which finished executing
 * @param res result reported by the executor
 */
358 private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
361 transitionInstruction(i, res.getStatus(), res.getDetails());
363 // Walk all dependants and try to schedule them
364 for (final Instruction d : i.getDependants()) {
365 tryScheduleInstruction(d);
/**
 * Drains the ready queue under this object's monitor. Each polled instruction must be in
 * the Scheduled state (enforced with Preconditions.checkState); it is transitioned to
 * Executing and started, with a callback routing the outcome of its execution future to
 * executionSuccessful() or executionFailed().
 *
 * @throws InterruptedException declared by the signature; the statement which can raise
 *         it is not visible in this excerpt
 */
369 private synchronized void processQueues() throws InterruptedException {
371 * This method is only ever interrupted by InterruptedException
374 while (!readyQueue.isEmpty()) {
375 final Instruction i = readyQueue.poll();
377 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
379 transitionInstruction(i, InstructionStatus.Executing, null);
// Completion is reported asynchronously through the future returned by execute()
380 Futures.addCallback(i.execute(), new FutureCallback<ExecutionResult<Details>>() {
383 public void onSuccess(final ExecutionResult<Details> result) {
384 executionSuccessful(i, result);
388 public void onFailure(final Throwable t) {
389 executionFailed(i, t);
/**
 * Starts the dispatch thread. A single-thread executor is created from the supplied
 * factory and a dispatch task is submitted to it; its Future is kept in {@code thread}.
 * An InterruptedException raised inside the task is logged at error level.
 *
 * NOTE(review): the body of the task's try block is not visible in this excerpt;
 * presumably it invokes processQueues() -- confirm.
 *
 * @param threadFactory factory used to create the dispatch thread
 * @throws IllegalStateException if the dispatch thread has already been started
 */
398 synchronized void start(final ThreadFactory threadFactory) {
399 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
401 exec = Executors.newSingleThreadExecutor(threadFactory);
402 thread = exec.submit(new Callable<Void>() {
404 public Void call() throws Exception {
407 } catch (InterruptedException ex) {
408 LOG.error("Programming service dispatch thread died", ex);
/**
 * Stops the dispatch thread, waiting up to the given timeout for the dispatch executor to
 * terminate.
 *
 * NOTE(review): the statements which request shutdown are on lines not visible in this
 * excerpt. The boolean returned by awaitTermination() is discarded, so a caller cannot
 * tell whether the executor actually terminated within the timeout.
 *
 * @param timeout maximum time to wait
 * @param unit unit of {@code timeout}
 * @throws InterruptedException if interrupted while waiting for termination
 * @throws IllegalStateException if the dispatch thread is not running
 */
416 synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
417 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
420 exec.awaitTermination(timeout, unit);
/**
 * AutoCloseable hook: stops the dispatch thread using the default timeout of
 * CLOSE_TIMEOUT seconds.
 *
 * @throws Exception if stop() fails, e.g. when interrupted while waiting or when the
 *         dispatch thread is not running
 */
425 public void close() throws Exception {
426 stop(CLOSE_TIMEOUT, TimeUnit.SECONDS);