Fixed some sonar warnings.
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13
14 import java.math.BigInteger;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Deque;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.annotation.concurrent.GuardedBy;
28
29 import org.opendaylight.bgpcep.programming.NanotimeUtil;
30 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
31 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
32 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
33 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
34 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61
62 public final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService, AutoCloseable {
63         private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
64
65         // Default stop timeout, in seconds
66         private static final long CLOSE_TIMEOUT = 5;
67
68         private final Map<InstructionId, Instruction> insns = new HashMap<>();
69
70         @GuardedBy("this")
71         private final Deque<Instruction> readyQueue = new ArrayDeque<>();
72
73         private final NotificationProviderService notifs;
74         private final ExecutorService executor;
75         private final Timer timer;
76         private java.util.concurrent.Future<Void> thread;
77         private ExecutorService exec;
78
79         public ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
80                         final Timer timer) {
81                 this.notifs = Preconditions.checkNotNull(notifs);
82                 this.executor = Preconditions.checkNotNull(executor);
83                 this.timer = Preconditions.checkNotNull(timer);
84         }
85
86         @Override
87         public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
88                 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
89                         @Override
90                         public RpcResult<CancelInstructionOutput> call() {
91                                 return realCancelInstruction(input);
92                         }
93                 });
94         }
95
96         private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input)  {
97                 final Instruction i = insns.get(input.getId());
98                 if (i == null) {
99                         LOG.debug("Instruction {} not present in the graph", input.getId());
100
101                         final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
102                         return SuccessfulRpcResult.create(out);
103                 }
104
105                 switch (i.getStatus()) {
106                 case Cancelled:
107                 case Executing:
108                 case Failed:
109                 case Successful:
110                 case Unknown:
111                         LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId());
112                         return SuccessfulRpcResult.create(
113                                         new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
114                 case Queued:
115                 case Scheduled:
116                         break;
117                 }
118
119                 cancelInstruction(i, null);
120                 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
121         }
122
123         @Override
124         public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
125                 final InstructionId id = input.getId();
126                 if (insns.get(id) != null) {
127                         LOG.info("Instruction ID {} already present", id);
128                         return new FailureBuilder().setType(DuplicateInstructionId.class).build();
129                 }
130
131                 // First things first: check the deadline
132                 final Nanotime now = NanotimeUtil.currentTime();
133                 final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
134
135                 if (left.compareTo(BigInteger.ZERO) <= 0) {
136                         LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
137                         return new FailureBuilder().setType(DeadOnArrival.class).build();
138                 }
139
140                 // Resolve dependencies
141                 final List<Instruction> dependencies = new ArrayList<>();
142                 for (final InstructionId pid : input.getPreconditions()) {
143                         final Instruction i = insns.get(pid);
144                         if (i == null) {
145                                 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
146                                 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
147                         }
148
149                         dependencies.add(i);
150                 }
151
152                 // Check if all dependencies are non-failed
153                 final List<InstructionId> unmet = new ArrayList<>();
154                 for (final Instruction d : dependencies) {
155                         switch (d.getStatus()) {
156                         case Cancelled:
157                         case Failed:
158                         case Unknown:
159                                 unmet.add(d.getId());
160                                 break;
161                         case Executing:
162                         case Queued:
163                         case Scheduled:
164                         case Successful:
165                                 break;
166                         }
167                 }
168
169                 /*
170                  *  Some dependencies have failed, declare the request dead-on-arrival
171                  *  and fail the operation.
172                  */
173                 if (!unmet.isEmpty()) {
174                         return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
175                 }
176
177                 /*
178                  * All pre-flight checks done are at this point, the following
179                  * steps can only fail in catastrophic scenarios (OOM and the
180                  * like).
181                  */
182
183                 // Schedule a timeout for the instruction
184                 final Timeout t = timer.newTimeout(new TimerTask() {
185                         @Override
186                         public void run(final Timeout timeout) throws Exception {
187                                 timeoutInstruction(input.getId());
188                         }
189                 }, left.longValue(), TimeUnit.NANOSECONDS);
190
191                 // Put it into the instruction list
192                 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
193                 insns.put(id, i);
194
195                 // Attach it into its dependencies
196                 for (final Instruction d : dependencies) {
197                         d.addDependant(i);
198                 }
199
200                 /*
201                  * All done. The next part is checking whether the instruction can
202                  * run, which we can figure out after sending out the acknowledgement.
203                  * This task should be ingress-weighed, so we reinsert it into the
204                  * same execution service.
205                  */
206                 this.executor.submit(new Runnable() {
207                         @Override
208                         public void run() {
209                                 tryScheduleInstruction(i);
210                         }
211                 });
212
213                 return null;
214         }
215
216         @GuardedBy("this")
217         private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
218                 // Set the status
219                 v.setStatus(status);
220
221                 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
222
223                 // Send out a notification
224                 notifs.publish(new InstructionStatusChangedBuilder().
225                                 setId(v.getId()).setStatus(status).setDetails(details).build());
226         }
227
228         @GuardedBy("this")
229         private void cancelSingle(final Instruction i, final Details details) {
230                 // Stop the timeout
231                 i.cancel();
232
233                 // Set the new status and send out notification
234                 transitionInstruction(i, InstructionStatus.Cancelled, details);
235         }
236
237         @GuardedBy("this")
238         private void cancelDependants(final Instruction v) {
239                 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
240                 for (final Instruction d : v.getDependants()) {
241                         switch (d.getStatus()) {
242                         case Cancelled:
243                         case Executing:
244                         case Failed:
245                         case Successful:
246                         case Unknown:
247                                 break;
248                         case Queued:
249                         case Scheduled:
250                                 cancelSingle(d, details);
251                                 cancelDependants(d);
252                                 break;
253                         }
254                 }
255         }
256
257         private synchronized void cancelInstruction(final Instruction i, final Details details) {
258                 readyQueue.remove(i);
259                 cancelSingle(i, details);
260                 cancelDependants(i);
261         }
262
263         private synchronized void timeoutInstruction(final InstructionId id) {
264                 final Instruction i = insns.get(id);
265                 if (i == null) {
266                         LOG.warn("Instruction {} timed out, but not found in the queue", id);
267                         return;
268                 }
269
270                 switch (i.getStatus()) {
271                 case Cancelled:
272                 case Failed:
273                 case Successful:
274                         LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
275                         break;
276                 case Unknown:
277                         LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
278                         break;
279                 case Executing:
280                         LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
281                         transitionInstruction(i, InstructionStatus.Unknown, null);
282                         cancelDependants(i);
283                         break;
284                 case Queued:
285                         LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
286
287                         final List<InstructionId> ids = new ArrayList<>();
288                         for (final Instruction d : i.getDependencies()) {
289                                 if (d.getStatus() != InstructionStatus.Successful) {
290                                         ids.add(d.getId());
291                                 }
292                         }
293
294                         cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
295                         break;
296                 case Scheduled:
297                         LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
298                         // FIXME: we should provide details why it timed out while scheduled
299                         cancelInstruction(i, null);
300                         break;
301                 }
302         }
303
304         @GuardedBy("this")
305         private synchronized void tryScheduleInstruction(final Instruction i) {
306                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
307
308                 /*
309                  * Check all vertices we depend on. We start off as ready for
310                  * scheduling. If we encounter a cancelled/failed/unknown
311                  * dependency, we cancel this instruction (and cascade). If we
312                  * encounter an executing/queued/scheduled dependency, we hold
313                  * of scheduling this one.
314                  */
315                 boolean ready = true;
316
317                 final List<InstructionId> unmet = new ArrayList<>();
318                 for (final Instruction d : i.getDependencies()) {
319                         switch (d.getStatus()) {
320                         case Cancelled:
321                         case Failed:
322                         case Unknown:
323                                 unmet.add(d.getId());
324                                 break;
325                         case Executing:
326                         case Queued:
327                         case Scheduled:
328                                 ready = false;
329                                 break;
330                         case Successful:
331                                 // No-op
332                                 break;
333                         }
334                 }
335
336                 if (!unmet.isEmpty()) {
337                         LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
338                         cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
339                         cancelDependants(i);
340                         return;
341                 }
342
343                 if (ready) {
344                         LOG.debug("Instruction {} is ready for execution", i.getId());
345                         transitionInstruction(i, InstructionStatus.Scheduled, null);
346
347                         readyQueue.add(i);
348                         notify();
349                 }
350         }
351
352         private synchronized void executionFailed(final Instruction i, final Throwable cause) {
353                 LOG.error("Instruction {} failed to execute", i.getId(), cause);
354                 transitionInstruction(i, InstructionStatus.Failed, null);
355                 cancelDependants(i);
356         }
357
358         private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
359                 i.cancel();
360
361                 transitionInstruction(i, res.getStatus(), res.getDetails());
362
363                 // Walk all dependants and try to schedule them
364                 for (final Instruction d : i.getDependants()) {
365                         tryScheduleInstruction(d);
366                 }
367         }
368
369         private synchronized void processQueues() throws InterruptedException {
370                 /*
371                  * This method is only ever interrupted by InterruptedException
372                  */
373                 while (true) {
374                         while (!readyQueue.isEmpty()) {
375                                 final Instruction i = readyQueue.poll();
376
377                                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
378
379                                 transitionInstruction(i, InstructionStatus.Executing, null);
380                                 Futures.addCallback(i.execute(), new FutureCallback<ExecutionResult<Details>>() {
381
382                                         @Override
383                                         public void onSuccess(final ExecutionResult<Details> result) {
384                                                 executionSuccessful(i, result);
385                                         }
386
387                                         @Override
388                                         public void onFailure(final Throwable t) {
389                                                 executionFailed(i, t);
390                                         }
391                                 });
392                         }
393
394                         wait();
395                 }
396         }
397
398         synchronized void start(final ThreadFactory threadFactory) {
399                 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
400
401                 exec = Executors.newSingleThreadExecutor(threadFactory);
402                 thread = exec.submit(new Callable<Void>() {
403                         @Override
404                         public Void call() throws Exception {
405                                 try {
406                                         processQueues();
407                                 } catch (InterruptedException ex) {
408                                         LOG.error("Programming service dispatch thread died", ex);
409                                 }
410                                 return null;
411                         }
412                 });
413                 exec.shutdown();
414         }
415
416         synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
417                 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
418
419                 thread.cancel(true);
420                 exec.awaitTermination(timeout, unit);
421                 exec = null;
422         }
423
424         @Override
425         public void close() throws Exception {
426                 stop(CLOSE_TIMEOUT, TimeUnit.SECONDS);
427         }
428 }