BUG-48: next phase of implementation
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13 import io.netty.util.concurrent.Future;
14 import io.netty.util.concurrent.FutureListener;
15
16 import java.math.BigInteger;
17 import java.util.ArrayDeque;
18 import java.util.ArrayList;
19 import java.util.Deque;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import javax.annotation.concurrent.GuardedBy;
30
31 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
32 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
33 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
34 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
35 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
59
60 final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService {
61         private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
62         private static final BigInteger MILLION = BigInteger.valueOf(1000000);
63
64         private final Map<InstructionId, Instruction> insns = new HashMap<>();
65
66         @GuardedBy("this")
67         private final Deque<Instruction> readyQueue = new ArrayDeque<>();
68
69         private final NotificationProviderService notifs;
70         private final ExecutorService executor;
71         private final Timer timer;
72         private java.util.concurrent.Future<Void> thread;
73         private ExecutorService exec;
74
75         ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
76                         final Timer timer, final InstructionScheduler registry) {
77                 this.notifs = Preconditions.checkNotNull(notifs);
78                 this.executor = Preconditions.checkNotNull(executor);
79                 this.timer = Preconditions.checkNotNull(timer);
80         }
81
82         @Override
83         public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
84                 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
85                         @Override
86                         public RpcResult<CancelInstructionOutput> call() {
87                                 return realCancelInstruction(input);
88                         }
89                 });
90         }
91
92         private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input)  {
93                 final Instruction i = insns.get(input.getId());
94                 if (i == null) {
95                         LOG.debug("Instruction {} not present in the graph", input.getId());
96
97                         final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
98                         return SuccessfulRpcResult.create(out);
99                 }
100
101                 switch (i.getStatus()) {
102                 case Cancelled:
103                 case Executing:
104                 case Failed:
105                 case Successful:
106                 case Unknown:
107                         LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId());
108                         return SuccessfulRpcResult.create(
109                                         new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
110                 case Queued:
111                 case Scheduled:
112                         break;
113                 }
114
115                 cancelInstruction(i, null);
116                 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
117         }
118
119         @Override
120         public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
121                 final InstructionId id = input.getId();
122                 if (insns.get(id) != null) {
123                         LOG.info("Instruction ID {} already present", id);
124                         return new FailureBuilder().setType(DuplicateInstructionId.class).build();
125                 }
126
127                 // First things first: check the deadline
128                 final BigInteger now = BigInteger.valueOf(System.currentTimeMillis()).multiply(MILLION);
129                 final BigInteger left = input.getDeadline().getValue().subtract(now);
130
131                 if (left.compareTo(BigInteger.ZERO) <= 0) {
132                         LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
133                         return new FailureBuilder().setType(DeadOnArrival.class).build();
134                 }
135
136                 // Resolve dependencies
137                 final List<Instruction> dependencies = new ArrayList<>();
138                 for (final InstructionId pid : input.getPreconditions()) {
139                         final Instruction i = insns.get(pid);
140                         if (i == null) {
141                                 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
142                                 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
143                         }
144
145                         dependencies.add(i);
146                 }
147
148                 // Check if all dependencies are non-failed
149                 final List<InstructionId> unmet = new ArrayList<>();
150                 for (final Instruction d : dependencies) {
151                         switch (d.getStatus()) {
152                         case Cancelled:
153                         case Failed:
154                         case Unknown:
155                                 unmet.add(d.getId());
156                                 break;
157                         case Executing:
158                         case Queued:
159                         case Scheduled:
160                         case Successful:
161                                 break;
162                         }
163                 }
164
165                 /*
166                  *  Some dependencies have failed, declare the request dead-on-arrival
167                  *  and fail the operation.
168                  */
169                 if (!unmet.isEmpty()) {
170                         return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
171                 }
172
173                 /*
174                  * All pre-flight checks done are at this point, the following
175                  * steps can only fail in catastrophic scenarios (OOM and the
176                  * like).
177                  */
178
179                 // Schedule a timeout for the instruction
180                 final Timeout t = timer.newTimeout(new TimerTask() {
181                         @Override
182                         public void run(final Timeout timeout) throws Exception {
183                                 timeoutInstruction(input.getId());
184                         }
185                 }, left.longValue(), TimeUnit.NANOSECONDS);
186
187                 // Put it into the instruction list
188                 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
189                 insns.put(id, i);
190
191                 // Attach it into its dependencies
192                 for (final Instruction d : dependencies) {
193                         d.addDependant(i);
194                 }
195
196                 /*
197                  * All done. The next part is checking whether the instruction can
198                  * run, which we can figure out after sending out the acknowledgement.
199                  * This task should be ingress-weighed, so we reinsert it into the
200                  * same execution service.
201                  */
202                 this.executor.submit(new Runnable() {
203                         @Override
204                         public void run() {
205                                 tryScheduleInstruction(i);
206                         }
207                 });
208
209                 return null;
210         }
211
212         @GuardedBy("this")
213         private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
214                 // Set the status
215                 v.setStatus(status);
216
217                 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
218
219                 // Send out a notification
220                 notifs.publish(new InstructionStatusChangedBuilder().
221                                 setId(v.getId()).setStatus(status).setDetails(details).build());
222         }
223
224         @GuardedBy("this")
225         private void cancelSingle(final Instruction i, final Details details) {
226                 // Stop the timeout
227                 i.cancel();
228
229                 // Set the new status and send out notification
230                 transitionInstruction(i, InstructionStatus.Cancelled, details);
231         }
232
233         @GuardedBy("this")
234         private void cancelDependants(final Instruction v) {
235                 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
236                 for (final Instruction d : v.getDependants()) {
237                         switch (d.getStatus()) {
238                         case Cancelled:
239                         case Executing:
240                         case Failed:
241                         case Successful:
242                         case Unknown:
243                                 break;
244                         case Queued:
245                         case Scheduled:
246                                 cancelSingle(d, details);
247                                 cancelDependants(d);
248                                 break;
249                         }
250                 }
251         }
252
253         private synchronized void cancelInstruction(final Instruction i, final Details details) {
254                 readyQueue.remove(i);
255                 cancelSingle(i, details);
256                 cancelDependants(i);
257         }
258
259         private synchronized void timeoutInstruction(final InstructionId id) {
260                 final Instruction i = insns.get(id);
261                 if (i == null) {
262                         LOG.warn("Instruction {} timed out, but not found in the queue", id);
263                         return;
264                 }
265
266                 switch (i.getStatus()) {
267                 case Cancelled:
268                 case Failed:
269                 case Successful:
270                         LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
271                         break;
272                 case Unknown:
273                         LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
274                         break;
275                 case Executing:
276                         LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
277                         transitionInstruction(i, InstructionStatus.Unknown, null);
278                         cancelDependants(i);
279                         break;
280                 case Queued:
281                         LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
282
283                         final List<InstructionId> ids = new ArrayList<>();
284                         for (final Instruction d : i.getDependencies()) {
285                                 if (d.getStatus() != InstructionStatus.Successful) {
286                                         ids.add(d.getId());
287                                 }
288                         }
289
290                         cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
291                         break;
292                 case Scheduled:
293                         LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
294                         // FIXME: we should provide details why it timed out while scheduled
295                         cancelInstruction(i, null);
296                         break;
297                 }
298         }
299
300         @GuardedBy("this")
301         private synchronized void tryScheduleInstruction(final Instruction i) {
302                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
303
304                 /*
305                  * Check all vertices we depend on. We start off as ready for
306                  * scheduling. If we encounter a cancelled/failed/unknown
307                  * dependency, we cancel this instruction (and cascade). If we
308                  * encounter an executing/queued/scheduled dependency, we hold
309                  * of scheduling this one.
310                  */
311                 boolean ready = true;
312
313                 final List<InstructionId> unmet = new ArrayList<>();
314                 for (final Instruction d : i.getDependencies()) {
315                         switch (d.getStatus()) {
316                         case Cancelled:
317                         case Failed:
318                         case Unknown:
319                                 unmet.add(d.getId());
320                                 break;
321                         case Executing:
322                         case Queued:
323                         case Scheduled:
324                                 ready = false;
325                                 break;
326                         case Successful:
327                                 // No-op
328                                 break;
329                         }
330                 }
331
332                 if (!unmet.isEmpty()) {
333                         LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
334                         cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
335                         cancelDependants(i);
336                         return;
337                 }
338
339                 if (ready) {
340                         LOG.debug("Instruction {} is ready for execution", i.getId());
341                         transitionInstruction(i, InstructionStatus.Scheduled, null);
342
343                         readyQueue.add(i);
344                         notify();
345                 }
346         }
347
348         private synchronized void executionFailed(final Instruction i, final Throwable cause) {
349                 LOG.error("Instruction {} failed to execute", i.getId(), cause);
350                 transitionInstruction(i, InstructionStatus.Failed, null);
351                 cancelDependants(i);
352         }
353
354         private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
355                 i.cancel();
356
357                 transitionInstruction(i, res.getStatus(), res.getDetails());
358
359                 // Walk all dependants and try to schedule them
360                 for (final Instruction d : i.getDependants()) {
361                         tryScheduleInstruction(d);
362                 }
363         }
364
365         private synchronized void processQueues() throws InterruptedException {
366                 /*
367                  * This method is only ever interrupted by InterruptedException
368                  */
369                 while (true) {
370                         while (!readyQueue.isEmpty()) {
371                                 final Instruction i = readyQueue.poll();
372
373                                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
374
375                                 transitionInstruction(i, InstructionStatus.Executing, null);
376                                 final Future<ExecutionResult<?>> f = i.execute();
377                                 f.addListener(new FutureListener<ExecutionResult<?>>() {
378                                         @Override
379                                         public void operationComplete(final Future<ExecutionResult<?>> future) {
380                                                 if (future.isSuccess()) {
381                                                         executionSuccessful(i, future.getNow());
382                                                 } else {
383                                                         executionFailed(i, future.cause());
384                                                 }
385                                         }
386                                 });
387                         }
388
389                         wait();
390                 }
391         }
392
393         synchronized void start(final ThreadFactory threadFactory) {
394                 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
395
396                 exec = Executors.newSingleThreadExecutor(threadFactory);
397                 thread = exec.submit(new Callable<Void>() {
398                         @Override
399                         public Void call() throws Exception {
400                                 try {
401                                         processQueues();
402                                 } catch (Exception ex) {
403                                         if (!(ex instanceof InterruptedException)) {
404                                                 LOG.error("Programming service dispatch thread died", ex);
405                                         }
406                                         throw ex;
407                                 }
408                                 return null;
409                         }
410                 });
411                 exec.shutdown();
412         }
413
414         synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
415                 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
416
417                 thread.cancel(true);
418                 exec.awaitTermination(timeout, unit);
419                 exec = null;
420         }
421 }