a8b20c030e1dd34f274ce2e02b56669739824af3
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.FinalizablePhantomReference;
17 import com.google.common.base.FinalizableReferenceQueue;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
53
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
60
61 /**
62  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
63  * <p>
64  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66  * be created on each of those shards by the TransactionProxy
67  *</p>
68  * <p>
69  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70  * shards will be executed.
71  * </p>
72  */
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
74     public enum TransactionType {
75         READ_ONLY,
76         WRITE_ONLY,
77         READ_WRITE
78     }
79
80     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
81                                                                           Throwable, Throwable>() {
82         @Override
83         public Throwable apply(Throwable failure) {
84             return failure;
85         }
86     };
87
88     private static final AtomicLong counter = new AtomicLong();
89
90     private static final Logger
91         LOG = LoggerFactory.getLogger(TransactionProxy.class);
92
93
94     /**
95      * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
96      * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
97      * trickery to clean up its internal thread when the bundle is unloaded.
98      */
99     private static final FinalizableReferenceQueue phantomReferenceQueue =
100                                                                   new FinalizableReferenceQueue();
101
102     /**
103      * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
104      * necessary because PhantomReferences need a hard reference so they're not garbage collected.
105      * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
106      * and thus becomes eligible for garbage collection.
107      */
108     private static final Map<TransactionProxyCleanupPhantomReference,
109                              TransactionProxyCleanupPhantomReference> phantomReferenceCache =
110                                                                         new ConcurrentHashMap<>();
111
112     /**
113      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
114      * garbage collected. This is used for read-only transactions as they're not explicitly closed
115      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
116      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
117      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
118      * to the old generation space and thus should be cleaned up in a timely manner as the GC
119      * runs on the young generation (eden, swap1...) space much more frequently.
120      */
121     private static class TransactionProxyCleanupPhantomReference
122                                            extends FinalizablePhantomReference<TransactionProxy> {
123
124         private final List<ActorSelection> remoteTransactionActors;
125         private final AtomicBoolean remoteTransactionActorsMB;
126         private final ActorContext actorContext;
127         private final TransactionIdentifier identifier;
128
129         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
130             super(referent, phantomReferenceQueue);
131
132             // Note we need to cache the relevant fields from the TransactionProxy as we can't
133             // have a hard reference to the TransactionProxy instance itself.
134
135             remoteTransactionActors = referent.remoteTransactionActors;
136             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
137             actorContext = referent.actorContext;
138             identifier = referent.identifier;
139         }
140
141         @Override
142         public void finalizeReferent() {
143             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
144                     remoteTransactionActors.size(), identifier);
145
146             phantomReferenceCache.remove(this);
147
148             // Access the memory barrier volatile to ensure all previous updates to the
149             // remoteTransactionActors list are visible to this thread.
150
151             if(remoteTransactionActorsMB.get()) {
152                 for(ActorSelection actor : remoteTransactionActors) {
153                     LOG.trace("Sending CloseTransaction to {}", actor);
154                     actorContext.sendRemoteOperationAsync(actor,
155                             new CloseTransaction().toSerializable());
156                 }
157             }
158         }
159     }
160
161     /**
162      * Stores the remote Tx actors for each requested data store path to be used by the
163      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
164      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
165      * remoteTransactionActors list so they will be visible to the thread accessing the
166      * PhantomReference.
167      */
168     private List<ActorSelection> remoteTransactionActors;
169     private AtomicBoolean remoteTransactionActorsMB;
170
171     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
172
173     private final TransactionType transactionType;
174     private final ActorContext actorContext;
175     private final TransactionIdentifier identifier;
176     private final SchemaContext schemaContext;
177     private boolean inReadyState;
178
179     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
180         this.actorContext = Preconditions.checkNotNull(actorContext,
181                 "actorContext should not be null");
182         this.transactionType = Preconditions.checkNotNull(transactionType,
183                 "transactionType should not be null");
184         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
185                 "schemaContext should not be null");
186
187         String memberName = actorContext.getCurrentMemberName();
188         if(memberName == null){
189             memberName = "UNKNOWN-MEMBER";
190         }
191
192         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
193                 counter.getAndIncrement()).build();
194
195         if(transactionType == TransactionType.READ_ONLY) {
196             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
197             // to close the remote Tx's when this instance is no longer in use and is garbage
198             // collected.
199
200             remoteTransactionActors = Lists.newArrayList();
201             remoteTransactionActorsMB = new AtomicBoolean();
202
203             TransactionProxyCleanupPhantomReference cleanup =
204                                               new TransactionProxyCleanupPhantomReference(this);
205             phantomReferenceCache.put(cleanup, cleanup);
206         }
207
208         LOG.debug("Created txn {} of type {}", identifier, transactionType);
209     }
210
211     @VisibleForTesting
212     List<Future<Object>> getRecordedOperationFutures() {
213         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
214         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
215             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
216         }
217
218         return recordedOperationFutures;
219     }
220
221     @Override
222     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
223             final YangInstanceIdentifier path) {
224
225         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
226                 "Read operation on write-only transaction is not allowed");
227
228         LOG.debug("Tx {} read {}", identifier, path);
229
230         createTransactionIfMissing(actorContext, path);
231
232         return transactionContext(path).readData(path);
233     }
234
235     @Override
236     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
237
238         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
239                 "Exists operation on write-only transaction is not allowed");
240
241         LOG.debug("Tx {} exists {}", identifier, path);
242
243         createTransactionIfMissing(actorContext, path);
244
245         return transactionContext(path).dataExists(path);
246     }
247
248     private void checkModificationState() {
249         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
250                 "Modification operation on read-only transaction is not allowed");
251         Preconditions.checkState(!inReadyState,
252                 "Transaction is sealed - further modifications are not allowed");
253     }
254
255     @Override
256     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
257
258         checkModificationState();
259
260         LOG.debug("Tx {} write {}", identifier, path);
261
262         createTransactionIfMissing(actorContext, path);
263
264         transactionContext(path).writeData(path, data);
265     }
266
267     @Override
268     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
269
270         checkModificationState();
271
272         LOG.debug("Tx {} merge {}", identifier, path);
273
274         createTransactionIfMissing(actorContext, path);
275
276         transactionContext(path).mergeData(path, data);
277     }
278
279     @Override
280     public void delete(YangInstanceIdentifier path) {
281
282         checkModificationState();
283
284         LOG.debug("Tx {} delete {}", identifier, path);
285
286         createTransactionIfMissing(actorContext, path);
287
288         transactionContext(path).deleteData(path);
289     }
290
291     @Override
292     public DOMStoreThreePhaseCommitCohort ready() {
293
294         checkModificationState();
295
296         inReadyState = true;
297
298         LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
299                 remoteTransactionPaths.size());
300
301         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
302
303         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
304
305             LOG.debug("Tx {} Readying transaction for shard {}", identifier,
306                     transactionContext.getShardName());
307
308             cohortPathFutures.add(transactionContext.readyTransaction());
309         }
310
311         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
312                 identifier.toString());
313     }
314
315     @Override
316     public Object getIdentifier() {
317         return this.identifier;
318     }
319
320     @Override
321     public void close() {
322         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
323             transactionContext.closeTransaction();
324         }
325
326         remoteTransactionPaths.clear();
327
328         if(transactionType == TransactionType.READ_ONLY) {
329             remoteTransactionActors.clear();
330             remoteTransactionActorsMB.set(true);
331         }
332     }
333
334     private TransactionContext transactionContext(YangInstanceIdentifier path){
335         String shardName = shardNameFromIdentifier(path);
336         return remoteTransactionPaths.get(shardName);
337     }
338
339     private String shardNameFromIdentifier(YangInstanceIdentifier path){
340         return ShardStrategyFactory.getStrategy(path).findShard(path);
341     }
342
343     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
344         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
345
346         TransactionContext transactionContext =
347             remoteTransactionPaths.get(shardName);
348
349         if(transactionContext != null){
350             // A transaction already exists with that shard
351             return;
352         }
353
354         try {
355             Object response = actorContext.executeShardOperation(shardName,
356                 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
357             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
358                 CreateTransactionReply reply =
359                     CreateTransactionReply.fromSerializable(response);
360
361                 String transactionPath = reply.getTransactionPath();
362
363                 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
364
365                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
366
367                 if(transactionType == TransactionType.READ_ONLY) {
368                     // Add the actor to the remoteTransactionActors list for access by the
369                     // cleanup PhantonReference.
370                     remoteTransactionActors.add(transactionActor);
371
372                     // Write to the memory barrier volatile to publish the above update to the
373                     // remoteTransactionActors list for thread visibility.
374                     remoteTransactionActorsMB.set(true);
375                 }
376
377                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
378                         transactionActor, identifier, actorContext, schemaContext);
379
380                 remoteTransactionPaths.put(shardName, transactionContext);
381             } else {
382                 throw new IllegalArgumentException(String.format(
383                         "Invalid reply type {} for CreateTransaction", response.getClass()));
384             }
385         } catch(Exception e){
386             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
387             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
388         }
389     }
390
391     private interface TransactionContext {
392         String getShardName();
393
394         void closeTransaction();
395
396         Future<ActorPath> readyTransaction();
397
398         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
399
400         void deleteData(YangInstanceIdentifier path);
401
402         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
403
404         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
405                 final YangInstanceIdentifier path);
406
407         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
408
409         List<Future<Object>> getRecordedOperationFutures();
410     }
411
412     private static abstract class AbstractTransactionContext implements TransactionContext {
413
414         protected final TransactionIdentifier identifier;
415         protected final String shardName;
416         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
417
418         AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
419             this.shardName = shardName;
420             this.identifier = identifier;
421         }
422
423         @Override
424         public String getShardName() {
425             return shardName;
426         }
427
428         @Override
429         public List<Future<Object>> getRecordedOperationFutures() {
430             return recordedOperationFutures;
431         }
432     }
433
434     private static class TransactionContextImpl extends AbstractTransactionContext {
435         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
436
437         private final ActorContext actorContext;
438         private final SchemaContext schemaContext;
439         private final String actorPath;
440         private final ActorSelection actor;
441
442         private TransactionContextImpl(String shardName, String actorPath,
443                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
444                 SchemaContext schemaContext) {
445             super(shardName, identifier);
446             this.actorPath = actorPath;
447             this.actor = actor;
448             this.actorContext = actorContext;
449             this.schemaContext = schemaContext;
450         }
451
452         private ActorSelection getActor() {
453             return actor;
454         }
455
456         private String getResolvedCohortPath(String cohortPath) {
457             return actorContext.resolvePath(actorPath, cohortPath);
458         }
459
460         @Override
461         public void closeTransaction() {
462             LOG.debug("Tx {} closeTransaction called", identifier);
463             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
464         }
465
466         @Override
467         public Future<ActorPath> readyTransaction() {
468             LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
469                     identifier, recordedOperationFutures.size());
470
471             // Send the ReadyTransaction message to the Tx actor.
472
473             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
474                     new ReadyTransaction().toSerializable());
475
476             // Combine all the previously recorded put/merge/delete operation reply Futures and the
477             // ReadyTransactionReply Future into one Future. If any one fails then the combined
478             // Future will fail. We need all prior operations and the ready operation to succeed
479             // in order to attempt commit.
480
481             List<Future<Object>> futureList =
482                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
483             futureList.addAll(recordedOperationFutures);
484             futureList.add(replyFuture);
485
486             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
487                     actorContext.getActorSystem().dispatcher());
488
489             // Transform the combined Future into a Future that returns the cohort actor path from
490             // the ReadyTransactionReply. That's the end result of the ready operation.
491
492             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
493                 @Override
494                 public ActorPath apply(Iterable<Object> notUsed) {
495
496                     LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
497                             identifier);
498
499                     // At this point all the Futures succeeded and we need to extract the cohort
500                     // actor path from the ReadyTransactionReply. For the recorded operations, they
501                     // don't return any data so we're only interested that they completed
502                     // successfully. We could be paranoid and verify the correct reply types but
503                     // that really should never happen so it's not worth the overhead of
504                     // de-serializing each reply.
505
506                     // Note the Future get call here won't block as it's complete.
507                     Object serializedReadyReply = replyFuture.value().get().get();
508                     if(serializedReadyReply.getClass().equals(
509                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
510                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
511                                 actorContext.getActorSystem(), serializedReadyReply);
512
513                         String resolvedCohortPath = getResolvedCohortPath(
514                                 reply.getCohortPath().toString());
515
516                         LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
517                                 identifier, resolvedCohortPath);
518
519                         return actorContext.actorFor(resolvedCohortPath);
520                     } else {
521                         // Throwing an exception here will fail the Future.
522
523                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
524                                 serializedReadyReply.getClass()));
525                     }
526                 }
527             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
528         }
529
530         @Override
531         public void deleteData(YangInstanceIdentifier path) {
532             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
533             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
534                     new DeleteData(path).toSerializable() ));
535         }
536
537         @Override
538         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
539             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
540             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
541                     new MergeData(path, data, schemaContext).toSerializable()));
542         }
543
544         @Override
545         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
546             LOG.debug("Tx {} writeData called path = {}", identifier, path);
547             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
548                     new WriteData(path, data, schemaContext).toSerializable()));
549         }
550
551         @Override
552         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
553                 final YangInstanceIdentifier path) {
554
555             LOG.debug("Tx {} readData called path = {}", identifier, path);
556
557             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
558
559             // If there were any previous recorded put/merge/delete operation reply Futures then we
560             // must wait for them to successfully complete. This is necessary to honor the read
561             // uncommitted semantics of the public API contract. If any one fails then fail the read.
562
563             if(recordedOperationFutures.isEmpty()) {
564                 finishReadData(path, returnFuture);
565             } else {
566                 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
567                         identifier, recordedOperationFutures.size());
568
569                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
570                 // Futures#sequence accesses the passed List on a different thread, as
571                 // recordedOperationFutures is not synchronized.
572
573                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
574                         Lists.newArrayList(recordedOperationFutures),
575                         actorContext.getActorSystem().dispatcher());
576                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
577                     @Override
578                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
579                             throws Throwable {
580                         if(failure != null) {
581                             LOG.debug("Tx {} readData: a recorded operation failed: {}",
582                                     identifier, failure);
583
584                             returnFuture.setException(new ReadFailedException(
585                                     "The read could not be performed because a previous put, merge,"
586                                     + "or delete operation failed", failure));
587                         } else {
588                             finishReadData(path, returnFuture);
589                         }
590                     }
591                 };
592
593                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
594             }
595
596             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
597         }
598
599         private void finishReadData(final YangInstanceIdentifier path,
600                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
601
602             LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
603
604             OnComplete<Object> onComplete = new OnComplete<Object>() {
605                 @Override
606                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
607                     if(failure != null) {
608                         LOG.debug("Tx {} read operation failed: {}", identifier, failure);
609
610                         returnFuture.setException(new ReadFailedException(
611                                 "Error reading data for path " + path, failure));
612
613                     } else {
614                         LOG.debug("Tx {} read operation succeeded", identifier, failure);
615
616                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
617                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
618                                     path, readResponse);
619                             if (reply.getNormalizedNode() == null) {
620                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
621                             } else {
622                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
623                                         reply.getNormalizedNode()));
624                             }
625                         } else {
626                             returnFuture.setException(new ReadFailedException(
627                                     "Invalid response reading data for path " + path));
628                         }
629                     }
630                 }
631             };
632
633             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
634                     new ReadData(path).toSerializable());
635             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
636         }
637
638         @Override
639         public CheckedFuture<Boolean, ReadFailedException> dataExists(
640                 final YangInstanceIdentifier path) {
641
642             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
643
644             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
645
646             // If there were any previous recorded put/merge/delete operation reply Futures then we
647             // must wait for them to successfully complete. This is necessary to honor the read
648             // uncommitted semantics of the public API contract. If any one fails then fail this
649             // request.
650
651             if(recordedOperationFutures.isEmpty()) {
652                 finishDataExists(path, returnFuture);
653             } else {
654                 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
655                         identifier, recordedOperationFutures.size());
656
657                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
658                 // Futures#sequence accesses the passed List on a different thread, as
659                 // recordedOperationFutures is not synchronized.
660
661                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
662                         Lists.newArrayList(recordedOperationFutures),
663                         actorContext.getActorSystem().dispatcher());
664                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
665                     @Override
666                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
667                             throws Throwable {
668                         if(failure != null) {
669                             LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
670                                     identifier, failure);
671
672                             returnFuture.setException(new ReadFailedException(
673                                     "The data exists could not be performed because a previous "
674                                     + "put, merge, or delete operation failed", failure));
675                         } else {
676                             finishDataExists(path, returnFuture);
677                         }
678                     }
679                 };
680
681                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
682             }
683
684             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
685         }
686
687         private void finishDataExists(final YangInstanceIdentifier path,
688                 final SettableFuture<Boolean> returnFuture) {
689
690             LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
691
692             OnComplete<Object> onComplete = new OnComplete<Object>() {
693                 @Override
694                 public void onComplete(Throwable failure, Object response) throws Throwable {
695                     if(failure != null) {
696                         LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
697
698                         returnFuture.setException(new ReadFailedException(
699                                 "Error checking data exists for path " + path, failure));
700                     } else {
701                         LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
702
703                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
704                             returnFuture.set(Boolean.valueOf(DataExistsReply.
705                                         fromSerializable(response).exists()));
706                         } else {
707                             returnFuture.setException(new ReadFailedException(
708                                     "Invalid response checking exists for path " + path));
709                         }
710                     }
711                 }
712             };
713
714             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
715                     new DataExists(path).toSerializable());
716             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
717         }
718     }
719
720     private static class NoOpTransactionContext extends AbstractTransactionContext {
721
722         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
723
724         private final Exception failure;
725
726         public NoOpTransactionContext(String shardName, Exception failure,
727                 TransactionIdentifier identifier){
728             super(shardName, identifier);
729             this.failure = failure;
730         }
731
732         @Override
733         public void closeTransaction() {
734             LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
735         }
736
737         @Override
738         public Future<ActorPath> readyTransaction() {
739             LOG.debug("Tx {} readyTransaction called", identifier);
740             return akka.dispatch.Futures.failed(failure);
741         }
742
743         @Override
744         public void deleteData(YangInstanceIdentifier path) {
745             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
746         }
747
748         @Override
749         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
750             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
751         }
752
753         @Override
754         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
755             LOG.debug("Tx {} writeData called path = {}", identifier, path);
756         }
757
758         @Override
759         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
760             YangInstanceIdentifier path) {
761             LOG.debug("Tx {} readData called path = {}", identifier, path);
762             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
763                     "Error reading data for path " + path, failure));
764         }
765
766         @Override
767         public CheckedFuture<Boolean, ReadFailedException> dataExists(
768             YangInstanceIdentifier path) {
769             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
770             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
771                     "Error checking exists for path " + path, failure));
772         }
773     }
774 }