X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=programming%2Fimpl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fbgpcep%2Fprogramming%2Fimpl%2FProgrammingServiceImpl.java;h=68aa257c3b2f16177fa9a5cb193b19e2f7dc754f;hb=0903ae978349d82e076b999965aaad58f0318393;hp=6955f8b776c68b9662904fd130e4ece7d6178d59;hpb=e19c562d8cf21c3fd27a6562f96617728bae556d;p=bgpcep.git diff --git a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java index 6955f8b776..68aa257c3b 100644 --- a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java +++ b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java @@ -7,428 +7,461 @@ */ package org.opendaylight.bgpcep.programming.impl; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; - import java.math.BigInteger; -import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - -import javax.annotation.concurrent.GuardedBy; - +import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.bgpcep.programming.NanotimeUtil; import org.opendaylight.bgpcep.programming.spi.ExecutionResult; -import org.opendaylight.bgpcep.programming.spi.InstructionExecutor; +import org.opendaylight.bgpcep.programming.spi.Instruction; import org.opendaylight.bgpcep.programming.spi.InstructionScheduler; +import org.opendaylight.bgpcep.programming.spi.SchedulerException; import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.FailureBuilder; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.DeadOnArrival; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.DuplicateInstructionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionStatusChangedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueue; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.Nanotime; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.ProgrammingService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.SubmitInstructionInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.UnknownInstruction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.UnknownPreconditionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.InstructionBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.InstructionKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.status.changed.Details; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.submit.instruction.output.result.failure._case.FailureBuilder; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.osgi.framework.ServiceRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; - -public final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class); - - // Default stop timeout, in seconds - private static final long CLOSE_TIMEOUT = 5; - - private final Map insns = new HashMap<>(); - - @GuardedBy("this") - private final Deque readyQueue = new ArrayDeque<>(); - - private final NotificationProviderService notifs; - private final ExecutorService executor; - private final Timer timer; - private java.util.concurrent.Future thread; - private ExecutorService exec; - - public ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor, final Timer timer) { - this.notifs = Preconditions.checkNotNull(notifs); - this.executor = Preconditions.checkNotNull(executor); - this.timer = Preconditions.checkNotNull(timer); - } - - @Override - public java.util.concurrent.Future> cancelInstruction(final CancelInstructionInput input) { - return this.executor.submit(new Callable>() { - @Override - public RpcResult call() { - return realCancelInstruction(input); - } - }); - } - - private synchronized RpcResult realCancelInstruction(final CancelInstructionInput input) { - final Instruction i = this.insns.get(input.getId()); - if (i == null) { - LOG.debug("Instruction {} not present in the graph", input.getId()); - - final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build(); - return SuccessfulRpcResult.create(out); - } - - switch (i.getStatus()) { - case Cancelled: - case Executing: - case Failed: - case Successful: - case Unknown: - LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId()); - return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build()); - case Queued: - case Scheduled: - break; - } - - cancelInstruction(i, null); - return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build()); - } - - @Override - public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) { - final InstructionId id = input.getId(); - if (this.insns.get(id) != null) { - LOG.info("Instruction ID {} already present", id); - return new FailureBuilder().setType(DuplicateInstructionId.class).build(); - } - - // First things first: check the deadline - final Nanotime now = NanotimeUtil.currentTime(); - final BigInteger left = input.getDeadline().getValue().subtract(now.getValue()); - - if (left.compareTo(BigInteger.ZERO) <= 0) { - LOG.debug("Instruction {} deadline has already passed by {}ns", id, left); - return new FailureBuilder().setType(DeadOnArrival.class).build(); - } - - // Resolve dependencies - final List dependencies = new ArrayList<>(); - for (final InstructionId pid : input.getPreconditions()) { - final Instruction i = this.insns.get(pid); - if (i == null) { - LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid); - return new FailureBuilder().setType(UnknownPreconditionId.class).build(); - } - - dependencies.add(i); - } - - // Check if all dependencies are non-failed - final List unmet = new ArrayList<>(); - for (final Instruction d : dependencies) { - switch (d.getStatus()) { - case Cancelled: - case Failed: - case Unknown: - unmet.add(d.getId()); - break; - case Executing: - case Queued: - case Scheduled: - case Successful: - break; - } - } - - /* - * Some dependencies have failed, declare the request dead-on-arrival - * and fail the operation. - */ - if (!unmet.isEmpty()) { - return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build(); - } - - /* - * All pre-flight checks done are at this point, the following - * steps can only fail in catastrophic scenarios (OOM and the - * like). - */ - - // Schedule a timeout for the instruction - final Timeout t = this.timer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) throws Exception { - timeoutInstruction(input.getId()); - } - }, left.longValue(), TimeUnit.NANOSECONDS); - - // Put it into the instruction list - final Instruction i = new Instruction(input.getId(), executor, dependencies, t); - this.insns.put(id, i); - - // Attach it into its dependencies - for (final Instruction d : dependencies) { - d.addDependant(i); - } - - /* - * All done. The next part is checking whether the instruction can - * run, which we can figure out after sending out the acknowledgement. - * This task should be ingress-weighed, so we reinsert it into the - * same execution service. - */ - this.executor.submit(new Runnable() { - @Override - public void run() { - tryScheduleInstruction(i); - } - }); - - return null; - } - - @GuardedBy("this") - private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) { - // Set the status - v.setStatus(status); - - LOG.debug("Instruction {} transitioned to status {}", v.getId(), status); - - // Send out a notification - this.notifs.publish(new InstructionStatusChangedBuilder().setId(v.getId()).setStatus(status).setDetails(details).build()); - } - - @GuardedBy("this") - private void cancelSingle(final Instruction i, final Details details) { - // Stop the timeout - i.cancel(); - - // Set the new status and send out notification - transitionInstruction(i, InstructionStatus.Cancelled, details); - } - - @GuardedBy("this") - private void cancelDependants(final Instruction v) { - final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build(); - for (final Instruction d : v.getDependants()) { - switch (d.getStatus()) { - case Cancelled: - case Executing: - case Failed: - case Successful: - case Unknown: - break; - case Queued: - case Scheduled: - cancelSingle(d, details); - cancelDependants(d); - break; - } - } - } - - private synchronized void cancelInstruction(final Instruction i, final Details details) { - this.readyQueue.remove(i); - cancelSingle(i, details); - cancelDependants(i); - } - - private synchronized void timeoutInstruction(final InstructionId id) { - final Instruction i = this.insns.get(id); - if (i == null) { - LOG.warn("Instruction {} timed out, but not found in the queue", id); - return; - } - - switch (i.getStatus()) { - case Cancelled: - case Failed: - case Successful: - LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus()); - break; - case Unknown: - LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus()); - break; - case Executing: - LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id); - transitionInstruction(i, InstructionStatus.Unknown, null); - cancelDependants(i); - break; - case Queued: - LOG.debug("Instruction {} timed out while Queued, cancelling it", id); - - final List ids = new ArrayList<>(); - for (final Instruction d : i.getDependencies()) { - if (d.getStatus() != InstructionStatus.Successful) { - ids.add(d.getId()); - } - } - - cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build()); - break; - case Scheduled: - LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId()); - // FIXME: BUG-191: we should provide details why it timed out while scheduled - cancelInstruction(i, null); - break; - } - } - - @GuardedBy("this") - private synchronized void tryScheduleInstruction(final Instruction i) { - Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued)); - - /* - * Check all vertices we depend on. We start off as ready for - * scheduling. If we encounter a cancelled/failed/unknown - * dependency, we cancel this instruction (and cascade). If we - * encounter an executing/queued/scheduled dependency, we hold - * of scheduling this one. - */ - boolean ready = true; - - final List unmet = new ArrayList<>(); - for (final Instruction d : i.getDependencies()) { - switch (d.getStatus()) { - case Cancelled: - case Failed: - case Unknown: - unmet.add(d.getId()); - break; - case Executing: - case Queued: - case Scheduled: - ready = false; - break; - case Successful: - // No-op - break; - } - } - - if (!unmet.isEmpty()) { - LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId()); - cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build()); - cancelDependants(i); - return; - } - - if (ready) { - LOG.debug("Instruction {} is ready for execution", i.getId()); - transitionInstruction(i, InstructionStatus.Scheduled, null); - - this.readyQueue.add(i); - notify(); - } - } - - private synchronized void executionFailed(final Instruction i, final Throwable cause) { - LOG.error("Instruction {} failed to execute", i.getId(), cause); - transitionInstruction(i, InstructionStatus.Failed, null); - cancelDependants(i); - } - - private synchronized void executionSuccessful(final Instruction i, final ExecutionResult res) { - i.cancel(); - - transitionInstruction(i, res.getStatus(), res.getDetails()); - - // Walk all dependants and try to schedule them - for (final Instruction d : i.getDependants()) { - tryScheduleInstruction(d); - } - } - - private synchronized void processQueues() throws InterruptedException { - /* - * This method is only ever interrupted by InterruptedException - */ - while (true) { - while (!this.readyQueue.isEmpty()) { - final Instruction i = this.readyQueue.poll(); - - Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled)); - - transitionInstruction(i, InstructionStatus.Executing, null); - Futures.addCallback(i.execute(), new FutureCallback>() { - - @Override - public void onSuccess(final ExecutionResult
result) { - executionSuccessful(i, result); - } - - @Override - public void onFailure(final Throwable t) { - executionFailed(i, t); - } - }); - } - - wait(); - } - } - - synchronized void start(final ThreadFactory threadFactory) { - Preconditions.checkState(this.exec == null, "Programming service dispatch thread already started"); - - this.exec = Executors.newSingleThreadExecutor(threadFactory); - this.thread = this.exec.submit(new Callable() { - @Override - public Void call() { - try { - processQueues(); - } catch (final InterruptedException ex) { - LOG.error("Programming service dispatch thread died", ex); - } - return null; - } - }); - this.exec.shutdown(); - } - - synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException { - Preconditions.checkState(this.exec != null, "Programming service dispatch thread already stopped"); - - this.thread.cancel(true); - this.exec.awaitTermination(timeout, unit); - this.exec = null; - } - - @Override - public void close() throws InterruptedException { - stop(CLOSE_TIMEOUT, TimeUnit.SECONDS); - } - - @Override - public Future> cleanInstructions(final CleanInstructionsInput input) { - // FIXME: BUG-219: implement this instruction - return null; - } +public final class ProgrammingServiceImpl implements ClusterSingletonService, InstructionScheduler, + ProgrammingService { + private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class); + + private final Map insns = new HashMap<>(); + private final InstanceIdentifier qid; + private final NotificationPublishService notifs; + private final ListeningExecutorService executor; + private final DataBroker dataProvider; + private final Timer timer; + private final String instructionId; + private final ServiceGroupIdentifier sgi; + private final ClusterSingletonServiceRegistration csspReg; + private final RpcProviderService rpcProviderRegistry; + @GuardedBy("this") + private ObjectRegistration reg; + @GuardedBy("this") + private ServiceRegistration serviceRegistration; + + private final class InstructionPusher implements QueueInstruction { + private final InstructionBuilder builder = new InstructionBuilder(); + + InstructionPusher(final InstructionId id, final Nanotime deadline) { + this.builder.setDeadline(deadline); + this.builder.setId(id); + this.builder.withKey(new InstructionKey(id)); + this.builder.setStatus(InstructionStatus.Queued); + } + + @Override + public void instructionUpdated(final InstructionStatus status, final Details details) { + if (!status.equals(this.builder.getStatus())) { + this.builder.setStatus(status); + + final WriteTransaction wt = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction(); + wt.put(LogicalDatastoreType.OPERATIONAL, + ProgrammingServiceImpl.this.qid.child( + org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming + .rev150720.instruction.queue.Instruction.class, + new InstructionKey(this.builder.getId())), this.builder.build()); + wt.commit().addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.debug("Instruction Queue {} updated", ProgrammingServiceImpl.this.qid); + } + + @Override + public void onFailure(final Throwable trw) { + LOG.error("Failed to update Instruction Queue {}", ProgrammingServiceImpl.this.qid, trw); + } + }, MoreExecutors.directExecutor()); + } + + try { + ProgrammingServiceImpl.this.notifs.putNotification(new InstructionStatusChangedBuilder() + .setId(this.builder.getId()).setStatus(status).setDetails(details).build()); + } catch (final InterruptedException e) { + LOG.debug("Failed to publish notification", e); + } + } + + @Override + public void instructionRemoved() { + final WriteTransaction wt = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction(); + wt.delete(LogicalDatastoreType.OPERATIONAL, ProgrammingServiceImpl.this.qid.child( + org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction + .queue.Instruction.class, + new InstructionKey(this.builder.getId()))); + wt.commit().addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.debug("Instruction Queue {} removed", ProgrammingServiceImpl.this.qid); + } + + @Override + public void onFailure(final Throwable trw) { + LOG.error("Failed to remove Instruction Queue {}", ProgrammingServiceImpl.this.qid, trw); + } + }, MoreExecutors.directExecutor()); + } + } + + ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationPublishService notifs, + final ListeningExecutorService executor, final RpcProviderService rpcProviderRegistry, + final ClusterSingletonServiceProvider cssp, final Timer timer, final String instructionId) { + this.dataProvider = requireNonNull(dataProvider); + this.instructionId = requireNonNull(instructionId); + this.notifs = requireNonNull(notifs); + this.executor = requireNonNull(executor); + this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry); + this.timer = requireNonNull(timer); + this.qid = KeyedInstanceIdentifier.builder(InstructionsQueue.class, + new InstructionsQueueKey(this.instructionId)).build(); + this.sgi = ServiceGroupIdentifier.create(this.instructionId + "-service-group"); + LOG.info("Creating Programming Service {}.", this.sgi.getName()); + this.csspReg = cssp.registerClusterSingletonService(this); + } + + @Override + public synchronized void instantiateServiceInstance() { + LOG.info("Instruction Queue service {} instantiated", this.sgi.getName()); + this.reg = this.rpcProviderRegistry.registerRpcImplementation(ProgrammingService.class, this); + + final WriteTransaction wt = this.dataProvider.newWriteOnlyTransaction(); + wt.put(LogicalDatastoreType.OPERATIONAL, this.qid, new InstructionsQueueBuilder() + .withKey(new InstructionsQueueKey(this.instructionId)).setInstruction(Map.of()).build()); + wt.commit().addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.debug("Instruction Queue {} added", ProgrammingServiceImpl.this.qid); + } + + @Override + public void onFailure(final Throwable trw) { + LOG.error("Failed to add Instruction Queue {}", ProgrammingServiceImpl.this.qid, trw); + } + }, MoreExecutors.directExecutor()); + } + + @Override + public ServiceGroupIdentifier getIdentifier() { + return this.sgi; + } + + @Override + public ListenableFuture> cancelInstruction(final CancelInstructionInput input) { + return this.executor.submit(() -> realCancelInstruction(input)); + } + + @Override + public ListenableFuture> cleanInstructions(final CleanInstructionsInput input) { + return this.executor.submit(() -> realCleanInstructions(input)); + } + + private synchronized RpcResult realCancelInstruction(final CancelInstructionInput input) { + final InstructionImpl instruction = this.insns.get(input.getId()); + if (instruction == null) { + LOG.debug("Instruction {} not present in the graph", input.getId()); + + final CancelInstructionOutput out = new CancelInstructionOutputBuilder() + .setFailure(UnknownInstruction.class).build(); + return SuccessfulRpcResult.create(out); + } + + return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder() + .setFailure(instruction.tryCancel(null)).build()); + } + + private synchronized RpcResult realCleanInstructions(final CleanInstructionsInput input) { + final List failed = new ArrayList<>(); + + for (final InstructionId id : input.getId()) { + // Find the instruction + final InstructionImpl instruction = this.insns.get(id); + if (instruction == null) { + LOG.debug("Instruction {} not present in the graph", input.getId()); + failed.add(id); + continue; + } + + // Check its status + switch (instruction.getStatus()) { + case Cancelled: + case Failed: + case Successful: + break; + case Executing: + case Queued: + case Scheduled: + case Unknown: + LOG.debug("Instruction {} cannot be cleaned because of it's in state {}", + id, instruction.getStatus()); + failed.add(id); + continue; + default: + break; + } + + // The instruction is in a terminal state, we need to just unlink + // it from its dependencies and dependents + instruction.clean(); + + this.insns.remove(id); + LOG.debug("Instruction {} cleaned successfully", id); + } + + final CleanInstructionsOutputBuilder ob = new CleanInstructionsOutputBuilder(); + ob.setUnflushed(failed); + + return SuccessfulRpcResult.create(ob.build()); + } + + private List checkDependencies(final SubmitInstructionInput input) throws SchedulerException { + final List dependencies = collectDependencies(input); + // Check if all dependencies are non-failed + final List unmet = checkIfUnfailed(dependencies); + /* + * Some dependencies have failed, declare the request dead-on-arrival + * and fail the operation. + */ + if (!unmet.isEmpty()) { + throw new SchedulerException("Instruction's dependencies are already unsuccessful", new FailureBuilder() + .setType(DeadOnArrival.class).setFailedPreconditions(unmet).build()); + } + return dependencies; + } + + private List collectDependencies(final SubmitInstructionInput input) throws SchedulerException { + final List dependencies = new ArrayList<>(); + for (final InstructionId pid : input.getPreconditions()) { + final InstructionImpl instruction = this.insns.get(pid); + if (instruction == null) { + LOG.info("Instruction {} depends on {}, which is not a known instruction", input.getId(), pid); + throw new SchedulerException("Unknown dependency ID specified", + new FailureBuilder().setType(UnknownPreconditionId.class).build()); + } + dependencies.add(instruction); + } + return dependencies; + } + + private static List checkIfUnfailed(final List dependencies) { + final List unmet = new ArrayList<>(); + for (final InstructionImpl d : dependencies) { + switch (d.getStatus()) { + case Cancelled: + case Failed: + case Unknown: + unmet.add(d.getId()); + break; + case Executing: + case Queued: + case Scheduled: + case Successful: + break; + default: + break; + } + } + return unmet; + } + + @Override + public synchronized ListenableFuture scheduleInstruction(final SubmitInstructionInput input) throws + SchedulerException { + final InstructionId id = input.getId(); + if (this.insns.get(id) != null) { + LOG.info("Instruction ID {} already present", id); + throw new SchedulerException("Instruction ID currently in use", + new FailureBuilder().setType(DuplicateInstructionId.class).build()); + } + + // First things first: check the deadline + final Nanotime now = NanotimeUtil.currentTime(); + final BigInteger left = input.getDeadline().getValue().toJava().subtract(now.getValue().toJava()); + + if (left.compareTo(BigInteger.ZERO) <= 0) { + LOG.debug("Instruction {} deadline has already passed by {}ns", id, left); + throw new SchedulerException("Instruction arrived after specified deadline", + new FailureBuilder().setType(DeadOnArrival.class).build()); + } + + // Resolve dependencies + final List dependencies = checkDependencies(input); + + /* + * All pre-flight checks done are at this point, the following + * steps can only fail in catastrophic scenarios (OOM and the + * like). + */ + + // Schedule a timeout for the instruction + final Timeout t = this.timer.newTimeout(timeout -> timeoutInstruction(input.getId()), left.longValue(), + TimeUnit.NANOSECONDS); + + // Put it into the instruction list + final SettableFuture ret = SettableFuture.create(); + final InstructionImpl instruction = new InstructionImpl(new InstructionPusher(id, input.getDeadline()), ret, id, + dependencies, t); + this.insns.put(id, instruction); + + // Attach it into its dependencies + for (final InstructionImpl d : dependencies) { + d.addDependant(instruction); + } + + /* + * All done. The next part is checking whether the instruction can + * run, which we can figure out after sending out the acknowledgement. + * This task should be ingress-weighed, so we reinsert it into the + * same execution service. + */ + this.executor.submit(() -> tryScheduleInstruction(instruction)); + + return ret; + } + + @Override + public String getInstructionID() { + return this.instructionId; + } + + private synchronized void timeoutInstruction(final InstructionId id) { + final InstructionImpl instruction = this.insns.get(id); + if (instruction == null) { + LOG.warn("Instruction {} timed out, but not found in the queue", id); + return; + } + + instruction.timeout(); + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private synchronized void tryScheduleDependants(final InstructionImpl instruction) { + // Walk all dependants and try to schedule them + final Iterator it = instruction.getDependants(); + while (it.hasNext()) { + tryScheduleInstruction(it.next()); + } + } + + private synchronized void tryScheduleInstruction(final InstructionImpl instruction) { + final ListenableFuture> f = instruction.ready(); + if (f != null) { + Futures.addCallback(f, new FutureCallback>() { + @Override + public void onSuccess(final ExecutionResult
result) { + tryScheduleDependants(instruction); + } + + @Override + public void onFailure(final Throwable trw) { + LOG.error("Instruction {} failed to execute", instruction.getId(), trw); + } + }, MoreExecutors.directExecutor()); + } + + } + + @Override + public synchronized FluentFuture closeServiceInstance() { + LOG.info("Closing Instruction Queue service {}", this.sgi.getName()); + + if (this.reg != null) { + this.reg.close(); + this.reg = null; + } + for (final InstructionImpl instruction : this.insns.values()) { + instruction.tryCancel(null); + } + // Workaround for BUG-2283 + final WriteTransaction wt = this.dataProvider.newWriteOnlyTransaction(); + wt.delete(LogicalDatastoreType.OPERATIONAL, this.qid); + + final FluentFuture future = wt.commit(); + future.addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.debug("Instruction Queue {} removed", ProgrammingServiceImpl.this.qid); + } + + @Override + public void onFailure(final Throwable trw) { + LOG.error("Failed to shutdown Instruction Queue {}", ProgrammingServiceImpl.this.qid, trw); + } + }, MoreExecutors.directExecutor()); + + return future; + } + + @Override + @SuppressWarnings("checkstyle:IllegalCatch") + public synchronized void close() { + if (this.csspReg != null) { + try { + this.csspReg.close(); + } catch (final Exception e) { + LOG.error("Failed to close Instruction Scheduler service", e); + } + } + if (this.serviceRegistration != null) { + this.serviceRegistration.unregister(); + this.serviceRegistration = null; + } + } + + void setServiceRegistration(final ServiceRegistration serviceRegistration) { + this.serviceRegistration = serviceRegistration; + } }