Merge "Fix junit dependencies in poms. Reuse existing from parent, add missing ones."
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13 import io.netty.util.concurrent.Future;
14 import io.netty.util.concurrent.FutureListener;
15
16 import java.math.BigInteger;
17 import java.util.ArrayDeque;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Deque;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.TimeUnit;
30
31 import javax.annotation.concurrent.GuardedBy;
32
33 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
34 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
35 import org.opendaylight.bgpcep.programming.spi.InstructionExecutorRegistry;
36 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionType;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionOutputBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
57 import org.opendaylight.yangtools.concepts.Registration;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 import com.google.common.base.Preconditions;
63 import com.google.common.collect.ArrayListMultimap;
64 import com.google.common.collect.ImmutableList;
65 import com.google.common.collect.Multimap;
66
67 final class ProgrammingServiceImpl implements InstructionExecutorRegistry, ProgrammingService {
68         private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
69         private static final BigInteger MILLION = BigInteger.valueOf(1000000);
70
71         private final Map<InstructionId, Instruction> insns = new HashMap<>();
72
73         @GuardedBy("this")
74         private final Deque<Instruction> readyQueue = new ArrayDeque<>();
75
76         @GuardedBy("this")
77         private final Deque<Instruction> deferredQueue = new ArrayDeque<>();
78
79         @GuardedBy("this")
80         private final Multimap<Class<? extends InstructionType>, InstructionExecutor> executors = ArrayListMultimap.create();
81
82         private final NotificationProviderService notifs;
83         private final ExecutorService executor;
84         private final Timer timer;
85         private java.util.concurrent.Future<Void> thread;
86         private ExecutorService exec;
87
88         ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
89                         final Timer timer, final InstructionExecutorRegistry registry) {
90                 this.notifs = Preconditions.checkNotNull(notifs);
91                 this.executor = Preconditions.checkNotNull(executor);
92                 this.timer = Preconditions.checkNotNull(timer);
93         }
94
95         @Override
96         public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
97                 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
98                         @Override
99                         public RpcResult<CancelInstructionOutput> call() {
100                                 return realCancelInstruction(input);
101                         }
102                 });
103         }
104
105         @Override
106         public java.util.concurrent.Future<RpcResult<SubmitInstructionOutput>> submitInstruction(final SubmitInstructionInput input) {
107                 return executor.submit(new Callable<RpcResult<SubmitInstructionOutput>>() {
108                         @Override
109                         public RpcResult<SubmitInstructionOutput> call() {
110                                 return realSubmitInstruction(input);
111                         }
112                 });
113         }
114
115         @Override
116         public synchronized Registration<InstructionExecutor> registerInstructionExecutor(final Class<? extends InstructionType> type, final InstructionExecutor executor) {
117                 Preconditions.checkNotNull(type);
118                 Preconditions.checkNotNull(executor);
119
120                 executors.put(type, executor);
121
122                 /*
123                  * Walk the deferred instructions back to front, check if they have
124                  * the same type as the executor we have just registered. If they do,
125                  * we move them to the head of readyQueue. This approach should retain
126                  * submission order of the instructions.
127                  */
128                 final Iterator<Instruction> it = deferredQueue.descendingIterator();
129                 while (it.hasNext()) {
130                         final Instruction i = it.next();
131                         if (type.equals(i.getInput().getType())) {
132                                 it.remove();
133                                 readyQueue.addFirst(i);
134                         }
135                 }
136
137                 notify();
138
139                 final Object lock = this;
140                 return new Registration<InstructionExecutor>() {
141                         @Override
142                         public void close() throws Exception {
143                                 synchronized (lock) {
144                                         executors.remove(type, executor);
145                                 }
146                         }
147
148                         @Override
149                         public InstructionExecutor getInstance() {
150                                 return executor;
151                         }
152                 };
153         }
154
155         private static final RpcResult<SubmitInstructionOutput> failedSubmit(final Failure f) {
156                 return SuccessfulRpcResult.create(
157                                 new SubmitInstructionOutputBuilder().setResult(
158                                                 new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.FailureBuilder()
159                                                 .setFailure(f).build()).build());
160         }
161
162         private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input)  {
163                 final Instruction i = insns.get(input.getId());
164                 if (i == null) {
165                         LOG.debug("Instruction {} not present in the graph", input.getId());
166
167                         final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
168                         return SuccessfulRpcResult.create(out);
169                 }
170
171                 switch (i.getStatus()) {
172                 case Cancelled:
173                 case Executing:
174                 case Failed:
175                 case Successful:
176                 case Unknown:
177                         LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId());
178                         return SuccessfulRpcResult.create(
179                                         new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
180                 case Queued:
181                 case Scheduled:
182                         break;
183                 }
184
185                 cancelInstruction(i, null);
186                 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
187         }
188
189         private synchronized RpcResult<SubmitInstructionOutput> realSubmitInstruction(final SubmitInstructionInput input) {
190                 final InstructionId id = input.getId();
191                 if (insns.get(id) != null) {
192                         LOG.info("Instruction ID {} already present", id);
193                         return failedSubmit(new FailureBuilder().setType(DuplicateInstructionId.class).build());
194                 }
195
196                 // First things first: check the deadline
197                 final BigInteger now = BigInteger.valueOf(System.currentTimeMillis()).multiply(MILLION);
198                 final BigInteger left = input.getDeadline().getValue().subtract(now);
199
200                 if (left.compareTo(BigInteger.ZERO) <= 0) {
201                         LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
202                         return failedSubmit(new FailureBuilder().setType(DeadOnArrival.class).build());
203                 }
204
205                 // Resolve dependencies
206                 final List<Instruction> dependencies = new ArrayList<>();
207                 for (final InstructionId pid : input.getPreconditions()) {
208                         final Instruction i = insns.get(pid);
209                         if (i == null) {
210                                 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
211                                 return failedSubmit(new FailureBuilder().setType(UnknownPreconditionId.class).build());
212                         }
213
214                         dependencies.add(i);
215                 }
216
217                 // Check if all dependencies are non-failed
218                 final List<InstructionId> unmet = new ArrayList<>();
219                 for (final Instruction d : dependencies) {
220                         switch (d.getStatus()) {
221                         case Cancelled:
222                         case Failed:
223                         case Unknown:
224                                 unmet.add(d.getInput().getId());
225                                 break;
226                         case Executing:
227                         case Queued:
228                         case Scheduled:
229                         case Successful:
230                                 break;
231                         }
232                 }
233
234                 /*
235                  *  Some dependencies have failed, declare the request dead-on-arrival
236                  *  and fail the operation.
237                  */
238                 if (!unmet.isEmpty()) {
239                         return failedSubmit(new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build());
240                 }
241
242                 /*
243                  * All pre-flight checks done are at this point, the following
244                  * steps can only fail in catastrophic scenarios (OOM and the
245                  * like).
246                  */
247
248                 // Schedule a timeout for the instruction
249                 final Timeout t = timer.newTimeout(new TimerTask() {
250                         @Override
251                         public void run(final Timeout timeout) throws Exception {
252                                 timeoutInstruction(input.getId());
253                         }
254                 }, left.longValue(), TimeUnit.NANOSECONDS);
255
256                 // Put it into the instruction list
257                 final Instruction i = new Instruction(input, dependencies, t);
258                 insns.put(id, i);
259
260                 // Attach it into its dependencies
261                 for (final Instruction d : dependencies) {
262                         d.addDependant(i);
263                 }
264
265                 /*
266                  * All done. The next part is checking whether the instruction can
267                  * run, which we can figure out after sending out the acknowledgement.
268                  * This task should be ingress-weighed, so we reinsert it into the
269                  * same execution service.
270                  */
271                 executor.submit(new Runnable() {
272                         @Override
273                         public void run() {
274                                 tryScheduleInstruction(i);
275                         }
276                 });
277
278                 return SuccessfulRpcResult.create(new SubmitInstructionOutputBuilder().build());
279         }
280
281         @GuardedBy("this")
282         private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
283                 // Set the status
284                 v.setStatus(status);
285
286                 LOG.debug("Instruction {} transitioned to status {}", v.getInput().getId(), status);
287
288                 // Send out a notification
289                 notifs.publish(new InstructionStatusChangedBuilder().
290                                 setId(v.getInput().getId()).setStatus(status).setDetails(details).build());
291         }
292
293         @GuardedBy("this")
294         private void cancelSingle(final Instruction i, final Details details) {
295                 // Stop the timeout
296                 i.cancel();
297
298                 // Set the new status and send out notification
299                 transitionInstruction(i, InstructionStatus.Cancelled, details);
300         }
301
302         @GuardedBy("this")
303         private void cancelDependants(final Instruction v) {
304                 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getInput().getId())).build();
305                 for (final Instruction d : v.getDependants()) {
306                         switch (d.getStatus()) {
307                         case Cancelled:
308                         case Executing:
309                         case Failed:
310                         case Successful:
311                         case Unknown:
312                                 break;
313                         case Queued:
314                         case Scheduled:
315                                 cancelSingle(d, details);
316                                 cancelDependants(d);
317                                 break;
318                         }
319                 }
320         }
321
322         private synchronized void cancelInstruction(final Instruction i, final Details details) {
323                 deferredQueue.remove(i);
324                 readyQueue.remove(i);
325                 cancelSingle(i, details);
326                 cancelDependants(i);
327         }
328
329         private synchronized void timeoutInstruction(final InstructionId id) {
330                 final Instruction i = insns.get(id);
331                 if (i == null) {
332                         LOG.warn("Instruction {} timed out, but not found in the queue", id);
333                         return;
334                 }
335
336                 switch (i.getStatus()) {
337                 case Cancelled:
338                 case Failed:
339                 case Successful:
340                         LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
341                         break;
342                 case Unknown:
343                         LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
344                         break;
345                 case Executing:
346                         LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
347                         transitionInstruction(i, InstructionStatus.Unknown, null);
348                         cancelDependants(i);
349                         break;
350                 case Queued:
351                         LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
352
353                         final List<InstructionId> ids = new ArrayList<>();
354                         for (final Instruction d : i.getDependencies()) {
355                                 if (d.getStatus() != InstructionStatus.Successful) {
356                                         ids.add(d.getInput().getId());
357                                 }
358                         }
359
360                         cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
361                         break;
362                 case Scheduled:
363                         LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getInput().getId());
364                         // FIXME: we should provide details why it timed out while scheduled
365                         cancelInstruction(i, null);
366                         break;
367                 }
368         }
369
370         @GuardedBy("this")
371         private synchronized void tryScheduleInstruction(final Instruction i) {
372                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
373
374                 /*
375                  * Check all vertices we depend on. We start off as ready for
376                  * scheduling. If we encounter a cancelled/failed/unknown
377                  * dependency, we cancel this instruction (and cascade). If we
378                  * encounter an executing/queued/scheduled dependency, we hold
379                  * of scheduling this one.
380                  */
381                 boolean ready = true;
382
383                 final List<InstructionId> unmet = new ArrayList<>();
384                 for (final Instruction d : i.getDependencies()) {
385                         switch (d.getStatus()) {
386                         case Cancelled:
387                         case Failed:
388                         case Unknown:
389                                 unmet.add(d.getInput().getId());
390                                 break;
391                         case Executing:
392                         case Queued:
393                         case Scheduled:
394                                 ready = false;
395                                 break;
396                         case Successful:
397                                 // No-op
398                                 break;
399                         }
400                 }
401
402                 if (!unmet.isEmpty()) {
403                         LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getInput().getId());
404                         cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
405                         cancelDependants(i);
406                         return;
407                 }
408
409                 if (ready) {
410                         LOG.debug("Instruction {} is ready for execution", i.getInput().getId());
411                         transitionInstruction(i, InstructionStatus.Scheduled, null);
412
413                         readyQueue.add(i);
414                         notify();
415                 }
416         }
417
418         private synchronized void executionFailed(final Instruction i, final Throwable cause) {
419                 LOG.error("Instruction {} failed to execute", i.getInput().getId(), cause);
420                 transitionInstruction(i, InstructionStatus.Failed, null);
421                 cancelDependants(i);
422         }
423
424         private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
425                 i.cancel();
426
427                 transitionInstruction(i, res.getStatus(), res.getDetails());
428
429                 // Walk all dependants and try to schedule them
430                 for (final Instruction d : i.getDependants()) {
431                         tryScheduleInstruction(d);
432                 }
433         }
434
435         private synchronized void processQueues() throws InterruptedException {
436                 /*
437                  * This method is only ever interrupted by InterruptedException
438                  */
439                 while (true) {
440                         while (!readyQueue.isEmpty()) {
441                                 final Instruction i = readyQueue.poll();
442
443                                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
444                                 final SubmitInstructionInput input = i.getInput();
445
446                                 /*
447                                  *  Walk all the registered executors for a particular type and
448                                  *  offer them the chance to execute the instruction. The first
449                                  *  one to accept it wins.
450                                  */
451                                 Future<ExecutionResult<?>> f = null;
452                                 final Collection<InstructionExecutor> el = executors.get(input.getType());
453
454                                 for (final InstructionExecutor e : el) {
455                                         f = e.offerInstruction(input.getArguments());
456                                         if (f != null) {
457                                                 break;
458                                         }
459                                 }
460
461                                 // We did not find an executor -- defer the instruction
462                                 if (f == null) {
463                                         deferredQueue.add(i);
464                                         continue;
465                                 }
466
467                                 transitionInstruction(i, InstructionStatus.Executing, null);
468                                 f.addListener(new FutureListener<ExecutionResult<?>>() {
469                                         @Override
470                                         public void operationComplete(final Future<ExecutionResult<?>> future) {
471                                                 if (future.isSuccess()) {
472                                                         executionSuccessful(i, future.getNow());
473                                                 } else {
474                                                         executionFailed(i, future.cause());
475                                                 }
476                                         }
477                                 });
478                         }
479
480                         wait();
481                 }
482         }
483
484         synchronized void start(final ThreadFactory threadFactory) {
485                 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
486
487                 exec = Executors.newSingleThreadExecutor(threadFactory);
488                 thread = exec.submit(new Callable<Void>() {
489                         @Override
490                         public Void call() throws Exception {
491                                 try {
492                                         processQueues();
493                                 } catch (Exception ex) {
494                                         if (!(ex instanceof InterruptedException)) {
495                                                 LOG.error("Programming service dispatch thread died", ex);
496                                         }
497                                         throw ex;
498                                 }
499                                 return null;
500                         }
501                 });
502                 exec.shutdown();
503         }
504
505         synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
506                 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
507
508                 thread.cancel(true);
509                 exec.awaitTermination(timeout, unit);
510                 exec = null;
511         }
512 }