BUG-191: provide a way for executors to provide scheduling feedback 12/3712/1
authorRobert Varga <rovarga@cisco.com>
Sat, 9 Nov 2013 08:08:45 +0000 (09:08 +0100)
committerRobert Varga <rovarga@cisco.com>
Fri, 13 Dec 2013 16:44:19 +0000 (17:44 +0100)
The idea is that the executor is hooked onto the ListenableFuture,
kicking in if it succeeds. Should the insn be cancelled, the future
will be, too.

The notification part is handled internally until the Future succeeds --
once it does, the executor can either start immediately executing, after
checking with the Instruction, or inform it why it delayed execution.

Once execution completes, the executor pushes the appropriate details
into the instruction.

Change-Id: I9dd9c29c876b890cadf842941c1e59e0f394cf84
Signed-off-by: Robert Varga <rovarga@cisco.com>
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologyProgramming.java
pcep/topology-spi/src/main/java/org/opendaylight/bgpcep/pcep/topology/spi/AbstractInstructionExecutor.java [new file with mode: 0644]
pcep/topology-spi/src/main/java/org/opendaylight/bgpcep/pcep/topology/spi/AbstractTopologyProgrammingExecutor.java [deleted file]
pcep/tunnel-provider/src/main/java/org/opendaylight/bgpcep/pcep/tunnel/provider/TunnelProgramming.java
programming/impl-config/src/main/java/org/opendaylight/controller/config/yang/programming/impl/InstructionSchedulerImplModule.java
programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/Instruction.java [deleted file]
programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionImpl.java [new file with mode: 0644]
programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java
programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/Instruction.java [new file with mode: 0644]
programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/InstructionScheduler.java
programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/SchedulerException.java [moved from programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/InstructionExecutor.java with 50% similarity]

index 57c57711bb410c656d49f0f2f058ceb11d663550..9bfa64ee097f57be09e4c961711e93265fc2ff76 100644 (file)
@@ -7,12 +7,9 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
-import org.opendaylight.bgpcep.pcep.topology.spi.AbstractTopologyProgrammingExecutor;
-import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
+import org.opendaylight.bgpcep.pcep.topology.spi.AbstractInstructionExecutor;
 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.FailureCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev131106.NetworkTopologyPcepProgrammingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev131106.SubmitAddLspInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev131106.SubmitAddLspOutput;
@@ -51,18 +48,13 @@ final class TopologyProgramming implements NetworkTopologyPcepProgrammingService
                Preconditions.checkArgument(input.getNode() != null);
                Preconditions.checkArgument(input.getName() != null);
 
-               final InstructionExecutor e = new AbstractTopologyProgrammingExecutor() {
+               final SubmitAddLspOutputBuilder b = new SubmitAddLspOutputBuilder();
+               b.setResult(AbstractInstructionExecutor.schedule(scheduler, new AbstractInstructionExecutor(input) {
                        @Override
-                       public ListenableFuture<OperationResult> executeImpl() {
+                       protected ListenableFuture<OperationResult> invokeOperation() {
                                return TopologyProgramming.this.manager.realAddLsp(input);
                        }
-               };
-
-               final Failure f = this.scheduler.submitInstruction(input, e);
-               final SubmitAddLspOutputBuilder b = new SubmitAddLspOutputBuilder();
-               if (f != null) {
-                       b.setResult(new FailureCaseBuilder().setFailure(f).build());
-               }
+               }));
 
                final RpcResult<SubmitAddLspOutput> res = SuccessfulRpcResult.create(b.build());
                return Futures.immediateFuture(res);
@@ -73,18 +65,13 @@ final class TopologyProgramming implements NetworkTopologyPcepProgrammingService
                Preconditions.checkArgument(input.getNode() != null);
                Preconditions.checkArgument(input.getName() != null);
 
-               final InstructionExecutor e = new AbstractTopologyProgrammingExecutor() {
+               final SubmitRemoveLspOutputBuilder b = new SubmitRemoveLspOutputBuilder();
+               b.setResult(AbstractInstructionExecutor.schedule(scheduler, new AbstractInstructionExecutor(input) {
                        @Override
-                       protected ListenableFuture<OperationResult> executeImpl() {
+                       protected ListenableFuture<OperationResult> invokeOperation() {
                                return TopologyProgramming.this.manager.realRemoveLsp(input);
                        }
-               };
-
-               final Failure f = this.scheduler.submitInstruction(input, e);
-               final SubmitRemoveLspOutputBuilder b = new SubmitRemoveLspOutputBuilder();
-               if (f != null) {
-                       b.setResult(new FailureCaseBuilder().setFailure(f).build());
-               }
+               }));
 
                final RpcResult<SubmitRemoveLspOutput> res = SuccessfulRpcResult.create(b.build());
                return Futures.immediateFuture(res);
@@ -95,18 +82,13 @@ final class TopologyProgramming implements NetworkTopologyPcepProgrammingService
                Preconditions.checkArgument(input.getNode() != null);
                Preconditions.checkArgument(input.getName() != null);
 
-               final InstructionExecutor e = new AbstractTopologyProgrammingExecutor() {
+               final SubmitUpdateLspOutputBuilder b = new SubmitUpdateLspOutputBuilder();
+               b.setResult(AbstractInstructionExecutor.schedule(scheduler, new AbstractInstructionExecutor(input) {
                        @Override
-                       protected ListenableFuture<OperationResult> executeImpl() {
+                       protected ListenableFuture<OperationResult> invokeOperation() {
                                return TopologyProgramming.this.manager.realUpdateLsp(input);
                        }
-               };
-
-               final Failure f = this.scheduler.submitInstruction(input, e);
-               final SubmitUpdateLspOutputBuilder b = new SubmitUpdateLspOutputBuilder();
-               if (f != null) {
-                       b.setResult(new FailureCaseBuilder().setFailure(f).build());
-               }
+               }));
 
                final RpcResult<SubmitUpdateLspOutput> res = SuccessfulRpcResult.create(b.build());
                return Futures.immediateFuture(res);
@@ -120,18 +102,13 @@ final class TopologyProgramming implements NetworkTopologyPcepProgrammingService
                Preconditions.checkArgument(input.getArguments() != null);
                Preconditions.checkArgument(input.getArguments().getOperational() != null);
 
-               final InstructionExecutor e = new AbstractTopologyProgrammingExecutor() {
+               final SubmitEnsureLspOperationalOutputBuilder b = new SubmitEnsureLspOperationalOutputBuilder();
+               b.setResult(AbstractInstructionExecutor.schedule(scheduler, new AbstractInstructionExecutor(input) {
                        @Override
-                       protected ListenableFuture<OperationResult> executeImpl() {
+                       protected ListenableFuture<OperationResult> invokeOperation() {
                                return TopologyProgramming.this.manager.realEnsureLspOperational(new EnsureLspOperationalInputBuilder(input).build());
                        }
-               };
-
-               final Failure f = this.scheduler.submitInstruction(input, e);
-               final SubmitEnsureLspOperationalOutputBuilder b = new SubmitEnsureLspOperationalOutputBuilder();
-               if (f != null) {
-                       b.setResult(new FailureCaseBuilder().setFailure(f).build());
-               }
+               }));
 
                final RpcResult<SubmitEnsureLspOperationalOutput> res = SuccessfulRpcResult.create(b.build());
                return Futures.immediateFuture(res);
diff --git a/pcep/topology-spi/src/main/java/org/opendaylight/bgpcep/pcep/topology/spi/AbstractInstructionExecutor.java b/pcep/topology-spi/src/main/java/org/opendaylight/bgpcep/pcep/topology/spi/AbstractInstructionExecutor.java
new file mode 100644 (file)
index 0000000..b9aabbc
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.bgpcep.pcep.topology.spi;
+
+import org.opendaylight.bgpcep.programming.spi.Instruction;
+import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
+import org.opendaylight.bgpcep.programming.spi.SchedulerException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.FailureCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.FailureCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ *
+ */
+public abstract class AbstractInstructionExecutor implements FutureCallback<Instruction> {
+       private static final Logger LOG = LoggerFactory.getLogger(AbstractInstructionExecutor.class);
+       private final SubmitInstructionInput input;
+
+       protected AbstractInstructionExecutor(final SubmitInstructionInput input) {
+               this.input = Preconditions.checkNotNull(input);
+       }
+
+       public final SubmitInstructionInput getInput() {
+               return input;
+       }
+
+       public static FailureCase schedule(final InstructionScheduler scheduler, final AbstractInstructionExecutor fwd) {
+               final ListenableFuture<Instruction> s;
+
+               try {
+                       s = scheduler.scheduleInstruction(fwd.getInput());
+               } catch (SchedulerException e) {
+                       LOG.info("Instuction {} failed to schedule", e);
+                       return new FailureCaseBuilder().setFailure(e.getFailure()).build();
+               }
+
+               Futures.addCallback(s, fwd);
+               return null;
+       }
+
+       protected abstract ListenableFuture<OperationResult> invokeOperation();
+
+       @Override
+       public void onSuccess(final Instruction insn) {
+               if (insn.checkedExecutionStart()) {
+                       final ListenableFuture<OperationResult> s = invokeOperation();
+                       Futures.addCallback(s, new FutureCallback<OperationResult>() {
+                               @Override
+                               public void onSuccess(final OperationResult result) {
+                                       if (result.getFailure() != null) {
+                                               switch (result.getFailure()) {
+                                               case Failed:
+                                               case NoAck:
+                                                       insn.executionCompleted(InstructionStatus.Failed, null);
+                                                       break;
+                                               case Unsent:
+                                                       insn.executionCompleted(InstructionStatus.Cancelled, null);
+                                                       break;
+                                               }
+                                       } else {
+                                               insn.executionCompleted(InstructionStatus.Successful, null);
+                                       }
+                               }
+
+                               @Override
+                               public void onFailure(final Throwable t) {
+                                       insn.executionCompleted(InstructionStatus.Failed, null);
+                               }
+                       });
+               }
+       }
+
+       @Override
+       public void onFailure(final Throwable t) {
+               LOG.debug("Instruction {} cancelled", input, t);
+       }
+}
diff --git a/pcep/topology-spi/src/main/java/org/opendaylight/bgpcep/pcep/topology/spi/AbstractTopologyProgrammingExecutor.java b/pcep/topology-spi/src/main/java/org/opendaylight/bgpcep/pcep/topology/spi/AbstractTopologyProgrammingExecutor.java
deleted file mode 100644 (file)
index 81418fb..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.bgpcep.pcep.topology.spi;
-
-import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
-import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.FailureType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.OperationResult;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public abstract class AbstractTopologyProgrammingExecutor implements InstructionExecutor {
-
-       protected abstract ListenableFuture<OperationResult> executeImpl();
-
-       @Override
-       public final ListenableFuture<ExecutionResult<Details>> execute() {
-               return Futures.transform(executeImpl(), new Function<OperationResult, ExecutionResult<Details>>() {
-
-                       @Override
-                       public ExecutionResult<Details> apply(final OperationResult input) {
-                               final FailureType fail = input.getFailure();
-                               if (fail == null) {
-                                       return new ExecutionResult<Details>(InstructionStatus.Successful, null);
-                               }
-
-                               switch (fail) {
-                               case Failed:
-                               case NoAck:
-                                       return new ExecutionResult<Details>(InstructionStatus.Failed, null);
-                               case Unsent:
-                                       return new ExecutionResult<Details>(InstructionStatus.Cancelled, null);
-                               }
-
-                               throw new IllegalStateException("Unhandled operation state " + fail);
-                       }
-               });
-       }
-}
index 58daf59775e313137e47086a38ef76f5b4f7b83d..c73990b29314c1ffd9b7dca3b687cb91d7887202 100644 (file)
@@ -10,8 +10,7 @@ package org.opendaylight.bgpcep.pcep.tunnel.provider;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.opendaylight.bgpcep.pcep.topology.spi.AbstractTopologyProgrammingExecutor;
-import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
+import org.opendaylight.bgpcep.pcep.topology.spi.AbstractInstructionExecutor;
 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
 import org.opendaylight.bgpcep.programming.topology.TopologyProgrammingUtil;
@@ -32,8 +31,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.explicit.route.object.ero.Subobjects;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.explicit.route.object.ero.SubobjectsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.lspa.object.LspaBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.FailureCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.AddLspInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.AddLspOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.NetworkTopologyPcepService;
@@ -211,9 +208,10 @@ public final class TunnelProgramming implements TopologyTunnelPcepProgrammingSer
 
        @Override
        public ListenableFuture<RpcResult<PcepCreateP2pTunnelOutput>> pcepCreateP2pTunnel(final PcepCreateP2pTunnelInput input) {
-               final InstructionExecutor e = new AbstractTopologyProgrammingExecutor() {
+               final PcepCreateP2pTunnelOutputBuilder b = new PcepCreateP2pTunnelOutputBuilder();
+               b.setResult(AbstractInstructionExecutor.schedule(scheduler, new AbstractInstructionExecutor(input) {
                        @Override
-                       protected ListenableFuture<OperationResult> executeImpl() {
+                       protected ListenableFuture<OperationResult> invokeOperation() {
                                final InstanceIdentifier<Topology> tii = TopologyProgrammingUtil.topologyForInput(input);
 
                                final DataModificationTransaction t = TunnelProgramming.this.dataProvider.beginTransaction();
@@ -252,13 +250,7 @@ public final class TunnelProgramming implements TopologyTunnelPcepProgrammingSer
                                                        }
                                                });
                        }
-               };
-
-               final Failure f = this.scheduler.submitInstruction(input, e);
-               final PcepCreateP2pTunnelOutputBuilder b = new PcepCreateP2pTunnelOutputBuilder();
-               if (f != null) {
-                       b.setResult(new FailureCaseBuilder().setFailure(f).build());
-               }
+               }));
 
                final RpcResult<PcepCreateP2pTunnelOutput> res = SuccessfulRpcResult.create(b.build());
                return Futures.immediateFuture(res);
@@ -272,9 +264,10 @@ public final class TunnelProgramming implements TopologyTunnelPcepProgrammingSer
 
        @Override
        public ListenableFuture<RpcResult<PcepDestroyTunnelOutput>> pcepDestroyTunnel(final PcepDestroyTunnelInput input) {
-               final InstructionExecutor e = new AbstractTopologyProgrammingExecutor() {
+               final PcepDestroyTunnelOutputBuilder b = new PcepDestroyTunnelOutputBuilder();
+               b.setResult(AbstractInstructionExecutor.schedule(scheduler, new AbstractInstructionExecutor(input) {
                        @Override
-                       protected ListenableFuture<OperationResult> executeImpl() {
+                       protected ListenableFuture<OperationResult> invokeOperation() {
                                final InstanceIdentifier<Topology> tii = TopologyProgrammingUtil.topologyForInput(input);
                                final InstanceIdentifier<Link> lii = TunnelProgrammingUtil.linkIdentifier(tii, input);
 
@@ -301,13 +294,7 @@ public final class TunnelProgramming implements TopologyTunnelPcepProgrammingSer
                                                        }
                                                });
                        }
-               };
-
-               final Failure f = this.scheduler.submitInstruction(input, e);
-               final PcepDestroyTunnelOutputBuilder b = new PcepDestroyTunnelOutputBuilder();
-               if (f != null) {
-                       b.setResult(new FailureCaseBuilder().setFailure(f).build());
-               }
+               }));
 
                final RpcResult<PcepDestroyTunnelOutput> res = SuccessfulRpcResult.create(b.build());
                return Futures.immediateFuture(res);
@@ -315,9 +302,11 @@ public final class TunnelProgramming implements TopologyTunnelPcepProgrammingSer
 
        @Override
        public ListenableFuture<RpcResult<PcepUpdateTunnelOutput>> pcepUpdateTunnel(final PcepUpdateTunnelInput input) {
-               final InstructionExecutor e = new AbstractTopologyProgrammingExecutor() {
+               final PcepUpdateTunnelOutputBuilder b = new PcepUpdateTunnelOutputBuilder();
+               b.setResult(AbstractInstructionExecutor.schedule(scheduler, new AbstractInstructionExecutor(input) {
                        @Override
-                       protected ListenableFuture<OperationResult> executeImpl() {
+                       protected ListenableFuture<OperationResult> invokeOperation() {
+
                                final InstanceIdentifier<Topology> tii = TopologyProgrammingUtil.topologyForInput(input);
                                final InstanceIdentifier<Link> lii = TunnelProgrammingUtil.linkIdentifier(tii, input);
 
@@ -354,13 +343,7 @@ public final class TunnelProgramming implements TopologyTunnelPcepProgrammingSer
                                                        }
                                                });
                        }
-               };
-
-               final Failure f = this.scheduler.submitInstruction(input, e);
-               final PcepUpdateTunnelOutputBuilder b = new PcepUpdateTunnelOutputBuilder();
-               if (f != null) {
-                       b.setResult(new FailureCaseBuilder().setFailure(f).build());
-               }
+               }));
 
                final RpcResult<PcepUpdateTunnelOutput> res = SuccessfulRpcResult.create(b.build());
                return Futures.immediateFuture(res);
index d47d1a840d2fac945511f17692ccea792578e4ea..1138f39e0f3aad9352434057c573349540c6b877 100644 (file)
@@ -12,13 +12,14 @@ package org.opendaylight.controller.config.yang.programming.impl;
 import java.util.concurrent.Executors;
 
 import org.opendaylight.bgpcep.programming.impl.ProgrammingServiceImpl;
-import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
+import org.opendaylight.bgpcep.programming.spi.Instruction;
 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
+import org.opendaylight.bgpcep.programming.spi.SchedulerException;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -68,8 +69,8 @@ org.opendaylight.controller.config.yang.programming.impl.AbstractInstructionSche
                        }
 
                        @Override
-                       public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
-                               return inst.submitInstruction(input, executor);
+                       public ListenableFuture<Instruction> scheduleInstruction(final SubmitInstructionInput input) throws SchedulerException {
+                               return inst.scheduleInstruction(input);
                        }
                }
 
diff --git a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/Instruction.java b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/Instruction.java
deleted file mode 100644 (file)
index 07b61ae..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.bgpcep.programming.impl;
-
-import io.netty.util.Timeout;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
-import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
-
-final class Instruction {
-       private final List<Instruction> dependants = new ArrayList<>();
-       private final InstructionExecutor executor;
-       private final List<Instruction> dependencies;
-       private final InstructionId id;
-       private volatile InstructionStatus status = InstructionStatus.Queued;
-       private Timeout timeout;
-
-       Instruction(final InstructionId id, final InstructionExecutor executor, final List<Instruction> dependencies, final Timeout timeout) {
-               this.id = Preconditions.checkNotNull(id);
-               this.executor = Preconditions.checkNotNull(executor);
-               this.dependencies = Preconditions.checkNotNull(dependencies);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       InstructionId getId() {
-               return id;
-       }
-
-       InstructionStatus getStatus() {
-               return status;
-       }
-
-       ListenableFuture<ExecutionResult<Details>> execute() {
-               return executor.execute();
-       }
-
-       void setStatus(final InstructionStatus status) {
-               this.status = status;
-       }
-
-       synchronized void cancel() {
-               if (timeout != null) {
-                       timeout.cancel();
-                       timeout = null;
-               }
-       }
-
-       synchronized void completed() {
-               timeout = null;
-       }
-
-       synchronized void addDependant(final Instruction d) {
-               dependants.add(d);
-       }
-
-       List<Instruction> getDependencies() {
-               return dependencies;
-       }
-
-       List<Instruction> getDependants() {
-               return dependants;
-       }
-
-       synchronized void clean() {
-               for (final Iterator<Instruction> it = dependencies.iterator(); it.hasNext(); ) {
-                       final Instruction o = it.next();
-                       synchronized (o) {
-                               o.getDependants().remove(this);
-                       }
-               }
-               dependencies.clear();
-
-               for (final Iterator<Instruction> it = dependants.iterator(); it.hasNext(); ) {
-                       final Instruction o = it.next();
-
-                       synchronized (o) {
-                               o.getDependencies().remove(this);
-                       }
-               }
-               dependants.clear();
-       }
-}
\ No newline at end of file
diff --git a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionImpl.java b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionImpl.java
new file mode 100644 (file)
index 0000000..7623f36
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.bgpcep.programming.impl;
+
+import io.netty.util.Timeout;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
+import org.opendaylight.bgpcep.programming.spi.Instruction;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelFailure;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+final class InstructionImpl implements Instruction {
+       private static final Logger LOG = LoggerFactory.getLogger(InstructionImpl.class);
+       private final List<InstructionImpl> dependants = new ArrayList<>();
+       private SettableFuture<ExecutionResult<Details>> executionFuture;
+       private final SettableFuture<Instruction> schedulingFuture;
+       private final List<InstructionImpl> dependencies;
+       private final NotificationProviderService notifs;
+       private final InstructionId id;
+       private volatile InstructionStatus status = InstructionStatus.Queued;
+       private Details heldUpDetails;
+       private Timeout timeout;
+
+       InstructionImpl(final SettableFuture<Instruction> future, final InstructionId id, final List<InstructionImpl> dependencies, final Timeout timeout, final NotificationProviderService notifs) {
+               this.schedulingFuture = Preconditions.checkNotNull(future);
+               this.id = Preconditions.checkNotNull(id);
+               this.dependencies = Preconditions.checkNotNull(dependencies);
+               this.timeout = Preconditions.checkNotNull(timeout);
+               this.notifs = Preconditions.checkNotNull(notifs);
+       }
+
+       InstructionId getId() {
+               return id;
+       }
+
+       InstructionStatus getStatus() {
+               return status;
+       }
+
+       synchronized void setStatus(final InstructionStatus status, final Details details) {
+               // Set the status
+               this.status = status;
+               LOG.debug("Instruction {} transitioned to status {}", id, status);
+
+               // Send out a notification
+               this.notifs.publish(new InstructionStatusChangedBuilder().setId(id).setStatus(status).setDetails(details).build());
+
+               switch (status) {
+               case Cancelled:
+               case Failed:
+               case Unknown:
+                       cancelDependants();
+                       break;
+               case Executing:
+               case Queued:
+               case Scheduled:
+               case Successful:
+                       break;
+               }
+       }
+
+       @GuardedBy("this")
+       private void cancelTimeout() {
+               if (timeout != null) {
+                       timeout.cancel();
+                       timeout = null;
+               }
+       }
+
+       public synchronized void timeout() {
+               if (timeout != null) {
+                       timeout = null;
+
+                       switch (status) {
+                       case Cancelled:
+                       case Failed:
+                       case Successful:
+                               LOG.debug("Instruction {} has status {}, timeout is a no-op", id, status);
+                               break;
+                       case Unknown:
+                               LOG.warn("Instruction {} has status {} before timeout completed", id, status);
+                               break;
+                       case Executing:
+                               LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
+                               setStatus(InstructionStatus.Unknown, null);
+                               cancelDependants();
+                               break;
+                       case Queued:
+                               LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
+
+                               final List<InstructionId> ids = new ArrayList<>();
+                               for (final InstructionImpl d : dependencies) {
+                                       if (d.getStatus() != InstructionStatus.Successful) {
+                                               ids.add(d.getId());
+                                       }
+                               }
+
+                               cancel(new DetailsBuilder().setUnmetDependencies(ids).build());
+                               break;
+                       case Scheduled:
+                               LOG.debug("Instruction {} timed out while Scheduled, cancelling it", id);
+                               cancel(heldUpDetails);
+                               break;
+                       }
+               }
+       }
+
+       @GuardedBy("this")
+       private void cancelDependants() {
+               final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(id)).build();
+               for (final InstructionImpl d : dependants) {
+                       d.tryCancel(details);
+               }
+       }
+
+       @GuardedBy("this")
+       private void cancel(final Details details) {
+               cancelTimeout();
+               schedulingFuture.cancel(false);
+               setStatus(InstructionStatus.Cancelled, details);
+       }
+
+       synchronized Class<? extends CancelFailure> tryCancel(final Details details) {
+               switch (status) {
+               case Cancelled:
+               case Executing:
+               case Failed:
+               case Successful:
+               case Unknown:
+                       LOG.debug("Instruction {} can no longer be cancelled due to status {}", id, status);
+                       return UncancellableInstruction.class;
+               case Queued:
+               case Scheduled:
+                       cancel(details);
+                       return null;
+               }
+
+               throw new IllegalStateException("Unhandled instruction state " + status);
+       }
+
+       @Override
+       public synchronized boolean checkedExecutionStart() {
+               if (status != InstructionStatus.Scheduled) {
+                       return false;
+               }
+
+               setStatus(InstructionStatus.Executing, null);
+               return true;
+       }
+
+       @Override
+       public synchronized boolean executionHeldUp(final Details details) {
+               if (status != InstructionStatus.Scheduled) {
+                       return false;
+               }
+
+               this.heldUpDetails = details;
+               return true;
+       }
+
+       @Override
+       public synchronized void executionCompleted(final InstructionStatus status, final Details details) {
+               Preconditions.checkState(executionFuture != null);
+
+               cancelTimeout();
+
+               // We reuse the preconditions set down in this class
+               final ExecutionResult<Details> result = new ExecutionResult<Details>(status, details);
+               setStatus(status, details);
+               executionFuture.set(result);
+       }
+
+       synchronized void addDependant(final InstructionImpl d) {
+               dependants.add(d);
+       }
+
+       private synchronized void removeDependant(final InstructionImpl d) {
+               dependants.remove(d);
+       }
+
+       private synchronized void removeDependency(final InstructionImpl other) {
+               dependencies.remove(other);
+       }
+
+       synchronized Iterator<InstructionImpl> getDependants() {
+               return dependants.iterator();
+       }
+
+       synchronized void clean() {
+               for (final Iterator<InstructionImpl> it = dependencies.iterator(); it.hasNext(); ) {
+                       it.next().removeDependant(this);
+               }
+               dependencies.clear();
+
+               for (final Iterator<InstructionImpl> it = dependants.iterator(); it.hasNext(); ) {
+                       it.next().removeDependency(this);
+               }
+               dependants.clear();
+       }
+
+       synchronized ListenableFuture<ExecutionResult<Details>> ready() {
+               Preconditions.checkState(status == InstructionStatus.Queued);
+               Preconditions.checkState(executionFuture == null);
+
+               /*
+                * Check all vertices we depend on. We start off as ready for
+                * scheduling. If we encounter a cancelled/failed/unknown
+                * dependency, we cancel this instruction (and cascade). If we
+                * encounter an executing/queued/scheduled dependency, we hold
+                * of scheduling this one.
+                */
+               boolean ready = true;
+
+               final List<InstructionId> unmet = new ArrayList<>();
+               for (final InstructionImpl d : dependencies) {
+                       switch (d.getStatus()) {
+                       case Cancelled:
+                       case Failed:
+                       case Unknown:
+                               unmet.add(d.getId());
+                               break;
+                       case Executing:
+                       case Queued:
+                       case Scheduled:
+                               ready = false;
+                               break;
+                       case Successful:
+                               // No-op
+                               break;
+                       }
+               }
+
+               if (!unmet.isEmpty()) {
+                       LOG.warn("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", id);
+                       cancel(new DetailsBuilder().setUnmetDependencies(unmet).build());
+                       return null;
+               }
+
+               if (!ready) {
+                       return null;
+               }
+
+               LOG.debug("Instruction {} is ready for execution", id);
+               setStatus(InstructionStatus.Scheduled, null);
+               executionFuture = SettableFuture.create();
+               schedulingFuture.set(this);
+               return executionFuture;
+       }
+}
\ No newline at end of file
index daeb3c7b289bd82079ef3256130cbb4e28b1f201..209dcf3127cc4c9796c0670562ea1177c34bba42 100644 (file)
@@ -16,21 +16,19 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.concurrent.GuardedBy;
 
 import org.opendaylight.bgpcep.programming.NanotimeUtil;
 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
-import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
+import org.opendaylight.bgpcep.programming.spi.Instruction;
 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
+import org.opendaylight.bgpcep.programming.spi.SchedulerException;
 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
@@ -42,45 +40,35 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programm
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.FailureBuilder;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
 
 public final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService, AutoCloseable {
        private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
 
-       // Default stop timeout, in seconds
-       private static final long CLOSE_TIMEOUT = 5;
-
-       private final Map<InstructionId, Instruction> insns = new HashMap<>();
+       private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
 
        @GuardedBy("this")
-       private final Deque<Instruction> readyQueue = new ArrayDeque<>();
+       private final Deque<InstructionImpl> readyQueue = new ArrayDeque<>();
 
        private final NotificationProviderService notifs;
        private final ListeningExecutorService executor;
        private final Timer timer;
-       private Future<Void> thread;
-       private ExecutorService exec;
 
        public ProgrammingServiceImpl(final NotificationProviderService notifs, final ListeningExecutorService executor, final Timer timer) {
                this.notifs = Preconditions.checkNotNull(notifs);
@@ -109,7 +97,7 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
        }
 
        private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
-               final Instruction i = this.insns.get(input.getId());
+               final InstructionImpl i = this.insns.get(input.getId());
                if (i == null) {
                        LOG.debug("Instruction {} not present in the graph", input.getId());
 
@@ -117,21 +105,7 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
                        return SuccessfulRpcResult.create(out);
                }
 
-               switch (i.getStatus()) {
-               case Cancelled:
-               case Executing:
-               case Failed:
-               case Successful:
-               case Unknown:
-                       LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId(), i.getStatus());
-                       return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
-               case Queued:
-               case Scheduled:
-                       break;
-               }
-
-               cancelInstruction(i, null);
-               return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
+               return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(i.tryCancel(null)).build());
        }
 
 
@@ -140,7 +114,7 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
 
                for (final InstructionId id : input.getId()) {
                        // Find the instruction
-                       final Instruction i = this.insns.get(input.getId());
+                       final InstructionImpl i = this.insns.get(input.getId());
                        if (i == null) {
                                LOG.debug("Instruction {} not present in the graph", input.getId());
                                failed.add(id);
@@ -179,11 +153,11 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
        }
 
        @Override
-       public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
+       public synchronized ListenableFuture<Instruction> scheduleInstruction(final SubmitInstructionInput input) throws SchedulerException {
                final InstructionId id = input.getId();
                if (this.insns.get(id) != null) {
                        LOG.info("Instruction ID {} already present", id);
-                       return new FailureBuilder().setType(DuplicateInstructionId.class).build();
+                       throw new SchedulerException("Instruction ID currently in use", new FailureBuilder().setType(DuplicateInstructionId.class).build());
                }
 
                // First things first: check the deadline
@@ -192,16 +166,16 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
 
                if (left.compareTo(BigInteger.ZERO) <= 0) {
                        LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
-                       return new FailureBuilder().setType(DeadOnArrival.class).build();
+                       throw new SchedulerException("Instruction arrived after specified deadline", new FailureBuilder().setType(DeadOnArrival.class).build());
                }
 
                // Resolve dependencies
-               final List<Instruction> dependencies = new ArrayList<>();
+               final List<InstructionImpl> dependencies = new ArrayList<>();
                for (final InstructionId pid : input.getPreconditions()) {
-                       final Instruction i = this.insns.get(pid);
+                       final InstructionImpl i = this.insns.get(pid);
                        if (i == null) {
                                LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
-                               return new FailureBuilder().setType(UnknownPreconditionId.class).build();
+                               throw new SchedulerException("Unknown dependency ID specified", new FailureBuilder().setType(UnknownPreconditionId.class).build());
                        }
 
                        dependencies.add(i);
@@ -209,7 +183,7 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
 
                // Check if all dependencies are non-failed
                final List<InstructionId> unmet = new ArrayList<>();
-               for (final Instruction d : dependencies) {
+               for (final InstructionImpl d : dependencies) {
                        switch (d.getStatus()) {
                        case Cancelled:
                        case Failed:
@@ -229,7 +203,7 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
                 *  and fail the operation.
                 */
                if (!unmet.isEmpty()) {
-                       return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
+                       throw new SchedulerException("Instruction's dependecies are already unsuccessful", new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build());
                }
 
                /*
@@ -247,11 +221,12 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
                }, left.longValue(), TimeUnit.NANOSECONDS);
 
                // Put it into the instruction list
-               final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
+               final SettableFuture<Instruction> ret = SettableFuture.create();
+               final InstructionImpl i = new InstructionImpl(ret, input.getId(), dependencies, t, notifs);
                this.insns.put(id, i);
 
                // Attach it into its dependencies
-               for (final Instruction d : dependencies) {
+               for (final InstructionImpl d : dependencies) {
                        d.addDependant(i);
                }
 
@@ -268,218 +243,51 @@ public final class ProgrammingServiceImpl implements InstructionScheduler, Progr
                        }
                });
 
-               return null;
-       }
-
-       @GuardedBy("this")
-       private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
-               // Set the status
-               v.setStatus(status);
-
-               LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
-
-               // Send out a notification
-               this.notifs.publish(new InstructionStatusChangedBuilder().setId(v.getId()).setStatus(status).setDetails(details).build());
-       }
-
-       @GuardedBy("this")
-       private void cancelSingle(final Instruction i, final Details details) {
-               // Stop the timeout
-               i.cancel();
-
-               // Set the new status and send out notification
-               transitionInstruction(i, InstructionStatus.Cancelled, details);
-       }
-
-       @GuardedBy("this")
-       private void cancelDependants(final Instruction v) {
-               final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
-               for (final Instruction d : v.getDependants()) {
-                       switch (d.getStatus()) {
-                       case Cancelled:
-                       case Executing:
-                       case Failed:
-                       case Successful:
-                       case Unknown:
-                               break;
-                       case Queued:
-                       case Scheduled:
-                               cancelSingle(d, details);
-                               cancelDependants(d);
-                               break;
-                       }
-               }
-       }
-
-       private synchronized void cancelInstruction(final Instruction i, final Details details) {
-               this.readyQueue.remove(i);
-               cancelSingle(i, details);
-               cancelDependants(i);
+               return ret;
        }
 
        private synchronized void timeoutInstruction(final InstructionId id) {
-               final Instruction i = this.insns.get(id);
+               final InstructionImpl i = this.insns.get(id);
                if (i == null) {
                        LOG.warn("Instruction {} timed out, but not found in the queue", id);
                        return;
                }
 
-               switch (i.getStatus()) {
-               case Cancelled:
-               case Failed:
-               case Successful:
-                       LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
-                       break;
-               case Unknown:
-                       LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
-                       break;
-               case Executing:
-                       LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
-                       transitionInstruction(i, InstructionStatus.Unknown, null);
-                       cancelDependants(i);
-                       break;
-               case Queued:
-                       LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
-
-                       final List<InstructionId> ids = new ArrayList<>();
-                       for (final Instruction d : i.getDependencies()) {
-                               if (d.getStatus() != InstructionStatus.Successful) {
-                                       ids.add(d.getId());
-                               }
-                       }
-
-                       cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
-                       break;
-               case Scheduled:
-                       LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
-                       // FIXME: BUG-191: we should provide details why it timed out while scheduled
-                       cancelInstruction(i, null);
-                       break;
-               }
-       }
-
-       @GuardedBy("this")
-       private synchronized void tryScheduleInstruction(final Instruction i) {
-               Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
-
-               /*
-                * Check all vertices we depend on. We start off as ready for
-                * scheduling. If we encounter a cancelled/failed/unknown
-                * dependency, we cancel this instruction (and cascade). If we
-                * encounter an executing/queued/scheduled dependency, we hold
-                * of scheduling this one.
-                */
-               boolean ready = true;
-
-               final List<InstructionId> unmet = new ArrayList<>();
-               for (final Instruction d : i.getDependencies()) {
-                       switch (d.getStatus()) {
-                       case Cancelled:
-                       case Failed:
-                       case Unknown:
-                               unmet.add(d.getId());
-                               break;
-                       case Executing:
-                       case Queued:
-                       case Scheduled:
-                               ready = false;
-                               break;
-                       case Successful:
-                               // No-op
-                               break;
-                       }
-               }
-
-               if (!unmet.isEmpty()) {
-                       LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
-                       cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
-                       cancelDependants(i);
-                       return;
-               }
-
-               if (ready) {
-                       LOG.debug("Instruction {} is ready for execution", i.getId());
-                       transitionInstruction(i, InstructionStatus.Scheduled, null);
-
-                       this.readyQueue.add(i);
-                       notify();
-               }
-       }
-
-       private synchronized void executionFailed(final Instruction i, final Throwable cause) {
-               LOG.error("Instruction {} failed to execute", i.getId(), cause);
-               transitionInstruction(i, InstructionStatus.Failed, null);
-               cancelDependants(i);
+               i.timeout();
        }
 
-       private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
-               i.cancel();
-
-               transitionInstruction(i, res.getStatus(), res.getDetails());
-
+       private synchronized void tryScheduleDependants(final InstructionImpl i) {
                // Walk all dependants and try to schedule them
-               for (final Instruction d : i.getDependants()) {
-                       tryScheduleInstruction(d);
+               final Iterator<InstructionImpl> it =i.getDependants();
+               while (it.hasNext()) {
+                       tryScheduleInstruction(it.next());
                }
        }
 
-       private synchronized void processQueues() throws InterruptedException {
-               /*
-                * This method is only ever interrupted by InterruptedException
-                */
-               while (true) {
-                       while (!this.readyQueue.isEmpty()) {
-                               final Instruction i = this.readyQueue.poll();
-
-                               Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
-
-                               transitionInstruction(i, InstructionStatus.Executing, null);
-                               Futures.addCallback(i.execute(), new FutureCallback<ExecutionResult<Details>>() {
-
-                                       @Override
-                                       public void onSuccess(final ExecutionResult<Details> result) {
-                                               executionSuccessful(i, result);
-                                       }
-
-                                       @Override
-                                       public void onFailure(final Throwable t) {
-                                               executionFailed(i, t);
-                                       }
-                               });
-                       }
-
-                       wait();
-               }
-       }
-
-       synchronized void start(final ThreadFactory threadFactory) {
-               Preconditions.checkState(this.exec == null, "Programming service dispatch thread already started");
+       private synchronized void tryScheduleInstruction(final InstructionImpl i) {
+               final ListenableFuture<ExecutionResult<Details>> f = i.ready();
+               if (f != null) {
+                       this.readyQueue.add(i);
 
-               this.exec = Executors.newSingleThreadExecutor(threadFactory);
-               this.thread = this.exec.submit(new Callable<Void>() {
-                       @Override
-                       public Void call() {
-                               try {
-                                       processQueues();
-                               } catch (final InterruptedException ex) {
-                                       LOG.error("Programming service dispatch thread died", ex);
+                       Futures.addCallback(f, new FutureCallback<ExecutionResult<Details>>() {
+                               @Override
+                               public void onSuccess(final ExecutionResult<Details> result) {
+                                       tryScheduleDependants(i);
                                }
-                               return null;
-                       }
-               });
-               this.exec.shutdown();
-       }
 
-       synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
-               Preconditions.checkState(this.exec != null, "Programming service dispatch thread already stopped");
+                               @Override
+                               public void onFailure(final Throwable t) {
+                                       LOG.error("Instruction {} failed to execute", i.getId(), t);
+                               }
+                       });
+               }
 
-               this.thread.cancel(true);
-               this.exec.awaitTermination(timeout, unit);
-               this.exec = null;
        }
 
        @Override
-       public void close() throws InterruptedException {
-               stop(CLOSE_TIMEOUT, TimeUnit.SECONDS);
+       public synchronized void close() {
+               for (InstructionImpl i : readyQueue) {
+                       i.tryCancel(null);
+               }
        }
 }
diff --git a/programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/Instruction.java b/programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/Instruction.java
new file mode 100644 (file)
index 0000000..acb1c8e
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.bgpcep.programming.spi;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
+
+/**
+ *
+ */
+public interface Instruction {
+       /**
+        * Instruction executors are required to call this method prior to
+        * starting executing on the instruction. Implementations of this method
+        * are required to transition into Executing state and return true, or
+        * into Cancelled state and return false.
+        * 
+        * @return Indication whether the instruction execution should proceed.
+        */
+       boolean checkedExecutionStart();
+
+       /**
+        * Instruction executors can inform about execution hold ups which
+        * prevent an otherwise-ready instruction from executing by calling this
+        * method. It is recommended they check the return of this method to
+        * detect if a cancellation occurred asynchronously.
+        * 
+        * @param details Details which execution is held up
+        * @return Indication whether the instruction execution should proceed.
+        *         If this method returns false, all subsequent calls to this
+        *         method as well as {@link checkedExecutionStart()} will return
+        *         false.
+        */
+       boolean executionHeldUp(Details details);
+
+       /**
+        * Instruction executors are required to call this method when execution
+        * has finished to provide the execution result to the end.
+        * 
+        * @param status Execution result
+        * @param details Execution result details
+        */
+       void executionCompleted(InstructionStatus status, Details details);
+}
index ec8d1f4f5ed29de1c0fce6b66b815094c50529fd..2462948c9927c1bd30b0d919337c9b2eb2c28d93 100644 (file)
@@ -8,8 +8,21 @@
 package org.opendaylight.bgpcep.programming.spi;
 
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
+
+import com.google.common.util.concurrent.ListenableFuture;
 
 public interface InstructionScheduler {
-       Failure submitInstruction(SubmitInstructionInput input, InstructionExecutor executor);
+       /**
+        * Schedule a new instruction for execution. This method tries to enqueue
+        * an instruction. It will return a Future which represents the scheduling
+        * progress. When the future becomes successful, the requestor is expected
+        * to start executing on the instruction, as specified by the {@link Instruction}
+        * contract.
+        * 
+        * @param input Instruction scheduling information
+        * @return Scheduling future.
+        * 
+        * @throws SchedulerException if a failure to schedule the instruction occurs.
+        */
+       ListenableFuture<Instruction> scheduleInstruction(SubmitInstructionInput input) throws SchedulerException;
 }
similarity index 50%
rename from programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/InstructionExecutor.java
rename to programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/SchedulerException.java
index 5780e6a693496120a7d96794b10bae26115b7ab2..f1e90d2f299286b0409f04143d1c0501aeae7d46 100644 (file)
@@ -7,10 +7,21 @@
  */
 package org.opendaylight.bgpcep.programming.spi;
 
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.Failure;
 
-import com.google.common.util.concurrent.ListenableFuture;
+/**
+ *
+ */
+public class SchedulerException extends Exception {
+       private static final long serialVersionUID = 1L;
+       private final Failure failure;
+
+       public SchedulerException(final String message, final Failure failure) {
+               super(message);
+               this.failure = failure;
+       }
 
-public interface InstructionExecutor {
-       ListenableFuture<ExecutionResult<Details>> execute();
+       public final Failure getFailure() {
+               return failure;
+       }
 }