d2b0f90194a9a066558f6d4dca5d1d8751ec01aa
[controller.git] / opendaylight / md-sal / samples / toaster-provider / src / main / java / org / opendaylight / controller / sample / toaster / provider / OpendaylightToaster.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sample.toaster.provider;
9
10 import java.util.Arrays;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.concurrent.Callable;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.atomic.AtomicReference;
20
21 import org.opendaylight.controller.config.yang.config.toaster_provider.impl.ToasterProviderRuntimeMXBean;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
31 import org.opendaylight.controller.sal.common.util.RpcErrors;
32 import org.opendaylight.controller.sal.common.util.Rpcs;
33 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
34 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
35 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.RestockToasterInput;
36 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.Toaster;
37 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.Toaster.ToasterStatus;
38 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterBuilder;
39 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterOutOfBreadBuilder;
40 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterRestocked;
41 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterRestockedBuilder;
42 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.ToasterService;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
47 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.google.common.base.Function;
53 import com.google.common.base.Optional;
54 import com.google.common.util.concurrent.AsyncFunction;
55 import com.google.common.util.concurrent.FutureCallback;
56 import com.google.common.util.concurrent.Futures;
57 import com.google.common.util.concurrent.ListenableFuture;
58 import com.google.common.util.concurrent.SettableFuture;
59
60 public class OpendaylightToaster implements ToasterService, ToasterProviderRuntimeMXBean,
61                                             DataChangeListener, AutoCloseable {
62
63     private static final Logger LOG = LoggerFactory.getLogger(OpendaylightToaster.class);
64
65     public static final InstanceIdentifier<Toaster> TOASTER_IID = InstanceIdentifier.builder(Toaster.class).build();
66
67     private static final DisplayString TOASTER_MANUFACTURER = new DisplayString("Opendaylight");
68     private static final DisplayString TOASTER_MODEL_NUMBER = new DisplayString("Model 1 - Binding Aware");
69
70     private NotificationProviderService notificationProvider;
71     private DataBroker dataProvider;
72
73     private final ExecutorService executor;
74
75     // The following holds the Future for the current make toast task.
76     // This is used to cancel the current toast.
77     private final AtomicReference<Future<?>> currentMakeToastTask = new AtomicReference<>();
78
79     private final AtomicLong amountOfBreadInStock = new AtomicLong( 100 );
80
81     private final AtomicLong toastsMade = new AtomicLong(0);
82
83     // Thread safe holder for our darkness multiplier.
84     private final AtomicLong darknessFactor = new AtomicLong( 1000 );
85
86     public OpendaylightToaster() {
87         executor = Executors.newFixedThreadPool(1);
88     }
89
90     public void setNotificationProvider(final NotificationProviderService salService) {
91         this.notificationProvider = salService;
92     }
93
94     public void setDataProvider(final DataBroker salDataProvider) {
95         this.dataProvider = salDataProvider;
96         setToasterStatusUp( null );
97     }
98
99     /**
100      * Implemented from the AutoCloseable interface.
101      */
102     @Override
103     public void close() throws ExecutionException, InterruptedException {
104         // When we close this service we need to shutdown our executor!
105         executor.shutdown();
106
107         if (dataProvider != null) {
108             WriteTransaction t = dataProvider.newWriteOnlyTransaction();
109             t.delete(LogicalDatastoreType.OPERATIONAL,TOASTER_IID);
110             ListenableFuture<RpcResult<TransactionStatus>> future = t.commit();
111             Futures.addCallback( future, new FutureCallback<RpcResult<TransactionStatus>>() {
112                 @Override
113                 public void onSuccess( RpcResult<TransactionStatus> result ) {
114                     LOG.debug( "Delete Toaster commit result: " + result );
115                 }
116
117                 @Override
118                 public void onFailure( Throwable t ) {
119                     LOG.error( "Delete of Toaster failed", t );
120                 }
121             } );
122         }
123     }
124
125     private Toaster buildToaster( ToasterStatus status ) {
126
127         // note - we are simulating a device whose manufacture and model are
128         // fixed (embedded) into the hardware.
129         // This is why the manufacture and model number are hardcoded.
130         return new ToasterBuilder().setToasterManufacturer( TOASTER_MANUFACTURER )
131                                    .setToasterModelNumber( TOASTER_MODEL_NUMBER )
132                                    .setToasterStatus( status )
133                                    .build();
134     }
135
136     /**
137      * Implemented from the DataChangeListener interface.
138      */
139     @Override
140     public void onDataChanged( final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change ) {
141         DataObject dataObject = change.getUpdatedSubtree();
142         if( dataObject instanceof Toaster )
143         {
144             Toaster toaster = (Toaster) dataObject;
145             Long darkness = toaster.getDarknessFactor();
146             if( darkness != null )
147             {
148                 darknessFactor.set( darkness );
149             }
150         }
151     }
152
153     /**
154      * RPC call implemented from the ToasterService interface that cancels the current
155      * toast, if any.
156      */
157     @Override
158     public Future<RpcResult<Void>> cancelToast() {
159
160         Future<?> current = currentMakeToastTask.getAndSet( null );
161         if( current != null ) {
162             current.cancel( true );
163         }
164
165         // Always return success from the cancel toast call.
166         return Futures.immediateFuture( Rpcs.<Void> getRpcResult( true,
167                                         Collections.<RpcError>emptyList() ) );
168     }
169
170     /**
171      * RPC call implemented from the ToasterService interface that attempts to make toast.
172      */
173     @Override
174     public Future<RpcResult<Void>> makeToast(final MakeToastInput input) {
175         LOG.info("makeToast: " + input);
176
177         final SettableFuture<RpcResult<Void>> futureResult = SettableFuture.create();
178
179         checkStatusAndMakeToast( input, futureResult );
180
181         return futureResult;
182     }
183
184     private List<RpcError> makeToasterOutOfBreadError() {
185         return Arrays.asList(
186                 RpcErrors.getRpcError( "out-of-stock", "resource-denied", null, null,
187                                        "Toaster is out of bread",
188                                        ErrorType.APPLICATION, null ) );
189     }
190
191     private List<RpcError> makeToasterInUseError() {
192         return Arrays.asList(
193             RpcErrors.getRpcError( "", "in-use", null, ErrorSeverity.WARNING,
194                                    "Toaster is busy", ErrorType.APPLICATION, null ) );
195     }
196
197     private void checkStatusAndMakeToast( final MakeToastInput input,
198                                           final SettableFuture<RpcResult<Void>> futureResult ) {
199
200         // Read the ToasterStatus and, if currently Up, try to write the status to Down.
201         // If that succeeds, then we essentially have an exclusive lock and can proceed
202         // to make toast.
203
204         final ReadWriteTransaction tx = dataProvider.newReadWriteTransaction();
205         ListenableFuture<Optional<DataObject>> readFuture =
206                                           tx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID );
207
208         final ListenableFuture<RpcResult<TransactionStatus>> commitFuture =
209             Futures.transform( readFuture, new AsyncFunction<Optional<DataObject>,
210                                                                    RpcResult<TransactionStatus>>() {
211
212                 @Override
213                 public ListenableFuture<RpcResult<TransactionStatus>> apply(
214                         Optional<DataObject> toasterData ) throws Exception {
215
216                     ToasterStatus toasterStatus = ToasterStatus.Up;
217                     if( toasterData.isPresent() ) {
218                         toasterStatus = ((Toaster)toasterData.get()).getToasterStatus();
219                     }
220
221                     LOG.debug( "Read toaster status: {}", toasterStatus );
222
223                     if( toasterStatus == ToasterStatus.Up ) {
224
225                         if( outOfBread() ) {
226                             LOG.debug( "Toaster is out of bread" );
227
228                             return Futures.immediateFuture( Rpcs.<TransactionStatus>getRpcResult(
229                                        false, null, makeToasterOutOfBreadError() ) );
230                         }
231
232                         LOG.debug( "Setting Toaster status to Down" );
233
234                         // We're not currently making toast - try to update the status to Down
235                         // to indicate we're going to make toast. This acts as a lock to prevent
236                         // concurrent toasting.
237                         tx.put( LogicalDatastoreType.OPERATIONAL, TOASTER_IID,
238                                 buildToaster( ToasterStatus.Down ) );
239                         return tx.commit();
240                     }
241
242                     LOG.debug( "Oops - already making toast!" );
243
244                     // Return an error since we are already making toast. This will get
245                     // propagated to the commitFuture below which will interpret the null
246                     // TransactionStatus in the RpcResult as an error condition.
247                     return Futures.immediateFuture( Rpcs.<TransactionStatus>getRpcResult(
248                             false, null, makeToasterInUseError() ) );
249                 }
250         } );
251
252         Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
253             @Override
254             public void onSuccess( RpcResult<TransactionStatus> result ) {
255                 if( result.getResult() == TransactionStatus.COMMITED  ) {
256
257                     // OK to make toast
258                     currentMakeToastTask.set( executor.submit(
259                                                     new MakeToastTask( input, futureResult ) ) );
260                 } else {
261
262                     LOG.debug( "Setting error result" );
263
264                     // Either the transaction failed to commit for some reason or, more likely,
265                     // the read above returned ToasterStatus.Down. Either way, fail the
266                     // futureResult and copy the errors.
267
268                     futureResult.set( Rpcs.<Void>getRpcResult( false, null, result.getErrors() ) );
269                 }
270             }
271
272             @Override
273             public void onFailure( Throwable ex ) {
274                 if( ex instanceof OptimisticLockFailedException ) {
275
276                     // Another thread is likely trying to make toast simultaneously and updated the
277                     // status before us. Try reading the status again - if another make toast is
278                     // now in progress, we should get ToasterStatus.Down and fail.
279
280                     LOG.debug( "Got OptimisticLockFailedException - trying again" );
281
282                     checkStatusAndMakeToast( input, futureResult );
283
284                 } else {
285
286                     LOG.error( "Failed to commit Toaster status", ex );
287
288                     // Got some unexpected error so fail.
289                     futureResult.set( Rpcs.<Void> getRpcResult( false, null, Arrays.asList(
290                         RpcErrors.getRpcError( null, null, null, ErrorSeverity.ERROR,
291                                                ex.getMessage(),
292                                                ErrorType.APPLICATION, ex ) ) ) );
293                 }
294             }
295         } );
296     }
297
298     /**
299      * RestConf RPC call implemented from the ToasterService interface.
300      * Restocks the bread for the toaster, resets the toastsMade counter to 0, and sends a
301      * ToasterRestocked notification.
302      */
303     @Override
304     public Future<RpcResult<java.lang.Void>> restockToaster(final RestockToasterInput input) {
305         LOG.info( "restockToaster: " + input );
306
307         amountOfBreadInStock.set( input.getAmountOfBreadToStock() );
308
309         if( amountOfBreadInStock.get() > 0 ) {
310             ToasterRestocked reStockedNotification = new ToasterRestockedBuilder()
311                 .setAmountOfBread( input.getAmountOfBreadToStock() ).build();
312             notificationProvider.publish( reStockedNotification );
313         }
314
315         return Futures.immediateFuture(Rpcs.<Void> getRpcResult(true, Collections.<RpcError>emptyList()));
316     }
317
318     /**
319      * JMX RPC call implemented from the ToasterProviderRuntimeMXBean interface.
320      */
321     @Override
322     public void clearToastsMade() {
323         LOG.info( "clearToastsMade" );
324         toastsMade.set( 0 );
325     }
326
327     /**
328      * Accesssor method implemented from the ToasterProviderRuntimeMXBean interface.
329      */
330     @Override
331     public Long getToastsMade() {
332         return toastsMade.get();
333     }
334
335     private void setToasterStatusUp( final Function<Boolean,Void> resultCallback ) {
336
337         WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
338         tx.put( LogicalDatastoreType.OPERATIONAL,TOASTER_IID, buildToaster( ToasterStatus.Up ) );
339
340         ListenableFuture<RpcResult<TransactionStatus>> commitFuture = tx.commit();
341
342         Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
343             @Override
344             public void onSuccess( RpcResult<TransactionStatus> result ) {
345                 if( result.getResult() != TransactionStatus.COMMITED ) {
346                     LOG.error( "Failed to update toaster status: " + result.getErrors() );
347                 }
348
349                 notifyCallback( result.getResult() == TransactionStatus.COMMITED );
350             }
351
352             @Override
353             public void onFailure( Throwable t ) {
354                 // We shouldn't get an OptimisticLockFailedException (or any ex) as no
355                 // other component should be updating the operational state.
356                 LOG.error( "Failed to update toaster status", t );
357
358                 notifyCallback( false );
359             }
360
361             void notifyCallback( boolean result ) {
362                 if( resultCallback != null ) {
363                     resultCallback.apply( result );
364                 }
365             }
366         } );
367     }
368
369     private boolean outOfBread()
370     {
371         return amountOfBreadInStock.get() == 0;
372     }
373
374     private class MakeToastTask implements Callable<Void> {
375
376         final MakeToastInput toastRequest;
377         final SettableFuture<RpcResult<Void>> futureResult;
378
379         public MakeToastTask( final MakeToastInput toastRequest,
380                               final SettableFuture<RpcResult<Void>> futureResult ) {
381             this.toastRequest = toastRequest;
382             this.futureResult = futureResult;
383         }
384
385         @Override
386         public Void call() {
387             try
388             {
389                 // make toast just sleeps for n seconds per doneness level.
390                 long darknessFactor = OpendaylightToaster.this.darknessFactor.get();
391                 Thread.sleep(darknessFactor * toastRequest.getToasterDoneness());
392
393             }
394             catch( InterruptedException e ) {
395                 LOG.info( "Interrupted while making the toast" );
396             }
397
398             toastsMade.incrementAndGet();
399
400             amountOfBreadInStock.getAndDecrement();
401             if( outOfBread() ) {
402                 LOG.info( "Toaster is out of bread!" );
403
404                 notificationProvider.publish( new ToasterOutOfBreadBuilder().build() );
405             }
406
407             // Set the Toaster status back to up - this essentially releases the toasting lock.
408             // We can't clear the current toast task nor set the Future result until the
409             // update has been committed so we pass a callback to be notified on completion.
410
411             setToasterStatusUp( new Function<Boolean,Void>() {
412                 @Override
413                 public Void apply( Boolean result ) {
414
415                     currentMakeToastTask.set( null );
416
417                     LOG.debug("Toast done");
418
419                     futureResult.set( Rpcs.<Void>getRpcResult( true, null,
420                                                           Collections.<RpcError>emptyList() ) );
421
422                     return null;
423                 }
424             } );
425
426             return null;
427         }
428     }
429 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.