Do not use RpcService in toaster-provider
[controller.git] / opendaylight / md-sal / samples / toaster-provider / src / main / java / org / opendaylight / controller / sample / toaster / provider / OpendaylightToaster.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sample.toaster.provider;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.DELETE;
12 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.WRITE;
13 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
14 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
15 import static org.opendaylight.yangtools.yang.common.ErrorType.APPLICATION;
16
17 import com.google.common.collect.ImmutableClassToInstanceMap;
18 import com.google.common.util.concurrent.FluentFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.Collection;
25 import java.util.Optional;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.concurrent.atomic.AtomicReference;
33 import java.util.function.Function;
34 import javax.annotation.PreDestroy;
35 import javax.inject.Inject;
36 import javax.inject.Singleton;
37 import org.eclipse.jdt.annotation.NonNull;
38 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
39 import org.opendaylight.mdsal.binding.api.DataBroker;
40 import org.opendaylight.mdsal.binding.api.DataObjectModification;
41 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
42 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
43 import org.opendaylight.mdsal.binding.api.DataTreeModification;
44 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
45 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
46 import org.opendaylight.mdsal.binding.api.RpcProviderService;
47 import org.opendaylight.mdsal.binding.api.WriteTransaction;
48 import org.opendaylight.mdsal.common.api.CommitInfo;
49 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
50 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
51 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.CancelToast;
52 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.CancelToastInput;
53 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.CancelToastOutput;
54 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.CancelToastOutputBuilder;
55 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
56 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToast;
57 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
58 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastOutput;
59 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastOutputBuilder;
60 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.RestockToaster;
61 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.RestockToasterInput;
62 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.RestockToasterOutput;
63 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.RestockToasterOutputBuilder;
64 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.Toaster;
65 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.Toaster.ToasterStatus;
66 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterBuilder;
67 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterOutOfBreadBuilder;
68 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterRestocked;
69 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterRestockedBuilder;
70 import org.opendaylight.yangtools.concepts.ListenerRegistration;
71 import org.opendaylight.yangtools.concepts.Registration;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.binding.Rpc;
74 import org.opendaylight.yangtools.yang.common.ErrorTag;
75 import org.opendaylight.yangtools.yang.common.ErrorType;
76 import org.opendaylight.yangtools.yang.common.RpcError;
77 import org.opendaylight.yangtools.yang.common.RpcResult;
78 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
79 import org.opendaylight.yangtools.yang.common.Uint32;
80 import org.osgi.service.component.annotations.Activate;
81 import org.osgi.service.component.annotations.Component;
82 import org.osgi.service.component.annotations.Deactivate;
83 import org.osgi.service.component.annotations.Reference;
84 import org.osgi.service.metatype.annotations.AttributeDefinition;
85 import org.osgi.service.metatype.annotations.Designate;
86 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
87 import org.slf4j.Logger;
88 import org.slf4j.LoggerFactory;
89
90 @Singleton
91 @Component(service = MakeToast.class, immediate = true)
92 @Designate(ocd = OpendaylightToaster.Configuration.class)
93 public final class OpendaylightToaster extends AbstractMXBean
94         implements MakeToast, ToasterProviderRuntimeMXBean, DataTreeChangeListener<Toaster>, AutoCloseable {
95     @ObjectClassDefinition
96     public @interface Configuration {
97         @AttributeDefinition(description = "The name of the toaster's manufacturer", max = "255")
98         String manufacturer() default TOASTER_MANUFACTURER;
99         @AttributeDefinition(description = "The name of the toaster's model", max = "255")
100         String modelNumber() default TOASTER_MODEL_NUMBER;
101         @AttributeDefinition(description = "How many times we attempt to make toast before failing ",
102             min = "0", max = "65535")
103         int maxMakeToastTries() default 2;
104     }
105
106     private static final CancelToastOutput EMPTY_CANCEL_OUTPUT = new CancelToastOutputBuilder().build();
107     private static final MakeToastOutput EMPTY_MAKE_OUTPUT = new MakeToastOutputBuilder().build();
108     private static final RestockToasterOutput EMPTY_RESTOCK_OUTPUT = new RestockToasterOutputBuilder().build();
109
110     private static final Logger LOG = LoggerFactory.getLogger(OpendaylightToaster.class);
111
112     private static final InstanceIdentifier<Toaster> TOASTER_IID = InstanceIdentifier.builder(Toaster.class).build();
113     private static final String TOASTER_MANUFACTURER = "Opendaylight";
114     private static final String TOASTER_MODEL_NUMBER = "Model 1 - Binding Aware";
115
116     private final DataBroker dataBroker;
117     private final NotificationPublishService notificationProvider;
118     private final ListenerRegistration<OpendaylightToaster> dataTreeChangeListenerRegistration;
119     private final Registration reg;
120
121     private final ExecutorService executor;
122
123     // This holds the Future for the current make toast task and is used to cancel the current toast.
124     private final AtomicReference<Future<?>> currentMakeToastTask = new AtomicReference<>();
125
126     // Thread safe holders
127     private final AtomicLong amountOfBreadInStock = new AtomicLong(100);
128     private final AtomicLong toastsMade = new AtomicLong(0);
129     private final AtomicLong darknessFactor = new AtomicLong(1000);
130
131     private final @NonNull DisplayString manufacturer;
132     private final @NonNull DisplayString modelNumber;
133     private final int maxMakeToastTries;
134
135     public OpendaylightToaster(final DataBroker dataProvider,
136             final NotificationPublishService notificationPublishService, final RpcProviderService rpcProviderService,
137             final String manufacturer, final String modelNumber, final int maxMakeToastTries) {
138         super("OpendaylightToaster", "toaster-provider", null);
139         notificationProvider = requireNonNull(notificationPublishService);
140         dataBroker = requireNonNull(dataProvider);
141
142         this.manufacturer = new DisplayString(manufacturer);
143         this.modelNumber = new DisplayString(modelNumber);
144         this.maxMakeToastTries = maxMakeToastTries;
145
146         executor = Executors.newFixedThreadPool(1);
147         reg = rpcProviderService.registerRpcImplementations(ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
148             .put(CancelToast.class, this::cancelToast)
149             .put(MakeToast.class, this)
150             .put(RestockToaster.class, this::restockToaster)
151             .build());
152
153         LOG.info("Initializing...");
154
155         dataTreeChangeListenerRegistration = requireNonNull(dataBroker, "dataBroker must be set")
156             .registerDataTreeChangeListener(DataTreeIdentifier.create(CONFIGURATION, TOASTER_IID), this);
157         try {
158             setToasterStatusUp(null).get();
159         } catch (InterruptedException | ExecutionException e) {
160             throw new IllegalStateException("Failed to commit initial data", e);
161         }
162
163         // Register our MXBean.
164         register();
165     }
166
167     @Inject
168     public OpendaylightToaster(final DataBroker dataProvider,
169             final NotificationPublishService notificationPublishService, final RpcProviderService rpcProviderService) {
170         this(dataProvider, notificationPublishService, rpcProviderService, TOASTER_MANUFACTURER, TOASTER_MODEL_NUMBER,
171             2);
172     }
173
174     @Activate
175     public OpendaylightToaster(@Reference final DataBroker dataProvider,
176             @Reference final NotificationPublishService notificationPublishService,
177             @Reference final RpcProviderService rpcProviderService, final @NonNull Configuration configuration) {
178         this(dataProvider, notificationPublishService, rpcProviderService, configuration.manufacturer(),
179             configuration.modelNumber(), configuration.maxMakeToastTries());
180     }
181
182     /**
183      * Implemented from the AutoCloseable interface.
184      */
185     @Override
186     @PreDestroy
187     @Deactivate
188     public void close() {
189         LOG.info("Closing...");
190
191         // Unregister our MXBean.
192         unregister();
193         reg.close();
194
195         // When we close this service we need to shutdown our executor!
196         executor.shutdown();
197
198         if (dataTreeChangeListenerRegistration != null) {
199             dataTreeChangeListenerRegistration.close();
200         }
201
202         if (dataBroker != null) {
203             WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
204             tx.delete(OPERATIONAL,TOASTER_IID);
205             Futures.addCallback(tx.commit(), new FutureCallback<CommitInfo>() {
206                 @Override
207                 public void onSuccess(final CommitInfo result) {
208                     LOG.debug("Successfully deleted the operational Toaster");
209                 }
210
211                 @Override
212                 public void onFailure(final Throwable failure) {
213                     LOG.error("Delete of the operational Toaster failed", failure);
214                 }
215             }, MoreExecutors.directExecutor());
216         }
217     }
218
219     private Toaster buildToaster(final ToasterStatus status) {
220         // note - we are simulating a device whose manufacture and model are
221         // fixed (embedded) into the hardware.
222         // This is why the manufacture and model number are hardcoded.
223         return new ToasterBuilder()
224             .setToasterManufacturer(manufacturer)
225             .setToasterModelNumber(modelNumber)
226             .setToasterStatus(status)
227             .build();
228     }
229
230     /**
231      * Implemented from the DataTreeChangeListener interface.
232      */
233     @Override
234     public void onDataTreeChanged(final Collection<DataTreeModification<Toaster>> changes) {
235         for (DataTreeModification<Toaster> change: changes) {
236             DataObjectModification<Toaster> rootNode = change.getRootNode();
237             if (rootNode.getModificationType() == WRITE) {
238                 Toaster oldToaster = rootNode.getDataBefore();
239                 Toaster newToaster = rootNode.getDataAfter();
240                 LOG.info("onDataTreeChanged - Toaster config with path {} was added or replaced: "
241                         + "old Toaster: {}, new Toaster: {}", change.getRootPath().getRootIdentifier(),
242                         oldToaster, newToaster);
243
244                 Uint32 darkness = newToaster.getDarknessFactor();
245                 if (darkness != null) {
246                     darknessFactor.set(darkness.toJava());
247                 }
248             } else if (rootNode.getModificationType() == DELETE) {
249                 LOG.info("onDataTreeChanged - Toaster config with path {} was deleted: old Toaster: {}",
250                         change.getRootPath().getRootIdentifier(), rootNode.getDataBefore());
251             }
252         }
253     }
254
255     /**
256      * RPC call implemented from the ToasterService interface that cancels the current toast, if any.
257      */
258     private ListenableFuture<RpcResult<CancelToastOutput>> cancelToast(final CancelToastInput input) {
259         Future<?> current = currentMakeToastTask.getAndSet(null);
260         if (current != null) {
261             current.cancel(true);
262         }
263
264         // Always return success from the cancel toast call
265         return Futures.immediateFuture(RpcResultBuilder.success(EMPTY_CANCEL_OUTPUT).build());
266     }
267
268     /**
269      * RPC call implemented from the ToasterService interface that attempts to make toast.
270      */
271     @Override
272     public ListenableFuture<RpcResult<MakeToastOutput>> invoke(final MakeToastInput input) {
273         LOG.info("makeToast: {}", input);
274         final var futureResult = SettableFuture.<RpcResult<MakeToastOutput>>create();
275         checkStatusAndMakeToast(input, futureResult, maxMakeToastTries);
276         return futureResult;
277     }
278
279     private static RpcError makeToasterOutOfBreadError() {
280         return RpcResultBuilder.newError(APPLICATION, ErrorTag.RESOURCE_DENIED, "Toaster is out of bread",
281             "out-of-stock", null, null);
282     }
283
284     private static RpcError makeToasterInUseError() {
285         return RpcResultBuilder.newWarning(APPLICATION, ErrorTag.IN_USE, "Toaster is busy", null, null, null);
286     }
287
288     private void checkStatusAndMakeToast(final MakeToastInput input,
289             final SettableFuture<RpcResult<MakeToastOutput>> futureResult, final int tries) {
290         // Read the ToasterStatus and, if currently Up, try to write the status to Down.
291         // If that succeeds, then we essentially have an exclusive lock and can proceed
292         // to make toast.
293         final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
294         FluentFuture<Optional<Toaster>> readFuture = tx.read(OPERATIONAL, TOASTER_IID);
295
296         final ListenableFuture<? extends CommitInfo> commitFuture =
297             Futures.transformAsync(readFuture, toasterData -> {
298                 ToasterStatus toasterStatus = ToasterStatus.Up;
299                 if (toasterData.isPresent()) {
300                     toasterStatus = toasterData.orElseThrow().getToasterStatus();
301                 }
302
303                 LOG.debug("Read toaster status: {}", toasterStatus);
304
305                 if (toasterStatus == ToasterStatus.Up) {
306
307                     if (outOfBread()) {
308                         LOG.debug("Toaster is out of bread");
309                         tx.cancel();
310                         return Futures.immediateFailedFuture(
311                                 new TransactionCommitFailedException("", makeToasterOutOfBreadError()));
312                     }
313
314                     LOG.debug("Setting Toaster status to Down");
315
316                     // We're not currently making toast - try to update the status to Down
317                     // to indicate we're going to make toast. This acts as a lock to prevent
318                     // concurrent toasting.
319                     tx.put(OPERATIONAL, TOASTER_IID, buildToaster(ToasterStatus.Down));
320                     return tx.commit();
321                 }
322
323                 LOG.debug("Oops - already making toast!");
324
325                 // Return an error since we are already making toast. This will get
326                 // propagated to the commitFuture below which will interpret the null
327                 // TransactionStatus in the RpcResult as an error condition.
328                 tx.cancel();
329                 return Futures.immediateFailedFuture(
330                         new TransactionCommitFailedException("", makeToasterInUseError()));
331             }, MoreExecutors.directExecutor());
332
333         Futures.addCallback(commitFuture, new FutureCallback<CommitInfo>() {
334             @Override
335             public void onSuccess(final CommitInfo result) {
336                 // OK to make toast
337                 currentMakeToastTask.set(executor.submit(new MakeToastTask(input, futureResult)));
338             }
339
340             @Override
341             public void onFailure(final Throwable ex) {
342                 if (ex instanceof OptimisticLockFailedException) {
343
344                     // Another thread is likely trying to make toast simultaneously and updated the
345                     // status before us. Try reading the status again - if another make toast is
346                     // now in progress, we should get ToasterStatus.Down and fail.
347
348                     if (tries - 1 > 0) {
349                         LOG.debug("Got OptimisticLockFailedException - trying again");
350                         checkStatusAndMakeToast(input, futureResult, tries - 1);
351                     } else {
352                         futureResult.set(RpcResultBuilder.<MakeToastOutput>failed()
353                                 .withError(ErrorType.APPLICATION, ex.getMessage()).build());
354                     }
355                 } else if (ex instanceof TransactionCommitFailedException) {
356                     LOG.debug("Failed to commit Toaster status", ex);
357
358                     // Probably already making toast.
359                     futureResult.set(RpcResultBuilder.<MakeToastOutput>failed()
360                             .withRpcErrors(((TransactionCommitFailedException)ex).getErrorList()).build());
361                 } else {
362                     LOG.debug("Unexpected error committing Toaster status", ex);
363                     futureResult.set(RpcResultBuilder.<MakeToastOutput>failed().withError(ErrorType.APPLICATION,
364                             "Unexpected error committing Toaster status", ex).build());
365                 }
366             }
367         }, MoreExecutors.directExecutor());
368     }
369
370     /**
371      * RestConf RPC call implemented from the ToasterService interface.
372      * Restocks the bread for the toaster, resets the toastsMade counter to 0, and sends a
373      * ToasterRestocked notification.
374      */
375     private ListenableFuture<RpcResult<RestockToasterOutput>> restockToaster(final RestockToasterInput input) {
376         LOG.info("restockToaster: {}", input);
377
378         amountOfBreadInStock.set(input.getAmountOfBreadToStock().toJava());
379
380         if (amountOfBreadInStock.get() > 0) {
381             ToasterRestocked reStockedNotification = new ToasterRestockedBuilder()
382                     .setAmountOfBread(input.getAmountOfBreadToStock()).build();
383             notificationProvider.offerNotification(reStockedNotification);
384         }
385
386         return Futures.immediateFuture(RpcResultBuilder.success(EMPTY_RESTOCK_OUTPUT).build());
387     }
388
389     /**
390      * JMX RPC call implemented from the ToasterProviderRuntimeMXBean interface.
391      */
392     @Override
393     public void clearToastsMade() {
394         LOG.info("clearToastsMade");
395         toastsMade.set(0);
396     }
397
398     /**
399      * Accesssor method implemented from the ToasterProviderRuntimeMXBean interface.
400      */
401     @Override
402     public Long getToastsMade() {
403         return toastsMade.get();
404     }
405
406     private ListenableFuture<?> setToasterStatusUp(final Function<Boolean, MakeToastOutput> resultCallback) {
407         WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
408         tx.put(OPERATIONAL,TOASTER_IID, buildToaster(ToasterStatus.Up));
409
410         final var future = tx.commit();
411         Futures.addCallback(future, new FutureCallback<CommitInfo>() {
412             @Override
413             public void onSuccess(final CommitInfo result) {
414                 LOG.info("Successfully set ToasterStatus to Up");
415                 notifyCallback(true);
416             }
417
418             @Override
419             public void onFailure(final Throwable failure) {
420                 // We shouldn't get an OptimisticLockFailedException (or any ex) as no
421                 // other component should be updating the operational state.
422                 LOG.error("Failed to update toaster status", failure);
423
424                 notifyCallback(false);
425             }
426
427             void notifyCallback(final boolean result) {
428                 if (resultCallback != null) {
429                     resultCallback.apply(result);
430                 }
431             }
432         }, MoreExecutors.directExecutor());
433
434         return future;
435     }
436
437     private boolean outOfBread() {
438         return amountOfBreadInStock.get() == 0;
439     }
440
441     private class MakeToastTask implements Callable<Void> {
442
443         final MakeToastInput toastRequest;
444         final SettableFuture<RpcResult<MakeToastOutput>> futureResult;
445
446         MakeToastTask(final MakeToastInput toastRequest,
447             final SettableFuture<RpcResult<MakeToastOutput>> futureResult) {
448             this.toastRequest = toastRequest;
449             this.futureResult = futureResult;
450         }
451
452         @Override
453         public Void call() {
454             try {
455                 // make toast just sleeps for n seconds per doneness level.
456                 Thread.sleep(darknessFactor.get()
457                         * toastRequest.getToasterDoneness().toJava());
458
459             } catch (InterruptedException e) {
460                 LOG.info("Interrupted while making the toast");
461             }
462
463             toastsMade.incrementAndGet();
464
465             amountOfBreadInStock.getAndDecrement();
466             if (outOfBread()) {
467                 LOG.info("Toaster is out of bread!");
468
469                 notificationProvider.offerNotification(new ToasterOutOfBreadBuilder().build());
470             }
471
472             // Set the Toaster status back to up - this essentially releases the toasting lock.
473             // We can't clear the current toast task nor set the Future result until the
474             // update has been committed so we pass a callback to be notified on completion.
475
476             setToasterStatusUp(result -> {
477                 currentMakeToastTask.set(null);
478                 LOG.debug("Toast done");
479                 futureResult.set(RpcResultBuilder.success(EMPTY_MAKE_OUTPUT).build());
480                 return null;
481             });
482
483             return null;
484         }
485     }
486 }