2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.programming.impl;
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13 import io.netty.util.concurrent.Future;
14 import io.netty.util.concurrent.FutureListener;
16 import java.math.BigInteger;
17 import java.util.ArrayDeque;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Deque;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.TimeUnit;
31 import javax.annotation.concurrent.GuardedBy;
33 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
34 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
35 import org.opendaylight.bgpcep.programming.spi.InstructionExecutorRegistry;
36 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionType;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionOutputBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
57 import org.opendaylight.yangtools.concepts.Registration;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 import com.google.common.base.Preconditions;
63 import com.google.common.collect.ArrayListMultimap;
64 import com.google.common.collect.ImmutableList;
65 import com.google.common.collect.Multimap;
67 final class ProgrammingServiceImpl implements InstructionExecutorRegistry, ProgrammingService {
68 private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
69 private static final BigInteger MILLION = BigInteger.valueOf(1000000);
71 private final Map<InstructionId, Instruction> insns = new HashMap<>();
74 private final Deque<Instruction> readyQueue = new ArrayDeque<>();
77 private final Deque<Instruction> deferredQueue = new ArrayDeque<>();
80 private final Multimap<Class<? extends InstructionType>, InstructionExecutor> executors = ArrayListMultimap.create();
82 private final NotificationProviderService notifs;
83 private final ExecutorService executor;
84 private final Timer timer;
85 private java.util.concurrent.Future<Void> thread;
86 private ExecutorService exec;
88 ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
89 final Timer timer, final InstructionExecutorRegistry registry) {
90 this.notifs = Preconditions.checkNotNull(notifs);
91 this.executor = Preconditions.checkNotNull(executor);
92 this.timer = Preconditions.checkNotNull(timer);
96 public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
97 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
99 public RpcResult<CancelInstructionOutput> call() {
100 return realCancelInstruction(input);
106 public java.util.concurrent.Future<RpcResult<SubmitInstructionOutput>> submitInstruction(final SubmitInstructionInput input) {
107 return executor.submit(new Callable<RpcResult<SubmitInstructionOutput>>() {
109 public RpcResult<SubmitInstructionOutput> call() {
110 return realSubmitInstruction(input);
116 public synchronized Registration<InstructionExecutor> registerInstructionExecutor(final Class<? extends InstructionType> type, final InstructionExecutor executor) {
117 Preconditions.checkNotNull(type);
118 Preconditions.checkNotNull(executor);
120 executors.put(type, executor);
123 * Walk the deferred instructions back to front, check if they have
124 * the same type as the executor we have just registered. If they do,
125 * we move them to the head of readyQueue. This approach should retain
126 * submission order of the instructions.
128 final Iterator<Instruction> it = deferredQueue.descendingIterator();
129 while (it.hasNext()) {
130 final Instruction i = it.next();
131 if (type.equals(i.getInput().getType())) {
133 readyQueue.addFirst(i);
139 final Object lock = this;
140 return new Registration<InstructionExecutor>() {
142 public void close() throws Exception {
143 synchronized (lock) {
144 executors.remove(type, executor);
149 public InstructionExecutor getInstance() {
155 private static final RpcResult<SubmitInstructionOutput> failedSubmit(final Failure f) {
156 return SuccessfulRpcResult.create(
157 new SubmitInstructionOutputBuilder().setResult(
158 new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.FailureBuilder()
159 .setFailure(f).build()).build());
162 private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
163 final Instruction i = insns.get(input.getId());
165 LOG.debug("Instruction {} not present in the graph", input.getId());
167 final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
168 return SuccessfulRpcResult.create(out);
171 switch (i.getStatus()) {
177 LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId());
178 return SuccessfulRpcResult.create(
179 new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
185 cancelInstruction(i, null);
186 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
189 private synchronized RpcResult<SubmitInstructionOutput> realSubmitInstruction(final SubmitInstructionInput input) {
190 final InstructionId id = input.getId();
191 if (insns.get(id) != null) {
192 LOG.info("Instruction ID {} already present", id);
193 return failedSubmit(new FailureBuilder().setType(DuplicateInstructionId.class).build());
196 // First things first: check the deadline
197 final BigInteger now = BigInteger.valueOf(System.currentTimeMillis()).multiply(MILLION);
198 final BigInteger left = input.getDeadline().getValue().subtract(now);
200 if (left.compareTo(BigInteger.ZERO) <= 0) {
201 LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
202 return failedSubmit(new FailureBuilder().setType(DeadOnArrival.class).build());
205 // Resolve dependencies
206 final List<Instruction> dependencies = new ArrayList<>();
207 for (final InstructionId pid : input.getPreconditions()) {
208 final Instruction i = insns.get(pid);
210 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
211 return failedSubmit(new FailureBuilder().setType(UnknownPreconditionId.class).build());
217 // Check if all dependencies are non-failed
218 final List<InstructionId> unmet = new ArrayList<>();
219 for (final Instruction d : dependencies) {
220 switch (d.getStatus()) {
224 unmet.add(d.getInput().getId());
235 * Some dependencies have failed, declare the request dead-on-arrival
236 * and fail the operation.
238 if (!unmet.isEmpty()) {
239 return failedSubmit(new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build());
243 * All pre-flight checks done are at this point, the following
244 * steps can only fail in catastrophic scenarios (OOM and the
248 // Schedule a timeout for the instruction
249 final Timeout t = timer.newTimeout(new TimerTask() {
251 public void run(final Timeout timeout) throws Exception {
252 timeoutInstruction(input.getId());
254 }, left.longValue(), TimeUnit.NANOSECONDS);
256 // Put it into the instruction list
257 final Instruction i = new Instruction(input, dependencies, t);
260 // Attach it into its dependencies
261 for (final Instruction d : dependencies) {
266 * All done. The next part is checking whether the instruction can
267 * run, which we can figure out after sending out the acknowledgement.
268 * This task should be ingress-weighed, so we reinsert it into the
269 * same execution service.
271 executor.submit(new Runnable() {
274 tryScheduleInstruction(i);
278 return SuccessfulRpcResult.create(new SubmitInstructionOutputBuilder().build());
282 private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
286 LOG.debug("Instruction {} transitioned to status {}", v.getInput().getId(), status);
288 // Send out a notification
289 notifs.publish(new InstructionStatusChangedBuilder().
290 setId(v.getInput().getId()).setStatus(status).setDetails(details).build());
294 private void cancelSingle(final Instruction i, final Details details) {
298 // Set the new status and send out notification
299 transitionInstruction(i, InstructionStatus.Cancelled, details);
303 private void cancelDependants(final Instruction v) {
304 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getInput().getId())).build();
305 for (final Instruction d : v.getDependants()) {
306 switch (d.getStatus()) {
315 cancelSingle(d, details);
322 private synchronized void cancelInstruction(final Instruction i, final Details details) {
323 deferredQueue.remove(i);
324 readyQueue.remove(i);
325 cancelSingle(i, details);
329 private synchronized void timeoutInstruction(final InstructionId id) {
330 final Instruction i = insns.get(id);
332 LOG.warn("Instruction {} timed out, but not found in the queue", id);
336 switch (i.getStatus()) {
340 LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
343 LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
346 LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
347 transitionInstruction(i, InstructionStatus.Unknown, null);
351 LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
353 final List<InstructionId> ids = new ArrayList<>();
354 for (final Instruction d : i.getDependencies()) {
355 if (d.getStatus() != InstructionStatus.Successful) {
356 ids.add(d.getInput().getId());
360 cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
363 LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getInput().getId());
364 // FIXME: we should provide details why it timed out while scheduled
365 cancelInstruction(i, null);
371 private synchronized void tryScheduleInstruction(final Instruction i) {
372 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
375 * Check all vertices we depend on. We start off as ready for
376 * scheduling. If we encounter a cancelled/failed/unknown
377 * dependency, we cancel this instruction (and cascade). If we
378 * encounter an executing/queued/scheduled dependency, we hold
379 * of scheduling this one.
381 boolean ready = true;
383 final List<InstructionId> unmet = new ArrayList<>();
384 for (final Instruction d : i.getDependencies()) {
385 switch (d.getStatus()) {
389 unmet.add(d.getInput().getId());
402 if (!unmet.isEmpty()) {
403 LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getInput().getId());
404 cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
410 LOG.debug("Instruction {} is ready for execution", i.getInput().getId());
411 transitionInstruction(i, InstructionStatus.Scheduled, null);
418 private synchronized void executionFailed(final Instruction i, final Throwable cause) {
419 LOG.error("Instruction {} failed to execute", i.getInput().getId(), cause);
420 transitionInstruction(i, InstructionStatus.Failed, null);
424 private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
427 transitionInstruction(i, res.getStatus(), res.getDetails());
429 // Walk all dependants and try to schedule them
430 for (final Instruction d : i.getDependants()) {
431 tryScheduleInstruction(d);
435 private synchronized void processQueues() throws InterruptedException {
437 * This method is only ever interrupted by InterruptedException
440 while (!readyQueue.isEmpty()) {
441 final Instruction i = readyQueue.poll();
443 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
444 final SubmitInstructionInput input = i.getInput();
447 * Walk all the registered executors for a particular type and
448 * offer them the chance to execute the instruction. The first
449 * one to accept it wins.
451 Future<ExecutionResult<?>> f = null;
452 final Collection<InstructionExecutor> el = executors.get(input.getType());
454 for (final InstructionExecutor e : el) {
455 f = e.offerInstruction(input.getArguments());
461 // We did not find an executor -- defer the instruction
463 deferredQueue.add(i);
467 transitionInstruction(i, InstructionStatus.Executing, null);
468 f.addListener(new FutureListener<ExecutionResult<?>>() {
470 public void operationComplete(final Future<ExecutionResult<?>> future) {
471 if (future.isSuccess()) {
472 executionSuccessful(i, future.getNow());
474 executionFailed(i, future.cause());
484 synchronized void start(final ThreadFactory threadFactory) {
485 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
487 exec = Executors.newSingleThreadExecutor(threadFactory);
488 thread = exec.submit(new Callable<Void>() {
490 public Void call() throws Exception {
493 } catch (Exception ex) {
494 if (!(ex instanceof InterruptedException)) {
495 LOG.error("Programming service dispatch thread died", ex);
505 synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
506 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
509 exec.awaitTermination(timeout, unit);