import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
import org.opendaylight.bgpcep.programming.spi.Instruction;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelFailure;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
final class InstructionImpl implements Instruction {
private static final Logger LOG = LoggerFactory.getLogger(InstructionImpl.class);
private final List<InstructionImpl> dependants = new ArrayList<>();
- private SettableFuture<ExecutionResult<Details>> executionFuture;
private final SettableFuture<Instruction> schedulingFuture;
private final List<InstructionImpl> dependencies;
- private final NotificationProviderService notifs;
+ private final QueueInstruction queue;
private final InstructionId id;
- private volatile InstructionStatus status = InstructionStatus.Queued;
+ private SettableFuture<ExecutionResult<Details>> executionFuture;
+ private InstructionStatus status = InstructionStatus.Queued;
private Details heldUpDetails;
private Timeout timeout;
- InstructionImpl(final SettableFuture<Instruction> future, final InstructionId id, final List<InstructionImpl> dependencies, final Timeout timeout, final NotificationProviderService notifs) {
+ InstructionImpl(final QueueInstruction queue, final SettableFuture<Instruction> future, final InstructionId id, final List<InstructionImpl> dependencies, final Timeout timeout) {
this.schedulingFuture = Preconditions.checkNotNull(future);
- this.id = Preconditions.checkNotNull(id);
this.dependencies = Preconditions.checkNotNull(dependencies);
this.timeout = Preconditions.checkNotNull(timeout);
- this.notifs = Preconditions.checkNotNull(notifs);
+ this.queue = Preconditions.checkNotNull(queue);
+ this.id = Preconditions.checkNotNull(id);
}
InstructionId getId() {
LOG.debug("Instruction {} transitioned to status {}", id, status);
// Send out a notification
- this.notifs.publish(new InstructionStatusChangedBuilder().setId(id).setStatus(status).setDetails(details).build());
+ this.queue.instructionUpdated(status, details);
switch (status) {
case Cancelled:
import io.netty.util.TimerTask;
import java.math.BigInteger;
-import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
-
import org.opendaylight.bgpcep.programming.NanotimeUtil;
import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
import org.opendaylight.bgpcep.programming.spi.Instruction;
import org.opendaylight.bgpcep.programming.spi.SchedulerException;
import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionQueue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionQueueBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.InstructionKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.InstructionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.InstructionsKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.FailureBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
-public final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService, AutoCloseable {
+public final class ProgrammingServiceImpl implements AutoCloseable, InstructionScheduler, ProgrammingService {
private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
-
- @GuardedBy("this")
- private final Deque<InstructionImpl> readyQueue = new ArrayDeque<>();
-
+ private final InstanceIdentifier<InstructionQueue> qid;
private final NotificationProviderService notifs;
private final ListeningExecutorService executor;
+ private final DataProviderService dataProvider;
private final Timer timer;
- public ProgrammingServiceImpl(final NotificationProviderService notifs, final ListeningExecutorService executor, final Timer timer) {
+ private final class InstructionPusher implements QueueInstruction {
+ private final InstructionsBuilder builder = new InstructionsBuilder();
+
+ InstructionPusher(final InstructionId id, final Nanotime deadline) {
+ builder.setDeadline(deadline);
+ builder.setId(id);
+ builder.setKey(new InstructionsKey(id));
+ builder.setStatus(InstructionStatus.Queued);
+ }
+
+ @Override
+ public void instructionUpdated(final InstructionStatus status, final Details details) {
+ if (!status.equals(builder.getStatus())) {
+ builder.setStatus(status);
+
+ final DataModificationTransaction t = dataProvider.beginTransaction();
+ t.putOperationalData(InstanceIdentifier.builder(qid).child(
+ org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.Instruction.class,
+ new InstructionKey(builder.getId())).build(), builder.build());
+ t.commit();
+ }
+
+ notifs.publish(new InstructionStatusChangedBuilder().setId(builder.getId()).setStatus(status).setDetails(details).build());
+ }
+
+ @Override
+ public void instructionRemoved() {
+ final DataModificationTransaction t = dataProvider.beginTransaction();
+ t.removeOperationalData(InstanceIdentifier.builder(qid).child(
+ org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.Instruction.class,
+ new InstructionKey(builder.getId())).build());
+ t.commit();
+ }
+ }
+
+ public ProgrammingServiceImpl(final DataProviderService dataProvider, final NotificationProviderService notifs, final ListeningExecutorService executor, final Timer timer) {
+ this.dataProvider = Preconditions.checkNotNull(dataProvider);
this.notifs = Preconditions.checkNotNull(notifs);
this.executor = Preconditions.checkNotNull(executor);
this.timer = Preconditions.checkNotNull(timer);
+ qid = InstanceIdentifier.builder(InstructionQueue.class).toInstance();
+
+ final DataModificationTransaction t = dataProvider.beginTransaction();
+ Preconditions.checkState(t.readOperationalData(qid) == null, "Conflicting instruction queue found");
+
+ t.putOperationalData(qid, new InstructionQueueBuilder().build());
+ t.commit();
}
@Override
// Put it into the instruction list
final SettableFuture<Instruction> ret = SettableFuture.create();
- final InstructionImpl i = new InstructionImpl(ret, input.getId(), dependencies, t, notifs);
+ final InstructionImpl i = new InstructionImpl(new InstructionPusher(id, input.getDeadline()), ret, id, dependencies, t);
this.insns.put(id, i);
// Attach it into its dependencies
private synchronized void tryScheduleInstruction(final InstructionImpl i) {
final ListenableFuture<ExecutionResult<Details>> f = i.ready();
if (f != null) {
- this.readyQueue.add(i);
-
Futures.addCallback(f, new FutureCallback<ExecutionResult<Details>>() {
@Override
public void onSuccess(final ExecutionResult<Details> result) {
@Override
public synchronized void close() {
- for (InstructionImpl i : readyQueue) {
- i.tryCancel(null);
+ try {
+ for (InstructionImpl i : insns.values()) {
+ i.tryCancel(null);
+ }
+ } finally {
+ final DataModificationTransaction t = dataProvider.beginTransaction();
+ t.removeOperationalData(qid);
+ t.commit();
}
}
}