Merge "BUG-1792: share QNames in both cases"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.FinalizablePhantomReference;
17 import com.google.common.base.FinalizableReferenceQueue;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
53
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
60
61 /**
62  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
63  * <p>
64  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66  * be created on each of those shards by the TransactionProxy
67  *</p>
68  * <p>
69  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70  * shards will be executed.
71  * </p>
72  */
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
74
75     private final TransactionChainProxy transactionChainProxy;
76
77
78
79     public enum TransactionType {
80         READ_ONLY,
81         WRITE_ONLY,
82         READ_WRITE
83     }
84
85     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
86                                                                           Throwable, Throwable>() {
87         @Override
88         public Throwable apply(Throwable failure) {
89             return failure;
90         }
91     };
92
93     private static final AtomicLong counter = new AtomicLong();
94
95     private static final Logger
96         LOG = LoggerFactory.getLogger(TransactionProxy.class);
97
98
99     /**
100      * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
101      * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
102      * trickery to clean up its internal thread when the bundle is unloaded.
103      */
104     private static final FinalizableReferenceQueue phantomReferenceQueue =
105                                                                   new FinalizableReferenceQueue();
106
107     /**
108      * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
109      * necessary because PhantomReferences need a hard reference so they're not garbage collected.
110      * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
111      * and thus becomes eligible for garbage collection.
112      */
113     private static final Map<TransactionProxyCleanupPhantomReference,
114                              TransactionProxyCleanupPhantomReference> phantomReferenceCache =
115                                                                         new ConcurrentHashMap<>();
116
117     /**
118      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
119      * garbage collected. This is used for read-only transactions as they're not explicitly closed
120      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
121      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
122      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
123      * to the old generation space and thus should be cleaned up in a timely manner as the GC
124      * runs on the young generation (eden, swap1...) space much more frequently.
125      */
126     private static class TransactionProxyCleanupPhantomReference
127                                            extends FinalizablePhantomReference<TransactionProxy> {
128
129         private final List<ActorSelection> remoteTransactionActors;
130         private final AtomicBoolean remoteTransactionActorsMB;
131         private final ActorContext actorContext;
132         private final TransactionIdentifier identifier;
133
134         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
135             super(referent, phantomReferenceQueue);
136
137             // Note we need to cache the relevant fields from the TransactionProxy as we can't
138             // have a hard reference to the TransactionProxy instance itself.
139
140             remoteTransactionActors = referent.remoteTransactionActors;
141             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
142             actorContext = referent.actorContext;
143             identifier = referent.identifier;
144         }
145
146         @Override
147         public void finalizeReferent() {
148             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
149                     remoteTransactionActors.size(), identifier);
150
151             phantomReferenceCache.remove(this);
152
153             // Access the memory barrier volatile to ensure all previous updates to the
154             // remoteTransactionActors list are visible to this thread.
155
156             if(remoteTransactionActorsMB.get()) {
157                 for(ActorSelection actor : remoteTransactionActors) {
158                     LOG.trace("Sending CloseTransaction to {}", actor);
159                     actorContext.sendRemoteOperationAsync(actor,
160                             new CloseTransaction().toSerializable());
161                 }
162             }
163         }
164     }
165
166     /**
167      * Stores the remote Tx actors for each requested data store path to be used by the
168      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
169      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
170      * remoteTransactionActors list so they will be visible to the thread accessing the
171      * PhantomReference.
172      */
173     private List<ActorSelection> remoteTransactionActors;
174     private AtomicBoolean remoteTransactionActorsMB;
175
176     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
177
178     private final TransactionType transactionType;
179     private final ActorContext actorContext;
180     private final TransactionIdentifier identifier;
181     private final SchemaContext schemaContext;
182     private boolean inReadyState;
183
184     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
185         this(actorContext, transactionType, null);
186     }
187
188     @VisibleForTesting
189     List<Future<Object>> getRecordedOperationFutures() {
190         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
191         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
192             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
193         }
194
195         return recordedOperationFutures;
196     }
197
198     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
199         this.actorContext = Preconditions.checkNotNull(actorContext,
200             "actorContext should not be null");
201         this.transactionType = Preconditions.checkNotNull(transactionType,
202             "transactionType should not be null");
203         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
204             "schemaContext should not be null");
205         this.transactionChainProxy = transactionChainProxy;
206
207         String memberName = actorContext.getCurrentMemberName();
208         if(memberName == null){
209             memberName = "UNKNOWN-MEMBER";
210         }
211
212         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
213             counter.getAndIncrement()).build();
214
215         if(transactionType == TransactionType.READ_ONLY) {
216             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
217             // to close the remote Tx's when this instance is no longer in use and is garbage
218             // collected.
219
220             remoteTransactionActors = Lists.newArrayList();
221             remoteTransactionActorsMB = new AtomicBoolean();
222
223             TransactionProxyCleanupPhantomReference cleanup =
224                 new TransactionProxyCleanupPhantomReference(this);
225             phantomReferenceCache.put(cleanup, cleanup);
226         }
227
228         LOG.debug("Created txn {} of type {}", identifier, transactionType);
229     }
230
231     @Override
232     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
233             final YangInstanceIdentifier path) {
234
235         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
236                 "Read operation on write-only transaction is not allowed");
237
238         LOG.debug("Tx {} read {}", identifier, path);
239
240         createTransactionIfMissing(actorContext, path);
241
242         return transactionContext(path).readData(path);
243     }
244
245     @Override
246     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
247
248         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
249                 "Exists operation on write-only transaction is not allowed");
250
251         LOG.debug("Tx {} exists {}", identifier, path);
252
253         createTransactionIfMissing(actorContext, path);
254
255         return transactionContext(path).dataExists(path);
256     }
257
258     private void checkModificationState() {
259         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
260                 "Modification operation on read-only transaction is not allowed");
261         Preconditions.checkState(!inReadyState,
262                 "Transaction is sealed - further modifications are not allowed");
263     }
264
265     @Override
266     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
267
268         checkModificationState();
269
270         LOG.debug("Tx {} write {}", identifier, path);
271
272         createTransactionIfMissing(actorContext, path);
273
274         transactionContext(path).writeData(path, data);
275     }
276
277     @Override
278     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
279
280         checkModificationState();
281
282         LOG.debug("Tx {} merge {}", identifier, path);
283
284         createTransactionIfMissing(actorContext, path);
285
286         transactionContext(path).mergeData(path, data);
287     }
288
289     @Override
290     public void delete(YangInstanceIdentifier path) {
291
292         checkModificationState();
293
294         LOG.debug("Tx {} delete {}", identifier, path);
295
296         createTransactionIfMissing(actorContext, path);
297
298         transactionContext(path).deleteData(path);
299     }
300
301     @Override
302     public DOMStoreThreePhaseCommitCohort ready() {
303
304         checkModificationState();
305
306         inReadyState = true;
307
308         LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
309                 remoteTransactionPaths.size());
310
311         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
312
313         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
314
315             LOG.debug("Tx {} Readying transaction for shard {}", identifier,
316                     transactionContext.getShardName());
317
318             cohortPathFutures.add(transactionContext.readyTransaction());
319         }
320
321         if(transactionChainProxy != null){
322             transactionChainProxy.onTransactionReady(cohortPathFutures);
323         }
324
325         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
326                 identifier.toString());
327     }
328
329     @Override
330     public Object getIdentifier() {
331         return this.identifier;
332     }
333
334     @Override
335     public void close() {
336         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
337             transactionContext.closeTransaction();
338         }
339
340         remoteTransactionPaths.clear();
341
342         if(transactionType == TransactionType.READ_ONLY) {
343             remoteTransactionActors.clear();
344             remoteTransactionActorsMB.set(true);
345         }
346     }
347
348     private TransactionContext transactionContext(YangInstanceIdentifier path){
349         String shardName = shardNameFromIdentifier(path);
350         return remoteTransactionPaths.get(shardName);
351     }
352
353     private String shardNameFromIdentifier(YangInstanceIdentifier path){
354         return ShardStrategyFactory.getStrategy(path).findShard(path);
355     }
356
357     private void createTransactionIfMissing(ActorContext actorContext,
358         YangInstanceIdentifier path) {
359
360         if(transactionChainProxy != null){
361             transactionChainProxy.waitTillCurrentTransactionReady();
362         }
363
364         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
365
366         TransactionContext transactionContext =
367             remoteTransactionPaths.get(shardName);
368
369         if (transactionContext != null) {
370             // A transaction already exists with that shard
371             return;
372         }
373
374         try {
375             Object response = actorContext.executeShardOperation(shardName,
376                 new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
377                     getTransactionChainId()).toSerializable());
378             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
379                 CreateTransactionReply reply =
380                     CreateTransactionReply.fromSerializable(response);
381
382                 String transactionPath = reply.getTransactionPath();
383
384                 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
385
386                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
387
388                 if (transactionType == TransactionType.READ_ONLY) {
389                     // Add the actor to the remoteTransactionActors list for access by the
390                     // cleanup PhantonReference.
391                     remoteTransactionActors.add(transactionActor);
392
393                     // Write to the memory barrier volatile to publish the above update to the
394                     // remoteTransactionActors list for thread visibility.
395                     remoteTransactionActorsMB.set(true);
396                 }
397
398                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
399                     transactionActor, identifier, actorContext, schemaContext);
400
401                 remoteTransactionPaths.put(shardName, transactionContext);
402             } else {
403                 throw new IllegalArgumentException(String.format(
404                     "Invalid reply type {} for CreateTransaction", response.getClass()));
405             }
406         } catch (Exception e) {
407             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
408             remoteTransactionPaths
409                 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
410         }
411     }
412
413     public String getTransactionChainId() {
414         if(transactionChainProxy == null){
415             return "";
416         }
417         return transactionChainProxy.getTransactionChainId();
418     }
419
420
421     private interface TransactionContext {
422         String getShardName();
423
424         void closeTransaction();
425
426         Future<ActorPath> readyTransaction();
427
428         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
429
430         void deleteData(YangInstanceIdentifier path);
431
432         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
433
434         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
435                 final YangInstanceIdentifier path);
436
437         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
438
439         List<Future<Object>> getRecordedOperationFutures();
440     }
441
442     private static abstract class AbstractTransactionContext implements TransactionContext {
443
444         protected final TransactionIdentifier identifier;
445         protected final String shardName;
446         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
447
448         AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
449             this.shardName = shardName;
450             this.identifier = identifier;
451         }
452
453         @Override
454         public String getShardName() {
455             return shardName;
456         }
457
458         @Override
459         public List<Future<Object>> getRecordedOperationFutures() {
460             return recordedOperationFutures;
461         }
462     }
463
464     private static class TransactionContextImpl extends AbstractTransactionContext {
465         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
466
467         private final ActorContext actorContext;
468         private final SchemaContext schemaContext;
469         private final String actorPath;
470         private final ActorSelection actor;
471
472         private TransactionContextImpl(String shardName, String actorPath,
473                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
474                 SchemaContext schemaContext) {
475             super(shardName, identifier);
476             this.actorPath = actorPath;
477             this.actor = actor;
478             this.actorContext = actorContext;
479             this.schemaContext = schemaContext;
480         }
481
482         private ActorSelection getActor() {
483             return actor;
484         }
485
486         private String getResolvedCohortPath(String cohortPath) {
487             return actorContext.resolvePath(actorPath, cohortPath);
488         }
489
490         @Override
491         public void closeTransaction() {
492             LOG.debug("Tx {} closeTransaction called", identifier);
493             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
494         }
495
496         @Override
497         public Future<ActorPath> readyTransaction() {
498             LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
499                     identifier, recordedOperationFutures.size());
500
501             // Send the ReadyTransaction message to the Tx actor.
502
503             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
504                     new ReadyTransaction().toSerializable());
505
506             // Combine all the previously recorded put/merge/delete operation reply Futures and the
507             // ReadyTransactionReply Future into one Future. If any one fails then the combined
508             // Future will fail. We need all prior operations and the ready operation to succeed
509             // in order to attempt commit.
510
511             List<Future<Object>> futureList =
512                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
513             futureList.addAll(recordedOperationFutures);
514             futureList.add(replyFuture);
515
516             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
517                     actorContext.getActorSystem().dispatcher());
518
519             // Transform the combined Future into a Future that returns the cohort actor path from
520             // the ReadyTransactionReply. That's the end result of the ready operation.
521
522             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
523                 @Override
524                 public ActorPath apply(Iterable<Object> notUsed) {
525
526                     LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
527                             identifier);
528
529                     // At this point all the Futures succeeded and we need to extract the cohort
530                     // actor path from the ReadyTransactionReply. For the recorded operations, they
531                     // don't return any data so we're only interested that they completed
532                     // successfully. We could be paranoid and verify the correct reply types but
533                     // that really should never happen so it's not worth the overhead of
534                     // de-serializing each reply.
535
536                     // Note the Future get call here won't block as it's complete.
537                     Object serializedReadyReply = replyFuture.value().get().get();
538                     if(serializedReadyReply.getClass().equals(
539                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
540                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
541                                 actorContext.getActorSystem(), serializedReadyReply);
542
543                         String resolvedCohortPath = getResolvedCohortPath(
544                                 reply.getCohortPath().toString());
545
546                         LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
547                                 identifier, resolvedCohortPath);
548
549                         return actorContext.actorFor(resolvedCohortPath);
550                     } else {
551                         // Throwing an exception here will fail the Future.
552
553                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
554                                 serializedReadyReply.getClass()));
555                     }
556                 }
557             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
558         }
559
560         @Override
561         public void deleteData(YangInstanceIdentifier path) {
562             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
563             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
564                     new DeleteData(path).toSerializable() ));
565         }
566
567         @Override
568         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
569             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
570             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
571                     new MergeData(path, data, schemaContext).toSerializable()));
572         }
573
574         @Override
575         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
576             LOG.debug("Tx {} writeData called path = {}", identifier, path);
577             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
578                     new WriteData(path, data, schemaContext).toSerializable()));
579         }
580
581         @Override
582         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
583                 final YangInstanceIdentifier path) {
584
585             LOG.debug("Tx {} readData called path = {}", identifier, path);
586
587             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
588
589             // If there were any previous recorded put/merge/delete operation reply Futures then we
590             // must wait for them to successfully complete. This is necessary to honor the read
591             // uncommitted semantics of the public API contract. If any one fails then fail the read.
592
593             if(recordedOperationFutures.isEmpty()) {
594                 finishReadData(path, returnFuture);
595             } else {
596                 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
597                         identifier, recordedOperationFutures.size());
598
599                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
600                 // Futures#sequence accesses the passed List on a different thread, as
601                 // recordedOperationFutures is not synchronized.
602
603                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
604                         Lists.newArrayList(recordedOperationFutures),
605                         actorContext.getActorSystem().dispatcher());
606                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
607                     @Override
608                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
609                             throws Throwable {
610                         if(failure != null) {
611                             LOG.debug("Tx {} readData: a recorded operation failed: {}",
612                                     identifier, failure);
613
614                             returnFuture.setException(new ReadFailedException(
615                                     "The read could not be performed because a previous put, merge,"
616                                     + "or delete operation failed", failure));
617                         } else {
618                             finishReadData(path, returnFuture);
619                         }
620                     }
621                 };
622
623                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
624             }
625
626             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
627         }
628
629         private void finishReadData(final YangInstanceIdentifier path,
630                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
631
632             LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
633
634             OnComplete<Object> onComplete = new OnComplete<Object>() {
635                 @Override
636                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
637                     if(failure != null) {
638                         LOG.debug("Tx {} read operation failed: {}", identifier, failure);
639
640                         returnFuture.setException(new ReadFailedException(
641                                 "Error reading data for path " + path, failure));
642
643                     } else {
644                         LOG.debug("Tx {} read operation succeeded", identifier, failure);
645
646                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
647                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
648                                     path, readResponse);
649                             if (reply.getNormalizedNode() == null) {
650                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
651                             } else {
652                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
653                                         reply.getNormalizedNode()));
654                             }
655                         } else {
656                             returnFuture.setException(new ReadFailedException(
657                                     "Invalid response reading data for path " + path));
658                         }
659                     }
660                 }
661             };
662
663             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
664                     new ReadData(path).toSerializable());
665             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
666         }
667
668         @Override
669         public CheckedFuture<Boolean, ReadFailedException> dataExists(
670                 final YangInstanceIdentifier path) {
671
672             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
673
674             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
675
676             // If there were any previous recorded put/merge/delete operation reply Futures then we
677             // must wait for them to successfully complete. This is necessary to honor the read
678             // uncommitted semantics of the public API contract. If any one fails then fail this
679             // request.
680
681             if(recordedOperationFutures.isEmpty()) {
682                 finishDataExists(path, returnFuture);
683             } else {
684                 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
685                         identifier, recordedOperationFutures.size());
686
687                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
688                 // Futures#sequence accesses the passed List on a different thread, as
689                 // recordedOperationFutures is not synchronized.
690
691                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
692                         Lists.newArrayList(recordedOperationFutures),
693                         actorContext.getActorSystem().dispatcher());
694                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
695                     @Override
696                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
697                             throws Throwable {
698                         if(failure != null) {
699                             LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
700                                     identifier, failure);
701
702                             returnFuture.setException(new ReadFailedException(
703                                     "The data exists could not be performed because a previous "
704                                     + "put, merge, or delete operation failed", failure));
705                         } else {
706                             finishDataExists(path, returnFuture);
707                         }
708                     }
709                 };
710
711                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
712             }
713
714             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
715         }
716
717         private void finishDataExists(final YangInstanceIdentifier path,
718                 final SettableFuture<Boolean> returnFuture) {
719
720             LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
721
722             OnComplete<Object> onComplete = new OnComplete<Object>() {
723                 @Override
724                 public void onComplete(Throwable failure, Object response) throws Throwable {
725                     if(failure != null) {
726                         LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
727
728                         returnFuture.setException(new ReadFailedException(
729                                 "Error checking data exists for path " + path, failure));
730                     } else {
731                         LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
732
733                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
734                             returnFuture.set(Boolean.valueOf(DataExistsReply.
735                                         fromSerializable(response).exists()));
736                         } else {
737                             returnFuture.setException(new ReadFailedException(
738                                     "Invalid response checking exists for path " + path));
739                         }
740                     }
741                 }
742             };
743
744             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
745                     new DataExists(path).toSerializable());
746             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
747         }
748     }
749
750     private static class NoOpTransactionContext extends AbstractTransactionContext {
751
752         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
753
754         private final Exception failure;
755
756         public NoOpTransactionContext(String shardName, Exception failure,
757                 TransactionIdentifier identifier){
758             super(shardName, identifier);
759             this.failure = failure;
760         }
761
762         @Override
763         public void closeTransaction() {
764             LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
765         }
766
767         @Override
768         public Future<ActorPath> readyTransaction() {
769             LOG.debug("Tx {} readyTransaction called", identifier);
770             return akka.dispatch.Futures.failed(failure);
771         }
772
773         @Override
774         public void deleteData(YangInstanceIdentifier path) {
775             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
776         }
777
778         @Override
779         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
780             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
781         }
782
783         @Override
784         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
785             LOG.debug("Tx {} writeData called path = {}", identifier, path);
786         }
787
788         @Override
789         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
790             YangInstanceIdentifier path) {
791             LOG.debug("Tx {} readData called path = {}", identifier, path);
792             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
793                     "Error reading data for path " + path, failure));
794         }
795
796         @Override
797         public CheckedFuture<Boolean, ReadFailedException> dataExists(
798             YangInstanceIdentifier path) {
799             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
800             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
801                     "Error checking exists for path " + path, failure));
802         }
803     }
804 }