BUG-6975: Integrate Programming with Cluster Singleton Service
[bgpcep.git] / programming / impl / src / main / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.SettableFuture;
17 import io.netty.util.Timeout;
18 import io.netty.util.Timer;
19 import java.math.BigInteger;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.TimeUnit;
27 import org.opendaylight.bgpcep.programming.NanotimeUtil;
28 import org.opendaylight.bgpcep.programming.spi.ExecutionResult;
29 import org.opendaylight.bgpcep.programming.spi.Instruction;
30 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
31 import org.opendaylight.bgpcep.programming.spi.SchedulerException;
32 import org.opendaylight.bgpcep.programming.spi.SuccessfulRpcResult;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
38 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
39 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
43 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsOutputBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.DeadOnArrival;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.DuplicateInstructionId;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionStatusChangedBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueue;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueKey;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.Nanotime;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.ProgrammingService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.SubmitInstructionInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.UnknownInstruction;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.UnknownPreconditionId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.InstructionBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.InstructionKey;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.status.changed.Details;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.submit.instruction.output.result.failure._case.FailureBuilder;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.osgi.framework.ServiceRegistration;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 public final class ProgrammingServiceImpl implements AutoCloseable, ClusterSingletonService, InstructionScheduler,
75     ProgrammingService {
76     private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
77
78     private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
79     private final InstanceIdentifier<InstructionsQueue> qid;
80     private final NotificationProviderService notifs;
81     private final ListeningExecutorService executor;
82     private final DataBroker dataProvider;
83     private final Timer timer;
84     private final String instructionId;
85     private final ServiceGroupIdentifier sgi;
86     private final ClusterSingletonServiceRegistration csspReg;
87     private final RpcProviderRegistry rpcProviderRegistry;
88     private RpcRegistration<ProgrammingService> reg;
89     private ServiceRegistration<?> serviceRegistration;
90
91     private final class InstructionPusher implements QueueInstruction {
92         private final InstructionBuilder builder = new InstructionBuilder();
93
94         InstructionPusher(final InstructionId id, final Nanotime deadline) {
95             this.builder.setDeadline(deadline);
96             this.builder.setId(id);
97             this.builder.setKey(new InstructionKey(id));
98             this.builder.setStatus(InstructionStatus.Queued);
99         }
100
101         @Override
102         public void instructionUpdated(final InstructionStatus status, final Details details) {
103             if (!status.equals(this.builder.getStatus())) {
104                 this.builder.setStatus(status);
105
106                 final WriteTransaction t = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction();
107                 t.put(LogicalDatastoreType.OPERATIONAL,
108                         ProgrammingServiceImpl.this.qid.child(
109                                 org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
110                                 new InstructionKey(this.builder.getId())), this.builder.build());
111                 Futures.addCallback(t.submit(), new FutureCallback<Void>() {
112                     @Override
113                     public void onSuccess(final Void result) {
114                         LOG.debug("Instruction Queue {} updated", ProgrammingServiceImpl.this.qid);
115                     }
116
117                     @Override
118                     public void onFailure(final Throwable t) {
119                         LOG.error("Failed to update Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
120                     }
121                 });
122             }
123
124             ProgrammingServiceImpl.this.notifs.publish(new InstructionStatusChangedBuilder().setId(this.builder.getId()).setStatus(status).setDetails(details).build());
125         }
126
127         @Override
128         public void instructionRemoved() {
129             final WriteTransaction t = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction();
130             t.delete(LogicalDatastoreType.OPERATIONAL, ProgrammingServiceImpl.this.qid.child(
131                     org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
132                     new InstructionKey(this.builder.getId())));
133             Futures.addCallback(t.submit(), new FutureCallback<Void>() {
134                 @Override
135                 public void onSuccess(final Void result) {
136                     LOG.debug("Instruction Queue {} removed", ProgrammingServiceImpl.this.qid);
137                 }
138
139                 @Override
140                 public void onFailure(final Throwable t) {
141                     LOG.error("Failed to remove Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
142                 }
143             });
144         }
145     }
146
147     ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationProviderService notifs,
148         final ListeningExecutorService executor, final RpcProviderRegistry rpcProviderRegistry,
149         final ClusterSingletonServiceProvider cssp, final Timer timer, final String instructionId) {
150         this.dataProvider = Preconditions.checkNotNull(dataProvider);
151         this.instructionId = Preconditions.checkNotNull(instructionId);
152         this.notifs = Preconditions.checkNotNull(notifs);
153         this.executor = Preconditions.checkNotNull(executor);
154         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
155         this.timer = Preconditions.checkNotNull(timer);
156         this.qid = KeyedInstanceIdentifier.builder(InstructionsQueue.class,  new InstructionsQueueKey(this.instructionId)).build();
157         this.sgi = ServiceGroupIdentifier.create("programming-"+ this.instructionId + "-service-group");
158         this.csspReg = cssp.registerClusterSingletonService(this);
159     }
160
161     @Override
162     public void instantiateServiceInstance() {
163         LOG.info("Instruction Queue service {} instantiated", this.sgi.getValue());
164
165         this.reg = this.rpcProviderRegistry.addRpcImplementation(ProgrammingService.class, this);
166
167         final WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
168         t.put(LogicalDatastoreType.OPERATIONAL, this.qid, new InstructionsQueueBuilder()
169             .setKey(new InstructionsQueueKey(this.instructionId)).setInstruction(Collections.emptyList()).build());
170         Futures.addCallback(t.submit(), new FutureCallback<Void>() {
171             @Override
172             public void onSuccess(final Void result) {
173                 LOG.debug("Instruction Queue {} added", ProgrammingServiceImpl.this.qid);
174             }
175
176             @Override
177             public void onFailure(final Throwable t) {
178                 LOG.error("Failed to add Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
179             }
180         });
181     }
182
183     @Override
184     public ServiceGroupIdentifier getIdentifier() {
185         return this.sgi;
186     }
187
188     @Override
189     public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
190         return this.executor.submit(() -> realCancelInstruction(input));
191     }
192
193     @Override
194     public ListenableFuture<RpcResult<CleanInstructionsOutput>> cleanInstructions(final CleanInstructionsInput input) {
195         return this.executor.submit(() -> realCleanInstructions(input));
196     }
197
198     private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
199         final InstructionImpl i = this.insns.get(input.getId());
200         if (i == null) {
201             LOG.debug("Instruction {} not present in the graph", input.getId());
202
203             final CancelInstructionOutput out = new CancelInstructionOutputBuilder().setFailure(UnknownInstruction.class).build();
204             return SuccessfulRpcResult.create(out);
205         }
206
207         return SuccessfulRpcResult.create(new CancelInstructionOutputBuilder().setFailure(i.tryCancel(null)).build());
208     }
209
210     private synchronized RpcResult<CleanInstructionsOutput> realCleanInstructions(final CleanInstructionsInput input) {
211         final List<InstructionId> failed = new ArrayList<>();
212
213         for (final InstructionId id : input.getId()) {
214             // Find the instruction
215             final InstructionImpl i = this.insns.get(id);
216             if (i == null) {
217                 LOG.debug("Instruction {} not present in the graph", input.getId());
218                 failed.add(id);
219                 continue;
220             }
221
222             // Check its status
223             switch (i.getStatus()) {
224             case Cancelled:
225             case Failed:
226             case Successful:
227                 break;
228             case Executing:
229             case Queued:
230             case Scheduled:
231             case Unknown:
232                 LOG.debug("Instruction {} cannot be cleaned because of it's in state {}", id, i.getStatus());
233                 failed.add(id);
234                 continue;
235             default:
236                 break;
237             }
238
239             // The instruction is in a terminal state, we need to just unlink
240             // it from its dependencies and dependents
241             i.clean();
242
243             this.insns.remove(id);
244             LOG.debug("Instruction {} cleaned successfully", id);
245         }
246
247         final CleanInstructionsOutputBuilder ob = new CleanInstructionsOutputBuilder();
248         ob.setUnflushed(failed);
249
250         return SuccessfulRpcResult.create(ob.build());
251     }
252
253     private List<InstructionImpl> checkDependencies(final SubmitInstructionInput input) throws SchedulerException {
254         final List<InstructionImpl> dependencies = collectDependencies(input);
255         // Check if all dependencies are non-failed
256         final List<InstructionId> unmet = checkIfUnfailed(dependencies);
257         /*
258          *  Some dependencies have failed, declare the request dead-on-arrival
259          *  and fail the operation.
260          */
261         if (!unmet.isEmpty()) {
262             throw new SchedulerException("Instruction's dependencies are already unsuccessful", new FailureBuilder().setType(
263                     DeadOnArrival.class).setFailedPreconditions(unmet).build());
264         }
265         return dependencies;
266     }
267
268     private List<InstructionImpl> collectDependencies(final SubmitInstructionInput input) throws SchedulerException {
269         final List<InstructionImpl> dependencies = new ArrayList<>();
270         for (final InstructionId pid : input.getPreconditions()) {
271             final InstructionImpl i = this.insns.get(pid);
272             if (i == null) {
273                 LOG.info("Instruction {} depends on {}, which is not a known instruction", input.getId(), pid);
274                 throw new SchedulerException("Unknown dependency ID specified", new FailureBuilder().setType(UnknownPreconditionId.class).build());
275             }
276             dependencies.add(i);
277         }
278         return dependencies;
279     }
280
281     private List<InstructionId> checkIfUnfailed(final List<InstructionImpl> dependencies) {
282         final List<InstructionId> unmet = new ArrayList<>();
283         for (final InstructionImpl d : dependencies) {
284             switch (d.getStatus()) {
285             case Cancelled:
286             case Failed:
287             case Unknown:
288                 unmet.add(d.getId());
289                 break;
290             case Executing:
291             case Queued:
292             case Scheduled:
293             case Successful:
294                 break;
295             default:
296                 break;
297             }
298         }
299         return unmet;
300     }
301
302     @Override
303     public synchronized ListenableFuture<Instruction> scheduleInstruction(final SubmitInstructionInput input) throws SchedulerException {
304         final InstructionId id = input.getId();
305         if (this.insns.get(id) != null) {
306             LOG.info("Instruction ID {} already present", id);
307             throw new SchedulerException("Instruction ID currently in use", new FailureBuilder().setType(DuplicateInstructionId.class).build());
308         }
309
310         // First things first: check the deadline
311         final Nanotime now = NanotimeUtil.currentTime();
312         final BigInteger left = input.getDeadline().getValue().subtract(now.getValue());
313
314         if (left.compareTo(BigInteger.ZERO) <= 0) {
315             LOG.debug("Instruction {} deadline has already passed by {}ns", id, left);
316             throw new SchedulerException("Instruction arrived after specified deadline", new FailureBuilder().setType(DeadOnArrival.class).build());
317         }
318
319         // Resolve dependencies
320         final List<InstructionImpl> dependencies = checkDependencies(input);
321
322         /*
323          * All pre-flight checks done are at this point, the following
324          * steps can only fail in catastrophic scenarios (OOM and the
325          * like).
326          */
327
328         // Schedule a timeout for the instruction
329         final Timeout t = this.timer.newTimeout(timeout -> timeoutInstruction(input.getId()), left.longValue(), TimeUnit.NANOSECONDS);
330
331         // Put it into the instruction list
332         final SettableFuture<Instruction> ret = SettableFuture.create();
333         final InstructionImpl i = new InstructionImpl(new InstructionPusher(id, input.getDeadline()), ret, id, dependencies, t);
334         this.insns.put(id, i);
335
336         // Attach it into its dependencies
337         for (final InstructionImpl d : dependencies) {
338             d.addDependant(i);
339         }
340
341         /*
342          * All done. The next part is checking whether the instruction can
343          * run, which we can figure out after sending out the acknowledgement.
344          * This task should be ingress-weighed, so we reinsert it into the
345          * same execution service.
346          */
347         this.executor.submit(() -> tryScheduleInstruction(i));
348
349         return ret;
350     }
351
352     public String getInstructionID() {
353         return this.instructionId;
354     }
355
356     private synchronized void timeoutInstruction(final InstructionId id) {
357         final InstructionImpl i = this.insns.get(id);
358         if (i == null) {
359             LOG.warn("Instruction {} timed out, but not found in the queue", id);
360             return;
361         }
362
363         i.timeout();
364     }
365
366     private synchronized void tryScheduleDependants(final InstructionImpl i) {
367         // Walk all dependants and try to schedule them
368         final Iterator<InstructionImpl> it = i.getDependants();
369         while (it.hasNext()) {
370             tryScheduleInstruction(it.next());
371         }
372     }
373
374     private synchronized void tryScheduleInstruction(final InstructionImpl i) {
375         final ListenableFuture<ExecutionResult<Details>> f = i.ready();
376         if (f != null) {
377             Futures.addCallback(f, new FutureCallback<ExecutionResult<Details>>() {
378                 @Override
379                 public void onSuccess(final ExecutionResult<Details> result) {
380                     tryScheduleDependants(i);
381                 }
382
383                 @Override
384                 public void onFailure(final Throwable t) {
385                     LOG.error("Instruction {} failed to execute", i.getId(), t);
386                 }
387             });
388         }
389
390     }
391
392     @Override
393     public ListenableFuture<Void> closeServiceInstance() {
394         LOG.info("Closing Instruction Queue service {}", this.sgi.getValue());
395
396         this.reg.close();
397         for (final InstructionImpl i : this.insns.values()) {
398             i.tryCancel(null);
399         }
400         // Workaround for BUG-2283
401         final WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
402         t.delete(LogicalDatastoreType.OPERATIONAL, this.qid);
403         final CheckedFuture<Void, TransactionCommitFailedException> future = t.submit();
404         Futures.addCallback(future, new FutureCallback<Void>() {
405             @Override
406             public void onSuccess(final Void result) {
407                 LOG.debug("Instruction Queue {} removed", ProgrammingServiceImpl.this.qid);
408             }
409
410             @Override
411             public void onFailure(final Throwable t) {
412                 LOG.error("Failed to shutdown Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
413             }
414         });
415         return future;
416     }
417
418     @Override
419     public synchronized void close() {
420         if (this.csspReg != null) {
421             try {
422                 this.csspReg.close();
423             } catch (final Exception e) {
424                 LOG.debug("Failed to close Instruction Scheduler service");
425             }
426         }
427         if (this.serviceRegistration != null) {
428             this.serviceRegistration.unregister();
429             this.serviceRegistration = null;
430         }
431     }
432
433     void setServiceRegistration(final ServiceRegistration<?> serviceRegistration) {
434         this.serviceRegistration = serviceRegistration;
435     }
436 }