2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.programming.impl;
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13 import io.netty.util.concurrent.Future;
14 import io.netty.util.concurrent.FutureListener;
16 import java.math.BigInteger;
17 import java.util.ArrayDeque;
18 import java.util.ArrayList;
19 import java.util.Deque;
20 import java.util.HashMap;
21 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
29 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
32 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
33 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
34 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
35 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
/**
 * Implementation of both InstructionScheduler and ProgrammingService. Instructions
 * are looked up by ID in {@code insns}, instructions awaiting execution are polled
 * from {@code readyQueue}, and every status transition is published as a notification.
 */
60 final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService {
61 private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
// Factor applied to System.currentTimeMillis() to express "now" in nanoseconds.
62 private static final BigInteger MILLION = BigInteger.valueOf(1000000);
// Instructions indexed by their ID.
64 private final Map<InstructionId, Instruction> insns = new HashMap<>();
// Instructions polled by processQueues() for execution.
67 private final Deque<Instruction> readyQueue = new ArrayDeque<>();
// Service used to publish InstructionStatusChanged notifications.
69 private final NotificationProviderService notifs;
// Executor running cancel RPCs and the deferred scheduling attempt made after submission.
70 private final ExecutorService executor;
// Timer on which per-instruction deadline timeouts are scheduled.
71 private final Timer timer;
// Handle of the dispatch task submitted by start().
72 private java.util.concurrent.Future<Void> thread;
// Single-thread executor created by start(); null before start() has been called.
73 private ExecutorService exec;
/**
 * Creates the service and stores its collaborators.
 *
 * @param notifs service used to publish status-change notifications; must not be null
 * @param executor executor used for asynchronous RPC work; must not be null
 * @param timer timer used to schedule instruction timeouts; must not be null
 * @param registry NOTE(review): not referenced by any visible statement of this constructor — confirm whether it is still needed
 * @throws NullPointerException if notifs, executor or timer is null
 */
75 ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
76 final Timer timer, final InstructionScheduler registry) {
77 this.notifs = Preconditions.checkNotNull(notifs);
78 this.executor = Preconditions.checkNotNull(executor);
79 this.timer = Preconditions.checkNotNull(timer);
/**
 * Handles a cancel-instruction RPC asynchronously: the actual work is done by
 * realCancelInstruction(input), submitted as a Callable to the executor.
 *
 * @param input cancel request carrying the ID of the instruction to cancel
 * @return future completing with the RPC result produced by realCancelInstruction
 */
83 public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
84 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
86 public RpcResult<CancelInstructionOutput> call() {
87 return realCancelInstruction(input);
/**
 * Performs the cancellation while holding the instance lock.
 *
 * The instruction is looked up by the ID carried in the request. An unknown ID
 * yields an output whose failure is UnknownInstruction. An instruction whose
 * status no longer permits cancellation yields UncancellableInstruction.
 * Otherwise the instruction is cancelled via cancelInstruction(i, null) and an
 * output with no failure set is returned.
 *
 * @param input cancel request carrying the instruction ID
 * @return a successful RPC result wrapping the cancellation output
 */
92 private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
93 final Instruction i = insns.get(input.getId());
95 LOG.debug("Instruction {} not present in the graph", input.getId());
97 final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
98 return SuccessfulRpcResult.create(out);
101 switch (i.getStatus()) {
// The format string has two placeholders: pass the status as well, so it is actually logged.
107 LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId(), i.getStatus());
108 return SuccessfulRpcResult.create(
109 new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
115 cancelInstruction(i, null);
116 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
/**
 * Validates a submitted instruction and registers it for scheduling.
 *
 * Rejections visible here, each returned as a Failure:
 * DuplicateInstructionId when the ID is already present in insns;
 * DeadOnArrival when the deadline is not in the future;
 * UnknownPreconditionId when a precondition ID is not a known instruction;
 * DeadOnArrival carrying the failed preconditions when some dependencies are unmet.
 *
 * Once the checks pass, a timer timeout is armed for the remaining time, an
 * Instruction is created, and a task calling tryScheduleInstruction is handed
 * to this service's executor.
 *
 * NOTE(review): this method is not declared synchronized, yet it reads insns, which
 * other methods in this class access while holding the instance lock — confirm
 * the intended locking.
 *
 * @param input submission request (ID, deadline, preconditions)
 * @param executor executor that will carry out the instruction; passed to the new Instruction
 * @return a Failure describing why the request was rejected
 */
120 public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
121 final InstructionId id = input.getId();
122 if (insns.get(id) != null) {
123 LOG.info("Instruction ID {} already present", id);
124 return new FailureBuilder().setType(DuplicateInstructionId.class).build();
127 // First things first: check the deadline
// "now" is the wall clock in milliseconds scaled by MILLION, i.e. nanoseconds.
128 final BigInteger now = BigInteger.valueOf(System.currentTimeMillis()).multiply(MILLION);
129 final BigInteger left = input.getDeadline().getValue().subtract(now);
131 if (left.compareTo(BigInteger.ZERO) <= 0) {
132 LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
133 return new FailureBuilder().setType(DeadOnArrival.class).build();
136 // Resolve dependencies
137 final List<Instruction> dependencies = new ArrayList<>();
138 for (final InstructionId pid : input.getPreconditions()) {
139 final Instruction i = insns.get(pid);
141 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
142 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
148 // Check if all dependencies are non-failed
149 final List<InstructionId> unmet = new ArrayList<>();
150 for (final Instruction d : dependencies) {
151 switch (d.getStatus()) {
155 unmet.add(d.getId());
166 * Some dependencies have failed, declare the request dead-on-arrival
167 * and fail the operation.
169 if (!unmet.isEmpty()) {
170 return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
174 * All pre-flight checks done are at this point, the following
175 * steps can only fail in catastrophic scenarios (OOM and the
179 // Schedule a timeout for the instruction
180 final Timeout t = timer.newTimeout(new TimerTask() {
182 public void run(final Timeout timeout) throws Exception {
183 timeoutInstruction(input.getId());
// NOTE(review): BigInteger.longValue() keeps only the low-order 64 bits, so a remaining time above Long.MAX_VALUE ns would be truncated — confirm deadlines are bounded.
185 }, left.longValue(), TimeUnit.NANOSECONDS);
187 // Put it into the instruction list
188 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
191 // Attach it into its dependencies
192 for (final Instruction d : dependencies) {
197 * All done. The next part is checking whether the instruction can
198 * run, which we can figure out after sending out the acknowledgement.
199 * This task should be ingress-weighed, so we reinsert it into the
200 * same execution service.
202 this.executor.submit(new Runnable() {
205 tryScheduleInstruction(i);
/**
 * Records a status transition: logs it and publishes an InstructionStatusChanged
 * notification carrying the instruction ID, the new status and the details.
 *
 * @param v instruction being transitioned
 * @param status new status
 * @param details details attached to the notification; callers in this class may pass null
 */
213 private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
217 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
219 // Send out a notification
220 notifs.publish(new InstructionStatusChangedBuilder().
221 setId(v.getId()).setStatus(status).setDetails(details).build());
/**
 * Cancels one instruction: transitions it to Cancelled, which also publishes
 * the corresponding notification through transitionInstruction.
 *
 * @param i instruction to cancel
 * @param details details to attach to the notification; may be null
 */
225 private void cancelSingle(final Instruction i, final Details details) {
229 // Set the new status and send out notification
230 transitionInstruction(i, InstructionStatus.Cancelled, details);
/**
 * Walks the dependants of an instruction and, depending on each dependant's
 * status, cancels it with details naming {@code v} as the unmet dependency.
 *
 * @param v instruction whose dependants are examined
 */
234 private void cancelDependants(final Instruction v) {
// The same details object, listing v's ID as the unmet dependency, is shared by every cancelled dependant.
235 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
236 for (final Instruction d : v.getDependants()) {
237 switch (d.getStatus()) {
246 cancelSingle(d, details);
/**
 * Cancels an instruction while holding the instance lock: removes it from the
 * ready queue and cancels it via cancelSingle.
 *
 * @param i instruction to cancel
 * @param details details for the status notification; may be null
 */
253 private synchronized void cancelInstruction(final Instruction i, final Details details) {
254 readyQueue.remove(i);
255 cancelSingle(i, details);
/**
 * Handles expiry of an instruction's deadline; invoked from the TimerTask
 * armed in submitInstruction. Runs while holding the instance lock.
 *
 * The outcome depends on the instruction's current status, as the log
 * messages indicate: the timeout may be a no-op, may only produce a warning,
 * may move an executing instruction to Unknown, or may cancel an instruction
 * that is still Queued or Scheduled.
 *
 * @param id ID of the instruction whose timeout fired
 */
259 private synchronized void timeoutInstruction(final InstructionId id) {
260 final Instruction i = insns.get(id);
262 LOG.warn("Instruction {} timed out, but not found in the queue", id);
266 switch (i.getStatus()) {
270 LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
273 LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
276 LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
277 transitionInstruction(i, InstructionStatus.Unknown, null);
281 LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
// Dependencies that have not completed successfully are examined so they can be reported with the cancellation.
283 final List<InstructionId> ids = new ArrayList<>();
284 for (final Instruction d : i.getDependencies()) {
285 if (d.getStatus() != InstructionStatus.Successful) {
290 cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
293 LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
294 // FIXME: we should provide details why it timed out while scheduled
295 cancelInstruction(i, null);
/**
 * Decides whether a Queued instruction can be scheduled, while holding the
 * instance lock. Dependencies are inspected and those resolved unsuccessfully
 * are collected; if any exist the instruction is cancelled with them as
 * details. An instruction found ready is transitioned to Scheduled.
 *
 * @param i instruction to examine; must currently be Queued
 * @throws IllegalStateException if the instruction is not in Queued status
 */
301 private synchronized void tryScheduleInstruction(final Instruction i) {
302 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
305 * Check all vertices we depend on. We start off as ready for
306 * scheduling. If we encounter a cancelled/failed/unknown
307 * dependency, we cancel this instruction (and cascade). If we
308 * encounter an executing/queued/scheduled dependency, we hold
309 * of scheduling this one.
311 boolean ready = true;
313 final List<InstructionId> unmet = new ArrayList<>();
314 for (final Instruction d : i.getDependencies()) {
315 switch (d.getStatus()) {
319 unmet.add(d.getId());
332 if (!unmet.isEmpty()) {
333 LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
334 cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
340 LOG.debug("Instruction {} is ready for execution", i.getId());
341 transitionInstruction(i, InstructionStatus.Scheduled, null);
/**
 * Handles a failed execution while holding the instance lock: logs the cause
 * at error level and transitions the instruction to Failed with no details.
 *
 * @param i instruction whose execution failed
 * @param cause failure reported by the execution future
 */
348 private synchronized void executionFailed(final Instruction i, final Throwable cause) {
349 LOG.error("Instruction {} failed to execute", i.getId(), cause);
350 transitionInstruction(i, InstructionStatus.Failed, null);
/**
 * Handles a completed execution while holding the instance lock: transitions
 * the instruction to the status and details reported by the result, then
 * attempts to schedule every dependant.
 *
 * NOTE(review): tryScheduleInstruction requires its argument to be Queued and
 * throws IllegalStateException otherwise — confirm every dependant reached
 * here is still Queued.
 *
 * @param i instruction that finished executing
 * @param res execution result supplying the final status and details
 */
354 private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
357 transitionInstruction(i, res.getStatus(), res.getDetails());
359 // Walk all dependants and try to schedule them
360 for (final Instruction d : i.getDependants()) {
361 tryScheduleInstruction(d);
/**
 * Drains the ready queue while holding the instance lock. Each polled
 * instruction must be Scheduled; it is transitioned to Executing, its
 * execution is started, and a listener routes completion to
 * executionSuccessful or executionFailed.
 *
 * @throws InterruptedException declared by the signature
 * @throws IllegalStateException if a polled instruction is not in Scheduled status
 */
365 private synchronized void processQueues() throws InterruptedException {
367 * This method is only ever interrupted by InterruptedException
370 while (!readyQueue.isEmpty()) {
371 final Instruction i = readyQueue.poll();
373 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
375 transitionInstruction(i, InstructionStatus.Executing, null);
// NOTE(review): i.execute() is invoked while this object's monitor is held — confirm executors return promptly.
376 final Future<ExecutionResult<?>> f = i.execute();
377 f.addListener(new FutureListener<ExecutionResult<?>>() {
379 public void operationComplete(final Future<ExecutionResult<?>> future) {
380 if (future.isSuccess()) {
381 executionSuccessful(i, future.getNow());
383 executionFailed(i, future.cause());
/**
 * Starts the dispatch thread: creates a single-thread executor from the given
 * factory and submits the dispatch task to it. Any exception other than
 * InterruptedException caught by that task is logged at error level.
 *
 * @param threadFactory factory used to create the dispatch thread
 * @throws IllegalStateException if the dispatch executor has already been created
 */
393 synchronized void start(final ThreadFactory threadFactory) {
394 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
396 exec = Executors.newSingleThreadExecutor(threadFactory);
397 thread = exec.submit(new Callable<Void>() {
399 public Void call() throws Exception {
402 } catch (Exception ex) {
403 if (!(ex instanceof InterruptedException)) {
404 LOG.error("Programming service dispatch thread died", ex);
/**
 * Stops the dispatch thread and waits for the dispatch executor to terminate.
 *
 * @param timeout maximum time to wait for termination
 * @param unit unit of {@code timeout}
 * @throws InterruptedException if interrupted while waiting
 * @throws IllegalStateException if the dispatch executor does not exist
 */
414 synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
415 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
// NOTE(review): the boolean returned by awaitTermination is ignored, so a timeout expiry goes unnoticed — confirm this is intended.
418 exec.awaitTermination(timeout, unit);