BUG-592: use new InstanceIdentifier APIs
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import io.netty.util.Timeout;
11 import io.netty.util.Timer;
12 import io.netty.util.TimerTask;
13
14 import java.math.BigInteger;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.TimeUnit;
23
24 import org.opendaylight.bgpcep.programming.NanotimeUtil;
25 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
26 import org.opendaylight.bgpcep.programming.spi.Instruction;
27 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
28 import org.opendaylight.bgpcep.programming.spi.SchedulerException;
29 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
30 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
31 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
32 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelInstructionOutputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CleanInstructionsOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DeadOnArrival;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.DuplicateInstructionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionQueue;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionQueueBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatus;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.InstructionStatusChangedBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.Nanotime;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.ProgrammingService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.SubmitInstructionInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownInstruction;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.UnknownPreconditionId;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.InstructionKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.InstructionsBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.InstructionsKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.status.changed.Details;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.submit.instruction.output.result.failure._case.FailureBuilder;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 import com.google.common.base.Preconditions;
62 import com.google.common.util.concurrent.FutureCallback;
63 import com.google.common.util.concurrent.Futures;
64 import com.google.common.util.concurrent.ListenableFuture;
65 import com.google.common.util.concurrent.ListeningExecutorService;
66 import com.google.common.util.concurrent.SettableFuture;
67
68 public final class ProgrammingServiceImpl implements AutoCloseable, InstructionScheduler, ProgrammingService {
69         private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
70
71         private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
72         private final InstanceIdentifier<InstructionQueue> qid;
73         private final NotificationProviderService notifs;
74         private final ListeningExecutorService executor;
75         private final DataProviderService dataProvider;
76         private final Timer timer;
77
78         private final class InstructionPusher implements QueueInstruction {
79                 private final InstructionsBuilder builder = new InstructionsBuilder();
80
81                 InstructionPusher(final InstructionId id, final Nanotime deadline) {
82                         builder.setDeadline(deadline);
83                         builder.setId(id);
84                         builder.setKey(new InstructionsKey(id));
85                         builder.setStatus(InstructionStatus.Queued);
86                 }
87
88                 @Override
89                 public void instructionUpdated(final InstructionStatus status, final Details details) {
90                         if (!status.equals(builder.getStatus())) {
91                                 builder.setStatus(status);
92
93                                 final DataModificationTransaction t = dataProvider.beginTransaction();
94                                 t.putOperationalData(qid.child(
95                                                 org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.Instruction.class,
96                                                 new InstructionKey(builder.getId())), builder.build());
97                                 t.commit();
98                         }
99
100                         notifs.publish(new InstructionStatusChangedBuilder().setId(builder.getId()).setStatus(status).setDetails(details).build());
101                 }
102
103                 @Override
104                 public void instructionRemoved() {
105                         final DataModificationTransaction t = dataProvider.beginTransaction();
106                         t.removeOperationalData(qid.child(
107                                         org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.Instruction.class,
108                                         new InstructionKey(builder.getId())));
109                         t.commit();
110                 }
111         }
112
113         public ProgrammingServiceImpl(final DataProviderService dataProvider, final NotificationProviderService notifs, final ListeningExecutorService executor, final Timer timer) {
114                 this.dataProvider = Preconditions.checkNotNull(dataProvider);
115                 this.notifs = Preconditions.checkNotNull(notifs);
116                 this.executor = Preconditions.checkNotNull(executor);
117                 this.timer = Preconditions.checkNotNull(timer);
118                 qid = InstanceIdentifier.builder(InstructionQueue.class).toInstance();
119
120                 final DataModificationTransaction t = dataProvider.beginTransaction();
121                 Preconditions.checkState(t.readOperationalData(qid) == null, "Conflicting instruction queue found");
122
123                 t.putOperationalData(qid, new InstructionQueueBuilder().setInstruction(Collections.<org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.Instruction>emptyList()).build());
124                 t.commit();
125         }
126
127         @Override
128         public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
129                 return this.executor.submit(new Callable<RpcResult<CancelInstructionOutput>>() {
130                         @Override
131                         public RpcResult<CancelInstructionOutput> call() {
132                                 return realCancelInstruction(input);
133                         }
134                 });
135         }
136
137         @Override
138         public ListenableFuture<RpcResult<CleanInstructionsOutput>> cleanInstructions(final CleanInstructionsInput input) {
139                 return this.executor.submit(new Callable<RpcResult<CleanInstructionsOutput>>() {
140                         @Override
141                         public RpcResult<CleanInstructionsOutput> call() {
142                                 return realCleanInstructions(input);
143                         }
144                 });
145         }
146
147         private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
148                 final InstructionImpl i = this.insns.get(input.getId());
149                 if (i == null) {
150                         LOG.debug("Instruction {} not present in the graph", input.getId());
151
152                         final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
153                         return SuccessfulRpcResult.create(out);
154                 }
155
156                 return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(i.tryCancel(null)).build());
157         }
158
159
160         private synchronized RpcResult<CleanInstructionsOutput> realCleanInstructions(final CleanInstructionsInput input) {
161                 final List<InstructionId> failed = new ArrayList<>();
162
163                 for (final InstructionId id : input.getId()) {
164                         // Find the instruction
165                         final InstructionImpl i = this.insns.get(id);
166                         if (i == null) {
167                                 LOG.debug("Instruction {} not present in the graph", input.getId());
168                                 failed.add(id);
169                                 continue;
170                         }
171
172                         // Check its status
173                         switch (i.getStatus()) {
174                         case Cancelled:
175                         case Failed:
176                         case Successful:
177                                 break;
178                         case Executing:
179                         case Queued:
180                         case Scheduled:
181                         case Unknown:
182                                 LOG.debug("Instruction {} cannot be cleaned because of it's in state {}", id, i.getStatus());
183                                 failed.add(id);
184                                 continue;
185                         }
186
187                         // The instruction is in a terminal state, we need to just unlink
188                         // it from its dependencies and dependants
189                         i.clean();
190
191                         insns.remove(i);
192                         LOG.debug("Instruction {} cleaned successfully", id);
193                 }
194
195                 final CleanInstructionsOutputBuilder ob = new CleanInstructionsOutputBuilder();
196                 if (!failed.isEmpty()) {
197                         ob.setUnflushed(failed);
198                 }
199
200                 return SuccessfulRpcResult.create(ob.build());
201         }
202
203         @Override
204         public synchronized ListenableFuture<Instruction> scheduleInstruction(final SubmitInstructionInput input) throws SchedulerException {
205                 final InstructionId id = input.getId();
206                 if (this.insns.get(id) != null) {
207                         LOG.info("Instruction ID {} already present", id);
208                         throw new SchedulerException("Instruction ID currently in use", new FailureBuilder().setType(DuplicateInstructionId.class).build());
209                 }
210
211                 // First things first: check the deadline
212                 final Nanotime now = NanotimeUtil.currentTime();
213                 final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
214
215                 if (left.compareTo(BigInteger.ZERO) <= 0) {
216                         LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
217                         throw new SchedulerException("Instruction arrived after specified deadline", new FailureBuilder().setType(DeadOnArrival.class).build());
218                 }
219
220                 // Resolve dependencies
221                 final List<InstructionImpl> dependencies = new ArrayList<>();
222                 for (final InstructionId pid : input.getPreconditions()) {
223                         final InstructionImpl i = this.insns.get(pid);
224                         if (i == null) {
225                                 LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid);
226                                 throw new SchedulerException("Unknown dependency ID specified", new FailureBuilder().setType(UnknownPreconditionId.class).build());
227                         }
228
229                         dependencies.add(i);
230                 }
231
232                 // Check if all dependencies are non-failed
233                 final List<InstructionId> unmet = new ArrayList<>();
234                 for (final InstructionImpl d : dependencies) {
235                         switch (d.getStatus()) {
236                         case Cancelled:
237                         case Failed:
238                         case Unknown:
239                                 unmet.add(d.getId());
240                                 break;
241                         case Executing:
242                         case Queued:
243                         case Scheduled:
244                         case Successful:
245                                 break;
246                         }
247                 }
248
249                 /*
250                  *  Some dependencies have failed, declare the request dead-on-arrival
251                  *  and fail the operation.
252                  */
253                 if (!unmet.isEmpty()) {
254                         throw new SchedulerException("Instruction's dependencies are already unsuccessful", new FailureBuilder().setType(DeadOnArrival.class).setFailedPreconditions(unmet).build());
255                 }
256
257                 /*
258                  * All pre-flight checks done are at this point, the following
259                  * steps can only fail in catastrophic scenarios (OOM and the
260                  * like).
261                  */
262
263                 // Schedule a timeout for the instruction
264                 final Timeout t = this.timer.newTimeout(new TimerTask() {
265                         @Override
266                         public void run(final Timeout timeout) {
267                                 timeoutInstruction(input.getId());
268                         }
269                 }, left.longValue(), TimeUnit.NANOSECONDS);
270
271                 // Put it into the instruction list
272                 final SettableFuture<Instruction> ret = SettableFuture.create();
273                 final InstructionImpl i = new InstructionImpl(new InstructionPusher(id, input.getDeadline()), ret, id, dependencies, t);
274                 this.insns.put(id, i);
275
276                 // Attach it into its dependencies
277                 for (final InstructionImpl d : dependencies) {
278                         d.addDependant(i);
279                 }
280
281                 /*
282                  * All done. The next part is checking whether the instruction can
283                  * run, which we can figure out after sending out the acknowledgement.
284                  * This task should be ingress-weighed, so we reinsert it into the
285                  * same execution service.
286                  */
287                 this.executor.submit(new Runnable() {
288                         @Override
289                         public void run() {
290                                 tryScheduleInstruction(i);
291                         }
292                 });
293
294                 return ret;
295         }
296
297         private synchronized void timeoutInstruction(final InstructionId id) {
298                 final InstructionImpl i = this.insns.get(id);
299                 if (i == null) {
300                         LOG.warn("Instruction {} timed out, but not found in the queue", id);
301                         return;
302                 }
303
304                 i.timeout();
305         }
306
307         private synchronized void tryScheduleDependants(final InstructionImpl i) {
308                 // Walk all dependants and try to schedule them
309                 final Iterator<InstructionImpl> it =i.getDependants();
310                 while (it.hasNext()) {
311                         tryScheduleInstruction(it.next());
312                 }
313         }
314
315         private synchronized void tryScheduleInstruction(final InstructionImpl i) {
316                 final ListenableFuture<ExecutionResult<Details>> f = i.ready();
317                 if (f != null) {
318                         Futures.addCallback(f, new FutureCallback<ExecutionResult<Details>>() {
319                                 @Override
320                                 public void onSuccess(final ExecutionResult<Details> result) {
321                                         tryScheduleDependants(i);
322                                 }
323
324                                 @Override
325                                 public void onFailure(final Throwable t) {
326                                         LOG.error("Instruction {} failed to execute", i.getId(), t);
327                                 }
328                         });
329                 }
330
331         }
332
333         @Override
334         public synchronized void close() {
335                 try {
336                         for (InstructionImpl i : insns.values()) {
337                                 i.tryCancel(null);
338                         }
339                 } finally {
340                         final DataModificationTransaction t = dataProvider.beginTransaction();
341                         t.removeOperationalData(qid);
342                         t.commit();
343                 }
344         }
345 }