834fae584fd5bf50f015656c6197577a4253dcab
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.SettableFuture;
17 import io.netty.util.Timeout;
18 import io.netty.util.Timer;
19 import java.math.BigInteger;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.TimeUnit;
27 import org.opendaylight.bgpcep.programming.NanotimeUtil;
28 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
29 import org.opendaylight.bgpcep.programming.spi.Instruction;
30 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
31 import org.opendaylight.bgpcep.programming.spi.SchedulerException;
32 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
37 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
38 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
39 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
43 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsOutputBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.DeadOnArrival;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.DuplicateInstructionId;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionStatusChangedBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueue;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueKey;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.Nanotime;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.ProgrammingService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.SubmitInstructionInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.UnknownInstruction;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.UnknownPreconditionId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.InstructionBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.InstructionKey;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.status.changed.Details;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.submit.instruction.output.result.failure._case.FailureBuilder;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.osgi.framework.ServiceRegistration;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 public final class ProgrammingServiceImpl implements AutoCloseable, ClusterSingletonService, InstructionScheduler,
75     ProgrammingService {
76     private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
77
78     private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
79     private final InstanceIdentifier<InstructionsQueue> qid;
80     private final NotificationPublishService notifs;
81     private final ListeningExecutorService executor;
82     private final DataBroker dataProvider;
83     private final Timer timer;
84     private final String instructionId;
85     private final ServiceGroupIdentifier sgi;
86     private final ClusterSingletonServiceRegistration csspReg;
87     private final RpcProviderRegistry rpcProviderRegistry;
88     private RpcRegistration<ProgrammingService> reg;
89     private ServiceRegistration<?> serviceRegistration;
90
91     private final class InstructionPusher implements QueueInstruction {
92         private final InstructionBuilder builder = new InstructionBuilder();
93
94         InstructionPusher(final InstructionId id, final Nanotime deadline) {
95             this.builder.setDeadline(deadline);
96             this.builder.setId(id);
97             this.builder.setKey(new InstructionKey(id));
98             this.builder.setStatus(InstructionStatus.Queued);
99         }
100
101         @Override
102         public void instructionUpdated(final InstructionStatus status, final Details details) {
103             if (!status.equals(this.builder.getStatus())) {
104                 this.builder.setStatus(status);
105
106                 final WriteTransaction t = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction();
107                 t.put(LogicalDatastoreType.OPERATIONAL,
108                         ProgrammingServiceImpl.this.qid.child(
109                                 org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
110                                 new InstructionKey(this.builder.getId())), this.builder.build());
111                 Futures.addCallback(t.submit(), new FutureCallback<Void>() {
112                     @Override
113                     public void onSuccess(final Void result) {
114                         LOG.debug("Instruction Queue {} updated", ProgrammingServiceImpl.this.qid);
115                     }
116
117                     @Override
118                     public void onFailure(final Throwable t) {
119                         LOG.error("Failed to update Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
120                     }
121                 });
122             }
123
124             try {
125                 ProgrammingServiceImpl.this.notifs.putNotification(new InstructionStatusChangedBuilder()
126                     .setId(this.builder.getId()).setStatus(status).setDetails(details).build());
127             } catch (final InterruptedException e) {
128                 LOG.debug("Failed to publish notification", e);
129             }
130         }
131
132         @Override
133         public void instructionRemoved() {
134             final WriteTransaction t = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction();
135             t.delete(LogicalDatastoreType.OPERATIONAL, ProgrammingServiceImpl.this.qid.child(
136                     org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
137                     new InstructionKey(this.builder.getId())));
138             Futures.addCallback(t.submit(), new FutureCallback<Void>() {
139                 @Override
140                 public void onSuccess(final Void result) {
141                     LOG.debug("Instruction Queue {} removed", ProgrammingServiceImpl.this.qid);
142                 }
143
144                 @Override
145                 public void onFailure(final Throwable t) {
146                     LOG.error("Failed to remove Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
147                 }
148             });
149         }
150     }
151
152     ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationPublishService notifs,
153         final ListeningExecutorService executor, final RpcProviderRegistry rpcProviderRegistry,
154         final ClusterSingletonServiceProvider cssp, final Timer timer, final String instructionId) {
155         this.dataProvider = Preconditions.checkNotNull(dataProvider);
156         this.instructionId = Preconditions.checkNotNull(instructionId);
157         this.notifs = Preconditions.checkNotNull(notifs);
158         this.executor = Preconditions.checkNotNull(executor);
159         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
160         this.timer = Preconditions.checkNotNull(timer);
161         this.qid = KeyedInstanceIdentifier.builder(InstructionsQueue.class,  new InstructionsQueueKey(this.instructionId)).build();
162         this.sgi = ServiceGroupIdentifier.create("programming-"+ this.instructionId + "-service-group");
163         this.csspReg = cssp.registerClusterSingletonService(this);
164     }
165
166     @Override
167     public void instantiateServiceInstance() {
168         LOG.info("Instruction Queue service {} instantiated", this.sgi.getValue());
169
170         this.reg = this.rpcProviderRegistry.addRpcImplementation(ProgrammingService.class, this);
171
172         final WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
173         t.put(LogicalDatastoreType.OPERATIONAL, this.qid, new InstructionsQueueBuilder()
174             .setKey(new InstructionsQueueKey(this.instructionId)).setInstruction(Collections.emptyList()).build());
175         Futures.addCallback(t.submit(), new FutureCallback<Void>() {
176             @Override
177             public void onSuccess(final Void result) {
178                 LOG.debug("Instruction Queue {} added", ProgrammingServiceImpl.this.qid);
179             }
180
181             @Override
182             public void onFailure(final Throwable t) {
183                 LOG.error("Failed to add Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
184             }
185         });
186     }
187
188     @Override
189     public ServiceGroupIdentifier getIdentifier() {
190         return this.sgi;
191     }
192
193     @Override
194     public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
195         return this.executor.submit(() -> realCancelInstruction(input));
196     }
197
198     @Override
199     public ListenableFuture<RpcResult<CleanInstructionsOutput>> cleanInstructions(final CleanInstructionsInput input) {
200         return this.executor.submit(() -> realCleanInstructions(input));
201     }
202
203     private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
204         final InstructionImpl i = this.insns.get(input.getId());
205         if (i == null) {
206             LOG.debug("Instruction {} not present in the graph", input.getId());
207
208             final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
209             return SuccessfulRpcResult.create(out);
210         }
211
212         return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(i.tryCancel(null)).build());
213     }
214
215     private synchronized RpcResult<CleanInstructionsOutput> realCleanInstructions(final CleanInstructionsInput input) {
216         final List<InstructionId> failed = new ArrayList<>();
217
218         for (final InstructionId id : input.getId()) {
219             // Find the instruction
220             final InstructionImpl i = this.insns.get(id);
221             if (i == null) {
222                 LOG.debug("Instruction {} not present in the graph", input.getId());
223                 failed.add(id);
224                 continue;
225             }
226
227             // Check its status
228             switch (i.getStatus()) {
229             case Cancelled:
230             case Failed:
231             case Successful:
232                 break;
233             case Executing:
234             case Queued:
235             case Scheduled:
236             case Unknown:
237                 LOG.debug("Instruction {} cannot be cleaned because of it's in state {}", id, i.getStatus());
238                 failed.add(id);
239                 continue;
240             default:
241                 break;
242             }
243
244             // The instruction is in a terminal state, we need to just unlink
245             // it from its dependencies and dependents
246             i.clean();
247
248             this.insns.remove(id);
249             LOG.debug("Instruction {} cleaned successfully", id);
250         }
251
252         final CleanInstructionsOutputBuilder ob = new CleanInstructionsOutputBuilder();
253         ob.setUnflushed(failed);
254
255         return SuccessfulRpcResult.create(ob.build());
256     }
257
258     private List<InstructionImpl> checkDependencies(final SubmitInstructionInput input) throws SchedulerException {
259         final List<InstructionImpl> dependencies = collectDependencies(input);
260         // Check if all dependencies are non-failed
261         final List<InstructionId> unmet = checkIfUnfailed(dependencies);
262         /*
263          *  Some dependencies have failed, declare the request dead-on-arrival
264          *  and fail the operation.
265          */
266         if (!unmet.isEmpty()) {
267             throw new SchedulerException("Instruction's dependencies are already unsuccessful", new FailureBuilder().setType(
268                     DeadOnArrival.class).setFailedPreconditions(unmet).build());
269         }
270         return dependencies;
271     }
272
273     private List<InstructionImpl> collectDependencies(final SubmitInstructionInput input) throws SchedulerException {
274         final List<InstructionImpl> dependencies = new ArrayList<>();
275         for (final InstructionId pid : input.getPreconditions()) {
276             final InstructionImpl i = this.insns.get(pid);
277             if (i == null) {
278                 LOG.info("Instruction {} depends on {}, which is not a known instruction", input.getId(), pid);
279                 throw new SchedulerException("Unknown dependency ID specified", new FailureBuilder().setType(UnknownPreconditionId.class).build());
280             }
281             dependencies.add(i);
282         }
283         return dependencies;
284     }
285
286     private List<InstructionId> checkIfUnfailed(final List<InstructionImpl> dependencies) {
287         final List<InstructionId> unmet = new ArrayList<>();
288         for (final InstructionImpl d : dependencies) {
289             switch (d.getStatus()) {
290             case Cancelled:
291             case Failed:
292             case Unknown:
293                 unmet.add(d.getId());
294                 break;
295             case Executing:
296             case Queued:
297             case Scheduled:
298             case Successful:
299                 break;
300             default:
301                 break;
302             }
303         }
304         return unmet;
305     }
306
307     @Override
308     public synchronized ListenableFuture<Instruction> scheduleInstruction(final SubmitInstructionInput input) throws SchedulerException {
309         final InstructionId id = input.getId();
310         if (this.insns.get(id) != null) {
311             LOG.info("Instruction ID {} already present", id);
312             throw new SchedulerException("Instruction ID currently in use", new FailureBuilder().setType(DuplicateInstructionId.class).build());
313         }
314
315         // First things first: check the deadline
316         final Nanotime now = NanotimeUtil.currentTime();
317         final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
318
319         if (left.compareTo(BigInteger.ZERO) <= 0) {
320             LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
321             throw new SchedulerException("Instruction arrived after specified deadline", new FailureBuilder().setType(DeadOnArrival.class).build());
322         }
323
324         // Resolve dependencies
325         final List<InstructionImpl> dependencies = checkDependencies(input);
326
327         /*
328          * All pre-flight checks done are at this point, the following
329          * steps can only fail in catastrophic scenarios (OOM and the
330          * like).
331          */
332
333         // Schedule a timeout for the instruction
334         final Timeout t = this.timer.newTimeout(timeout -> timeoutInstruction(input.getId()), left.longValue(), TimeUnit.NANOSECONDS);
335
336         // Put it into the instruction list
337         final SettableFuture<Instruction> ret = SettableFuture.create();
338         final InstructionImpl i = new InstructionImpl(new InstructionPusher(id, input.getDeadline()), ret, id, dependencies, t);
339         this.insns.put(id, i);
340
341         // Attach it into its dependencies
342         for (final InstructionImpl d : dependencies) {
343             d.addDependant(i);
344         }
345
346         /*
347          * All done. The next part is checking whether the instruction can
348          * run, which we can figure out after sending out the acknowledgement.
349          * This task should be ingress-weighed, so we reinsert it into the
350          * same execution service.
351          */
352         this.executor.submit(() -> tryScheduleInstruction(i));
353
354         return ret;
355     }
356
357     public String getInstructionID() {
358         return this.instructionId;
359     }
360
361     private synchronized void timeoutInstruction(final InstructionId id) {
362         final InstructionImpl i = this.insns.get(id);
363         if (i == null) {
364             LOG.warn("Instruction {} timed out, but not found in the queue", id);
365             return;
366         }
367
368         i.timeout();
369     }
370
371     private synchronized void tryScheduleDependants(final InstructionImpl i) {
372         // Walk all dependants and try to schedule them
373         final Iterator<InstructionImpl> it = i.getDependants();
374         while (it.hasNext()) {
375             tryScheduleInstruction(it.next());
376         }
377     }
378
379     private synchronized void tryScheduleInstruction(final InstructionImpl i) {
380         final ListenableFuture<ExecutionResult<Details>> f = i.ready();
381         if (f != null) {
382             Futures.addCallback(f, new FutureCallback<ExecutionResult<Details>>() {
383                 @Override
384                 public void onSuccess(final ExecutionResult<Details> result) {
385                     tryScheduleDependants(i);
386                 }
387
388                 @Override
389                 public void onFailure(final Throwable t) {
390                     LOG.error("Instruction {} failed to execute", i.getId(), t);
391                 }
392             });
393         }
394
395     }
396
397     @Override
398     public ListenableFuture<Void> closeServiceInstance() {
399         LOG.info("Closing Instruction Queue service {}", this.sgi.getValue());
400
401         this.reg.close();
402         for (final InstructionImpl i : this.insns.values()) {
403             i.tryCancel(null);
404         }
405         // Workaround for BUG-2283
406         final WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
407         t.delete(LogicalDatastoreType.OPERATIONAL, this.qid);
408         final CheckedFuture<Void, TransactionCommitFailedException> future = t.submit();
409         Futures.addCallback(future, new FutureCallback<Void>() {
410             @Override
411             public void onSuccess(final Void result) {
412                 LOG.debug("Instruction Queue {} removed", ProgrammingServiceImpl.this.qid);
413             }
414
415             @Override
416             public void onFailure(final Throwable t) {
417                 LOG.error("Failed to shutdown Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
418             }
419         });
420         return future;
421     }
422
423     @Override
424     public synchronized void close() {
425         if (this.csspReg != null) {
426             try {
427                 this.csspReg.close();
428             } catch (final Exception e) {
429                 LOG.debug("Failed to close Instruction Scheduler service");
430             }
431         }
432         if (this.serviceRegistration != null) {
433             this.serviceRegistration.unregister();
434             this.serviceRegistration = null;
435         }
436     }
437
438     void setServiceRegistration(final ServiceRegistration<?> serviceRegistration) {
439         this.serviceRegistration = serviceRegistration;
440     }
441 }