+ private void checkStatusAndMakeToast( final MakeToastInput input,
+ final SettableFuture<RpcResult<Void>> futureResult ) {
+
+ // Read the ToasterStatus and, if currently Up, try to write the status to Down.
+ // If that succeeds, then we essentially have an exclusive lock and can proceed
+ // to make toast.
+
+ final ReadWriteTransaction tx = dataProvider.newReadWriteTransaction();
+ ListenableFuture<Optional<DataObject>> readFuture =
+ tx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID );
+
+ final ListenableFuture<RpcResult<TransactionStatus>> commitFuture =
+ Futures.transform( readFuture, new AsyncFunction<Optional<DataObject>,
+ RpcResult<TransactionStatus>>() {
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> apply(
+ Optional<DataObject> toasterData ) throws Exception {
+
+ ToasterStatus toasterStatus = ToasterStatus.Up;
+ if( toasterData.isPresent() ) {
+ toasterStatus = ((Toaster)toasterData.get()).getToasterStatus();
+ }
+
+ LOG.debug( "Read toaster status: {}", toasterStatus );
+
+ if( toasterStatus == ToasterStatus.Up ) {
+
+ if( outOfBread() ) {
+ LOG.debug( "Toaster is out of bread" );
+
+ return Futures.immediateFuture( Rpcs.<TransactionStatus>getRpcResult(
+ false, null, makeToasterOutOfBreadError() ) );
+ }
+
+ LOG.debug( "Setting Toaster status to Down" );
+
+ // We're not currently making toast - try to update the status to Down
+ // to indicate we're going to make toast. This acts as a lock to prevent
+ // concurrent toasting.
+ tx.put( LogicalDatastoreType.OPERATIONAL, TOASTER_IID,
+ buildToaster( ToasterStatus.Down ) );
+ return tx.commit();
+ }
+
+ LOG.debug( "Oops - already making toast!" );
+
+ // Return an error since we are already making toast. This will get
+ // propagated to the commitFuture below which will interpret the null
+ // TransactionStatus in the RpcResult as an error condition.
+ return Futures.immediateFuture( Rpcs.<TransactionStatus>getRpcResult(
+ false, null, makeToasterInUseError() ) );
+ }
+ } );
+
+ Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+ @Override
+ public void onSuccess( RpcResult<TransactionStatus> result ) {
+ if( result.getResult() == TransactionStatus.COMMITED ) {
+
+ // OK to make toast
+ currentMakeToastTask.set( executor.submit(
+ new MakeToastTask( input, futureResult ) ) );
+ } else {
+
+ LOG.debug( "Setting error result" );
+
+ // Either the transaction failed to commit for some reason or, more likely,
+ // the read above returned ToasterStatus.Down. Either way, fail the
+ // futureResult and copy the errors.
+
+ futureResult.set( Rpcs.<Void>getRpcResult( false, null, result.getErrors() ) );
+ }
+ }
+
+ @Override
+ public void onFailure( Throwable ex ) {
+ if( ex instanceof OptimisticLockFailedException ) {
+
+ // Another thread is likely trying to make toast simultaneously and updated the
+ // status before us. Try reading the status again - if another make toast is
+ // now in progress, we should get ToasterStatus.Down and fail.
+
+ LOG.debug( "Got OptimisticLockFailedException - trying again" );
+
+ checkStatusAndMakeToast( input, futureResult );
+
+ } else {
+
+ LOG.error( "Failed to commit Toaster status", ex );
+
+ // Got some unexpected error so fail.
+ futureResult.set( Rpcs.<Void> getRpcResult( false, null, Arrays.asList(
+ RpcErrors.getRpcError( null, null, null, ErrorSeverity.ERROR,
+ ex.getMessage(),
+ ErrorType.APPLICATION, ex ) ) ) );
+ }
+ }
+ } );