BUG-48: Tunnel integration, part 1
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13
14 import java.math.BigInteger;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Deque;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.annotation.concurrent.GuardedBy;
28
29 import org.opendaylight.bgpcep.programming.NanotimeUtil;
30 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
31 import org.opendaylight.bgpcep.programming.spi.InstructionExecutor;
32 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
33 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
34 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UncancellableInstruction;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.DetailsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.Failure;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure.FailureBuilder;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61
62 final class ProgrammingServiceImpl implements InstructionScheduler, ProgrammingService {
63         private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
64
65         private final Map<InstructionId, Instruction> insns = new HashMap<>();
66
67         @GuardedBy("this")
68         private final Deque<Instruction> readyQueue = new ArrayDeque<>();
69
70         private final NotificationProviderService notifs;
71         private final ExecutorService executor;
72         private final Timer timer;
73         private java.util.concurrent.Future<Void> thread;
74         private ExecutorService exec;
75
76         ProgrammingServiceImpl(final NotificationProviderService notifs, final ExecutorService executor,
77                         final Timer timer, final InstructionScheduler registry) {
78                 this.notifs = Preconditions.checkNotNull(notifs);
79                 this.executor = Preconditions.checkNotNull(executor);
80                 this.timer = Preconditions.checkNotNull(timer);
81         }
82
83         @Override
84         public java.util.concurrent.Future<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
85                 return executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
86                         @Override
87                         public RpcResult<CancelInstructionOutput> call() {
88                                 return realCancelInstruction(input);
89                         }
90                 });
91         }
92
93         private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input)  {
94                 final Instruction i = insns.get(input.getId());
95                 if (i == null) {
96                         LOG.debug("Instruction {} not present in the graph", input.getId());
97
98                         final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
99                         return SuccessfulRpcResult.create(out);
100                 }
101
102                 switch (i.getStatus()) {
103                 case Cancelled:
104                 case Executing:
105                 case Failed:
106                 case Successful:
107                 case Unknown:
108                         LOG.debug("Instruction {} can no longer be cancelled due to status {}", input.getId());
109                         return SuccessfulRpcResult.create(
110                                         new CancelInstructionOutputBuilder().setFailure(UncancellableInstruction.class).build());
111                 case Queued:
112                 case Scheduled:
113                         break;
114                 }
115
116                 cancelInstruction(i, null);
117                 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().build());
118         }
119
120         @Override
121         public Failure submitInstruction(final SubmitInstructionInput input, final InstructionExecutor executor) {
122                 final InstructionId id = input.getId();
123                 if (insns.get(id) != null) {
124                         LOG.info("Instruction ID {} already present", id);
125                         return new FailureBuilder().setType(DuplicateInstructionId.class).build();
126                 }
127
128                 // First things first: check the deadline
129                 final Nanotime now = NanotimeUtil.currentTime();
130                 final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
131
132                 if (left.compareTo(BigInteger.ZERO) <= 0) {
133                         LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
134                         return new FailureBuilder().setType(DeadOnArrival.class).build();
135                 }
136
137                 // Resolve dependencies
138                 final List<Instruction> dependencies = new ArrayList<>();
139                 for (final InstructionId pid : input.getPreconditions()) {
140                         final Instruction i = insns.get(pid);
141                         if (i == null) {
142                                 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
143                                 return new FailureBuilder().setType(UnknownPreconditionId.class).build();
144                         }
145
146                         dependencies.add(i);
147                 }
148
149                 // Check if all dependencies are non-failed
150                 final List<InstructionId> unmet = new ArrayList<>();
151                 for (final Instruction d : dependencies) {
152                         switch (d.getStatus()) {
153                         case Cancelled:
154                         case Failed:
155                         case Unknown:
156                                 unmet.add(d.getId());
157                                 break;
158                         case Executing:
159                         case Queued:
160                         case Scheduled:
161                         case Successful:
162                                 break;
163                         }
164                 }
165
166                 /*
167                  *  Some dependencies have failed, declare the request dead-on-arrival
168                  *  and fail the operation.
169                  */
170                 if (!unmet.isEmpty()) {
171                         return new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build();
172                 }
173
174                 /*
175                  * All pre-flight checks done are at this point, the following
176                  * steps can only fail in catastrophic scenarios (OOM and the
177                  * like).
178                  */
179
180                 // Schedule a timeout for the instruction
181                 final Timeout t = timer.newTimeout(new TimerTask() {
182                         @Override
183                         public void run(final Timeout timeout) throws Exception {
184                                 timeoutInstruction(input.getId());
185                         }
186                 }, left.longValue(), TimeUnit.NANOSECONDS);
187
188                 // Put it into the instruction list
189                 final Instruction i = new Instruction(input.getId(), executor, dependencies, t);
190                 insns.put(id, i);
191
192                 // Attach it into its dependencies
193                 for (final Instruction d : dependencies) {
194                         d.addDependant(i);
195                 }
196
197                 /*
198                  * All done. The next part is checking whether the instruction can
199                  * run, which we can figure out after sending out the acknowledgement.
200                  * This task should be ingress-weighed, so we reinsert it into the
201                  * same execution service.
202                  */
203                 this.executor.submit(new Runnable() {
204                         @Override
205                         public void run() {
206                                 tryScheduleInstruction(i);
207                         }
208                 });
209
210                 return null;
211         }
212
213         @GuardedBy("this")
214         private void transitionInstruction(final Instruction v, final InstructionStatus status, final Details details) {
215                 // Set the status
216                 v.setStatus(status);
217
218                 LOG.debug("Instruction {} transitioned to status {}", v.getId(), status);
219
220                 // Send out a notification
221                 notifs.publish(new InstructionStatusChangedBuilder().
222                                 setId(v.getId()).setStatus(status).setDetails(details).build());
223         }
224
225         @GuardedBy("this")
226         private void cancelSingle(final Instruction i, final Details details) {
227                 // Stop the timeout
228                 i.cancel();
229
230                 // Set the new status and send out notification
231                 transitionInstruction(i, InstructionStatus.Cancelled, details);
232         }
233
234         @GuardedBy("this")
235         private void cancelDependants(final Instruction v) {
236                 final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(v.getId())).build();
237                 for (final Instruction d : v.getDependants()) {
238                         switch (d.getStatus()) {
239                         case Cancelled:
240                         case Executing:
241                         case Failed:
242                         case Successful:
243                         case Unknown:
244                                 break;
245                         case Queued:
246                         case Scheduled:
247                                 cancelSingle(d, details);
248                                 cancelDependants(d);
249                                 break;
250                         }
251                 }
252         }
253
254         private synchronized void cancelInstruction(final Instruction i, final Details details) {
255                 readyQueue.remove(i);
256                 cancelSingle(i, details);
257                 cancelDependants(i);
258         }
259
260         private synchronized void timeoutInstruction(final InstructionId id) {
261                 final Instruction i = insns.get(id);
262                 if (i == null) {
263                         LOG.warn("Instruction {} timed out, but not found in the queue", id);
264                         return;
265                 }
266
267                 switch (i.getStatus()) {
268                 case Cancelled:
269                 case Failed:
270                 case Successful:
271                         LOG.debug("Instruction {} has status {}, timeout is a no-op", id, i.getStatus());
272                         break;
273                 case Unknown:
274                         LOG.warn("Instruction {} has status {} before timeout completed", id, i.getStatus());
275                         break;
276                 case Executing:
277                         LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id);
278                         transitionInstruction(i, InstructionStatus.Unknown, null);
279                         cancelDependants(i);
280                         break;
281                 case Queued:
282                         LOG.debug("Instruction {} timed out while Queued, cancelling it", id);
283
284                         final List<InstructionId> ids = new ArrayList<>();
285                         for (final Instruction d : i.getDependencies()) {
286                                 if (d.getStatus() != InstructionStatus.Successful) {
287                                         ids.add(d.getId());
288                                 }
289                         }
290
291                         cancelInstruction(i, new DetailsBuilder().setUnmetDependencies(ids).build());
292                         break;
293                 case Scheduled:
294                         LOG.debug("Instruction {} timed out while Scheduled, cancelling it", i.getId());
295                         // FIXME: we should provide details why it timed out while scheduled
296                         cancelInstruction(i, null);
297                         break;
298                 }
299         }
300
301         @GuardedBy("this")
302         private synchronized void tryScheduleInstruction(final Instruction i) {
303                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Queued));
304
305                 /*
306                  * Check all vertices we depend on. We start off as ready for
307                  * scheduling. If we encounter a cancelled/failed/unknown
308                  * dependency, we cancel this instruction (and cascade). If we
309                  * encounter an executing/queued/scheduled dependency, we hold
310                  * of scheduling this one.
311                  */
312                 boolean ready = true;
313
314                 final List<InstructionId> unmet = new ArrayList<>();
315                 for (final Instruction d : i.getDependencies()) {
316                         switch (d.getStatus()) {
317                         case Cancelled:
318                         case Failed:
319                         case Unknown:
320                                 unmet.add(d.getId());
321                                 break;
322                         case Executing:
323                         case Queued:
324                         case Scheduled:
325                                 ready = false;
326                                 break;
327                         case Successful:
328                                 // No-op
329                                 break;
330                         }
331                 }
332
333                 if (!unmet.isEmpty()) {
334                         LOG.debug("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", i.getId());
335                         cancelSingle(i, new DetailsBuilder().setUnmetDependencies(unmet).build());
336                         cancelDependants(i);
337                         return;
338                 }
339
340                 if (ready) {
341                         LOG.debug("Instruction {} is ready for execution", i.getId());
342                         transitionInstruction(i, InstructionStatus.Scheduled, null);
343
344                         readyQueue.add(i);
345                         notify();
346                 }
347         }
348
349         private synchronized void executionFailed(final Instruction i, final Throwable cause) {
350                 LOG.error("Instruction {} failed to execute", i.getId(), cause);
351                 transitionInstruction(i, InstructionStatus.Failed, null);
352                 cancelDependants(i);
353         }
354
355         private synchronized void executionSuccessful(final Instruction i, final ExecutionResult<?> res) {
356                 i.cancel();
357
358                 transitionInstruction(i, res.getStatus(), res.getDetails());
359
360                 // Walk all dependants and try to schedule them
361                 for (final Instruction d : i.getDependants()) {
362                         tryScheduleInstruction(d);
363                 }
364         }
365
366         private synchronized void processQueues() throws InterruptedException {
367                 /*
368                  * This method is only ever interrupted by InterruptedException
369                  */
370                 while (true) {
371                         while (!readyQueue.isEmpty()) {
372                                 final Instruction i = readyQueue.poll();
373
374                                 Preconditions.checkState(i.getStatus().equals(InstructionStatus.Scheduled));
375
376                                 transitionInstruction(i, InstructionStatus.Executing, null);
377                                 Futures.addCallback(i.execute(), new FutureCallback<ExecutionResult<Details>>() {
378
379                                         @Override
380                                         public void onSuccess(final ExecutionResult<Details> result) {
381                                                 executionSuccessful(i, result);
382                                         }
383
384                                         @Override
385                                         public void onFailure(final Throwable t) {
386                                                 executionFailed(i, t);
387                                         }
388                                 });
389                         }
390
391                         wait();
392                 }
393         }
394
395         synchronized void start(final ThreadFactory threadFactory) {
396                 Preconditions.checkState(exec == null, "Programming service dispatch thread already started");
397
398                 exec = Executors.newSingleThreadExecutor(threadFactory);
399                 thread = exec.submit(new Callable<Void>() {
400                         @Override
401                         public Void call() throws Exception {
402                                 try {
403                                         processQueues();
404                                 } catch (Exception ex) {
405                                         if (!(ex instanceof InterruptedException)) {
406                                                 LOG.error("Programming service dispatch thread died", ex);
407                                         }
408                                         throw ex;
409                                 }
410                                 return null;
411                         }
412                 });
413                 exec.shutdown();
414         }
415
416         synchronized void stop(final long timeout, final TimeUnit unit) throws InterruptedException {
417                 Preconditions.checkState(exec != null, "Programming service dispatch thread already stopped");
418
419                 thread.cancel(true);
420                 exec.awaitTermination(timeout, unit);
421                 exec = null;
422         }
423 }