BUG 1883 - Ensure that all debug logging is done conditionally
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.FinalizablePhantomReference;
17 import com.google.common.base.FinalizableReferenceQueue;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
53
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
60
61 /**
62  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
63  * <p>
64  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66  * be created on each of those shards by the TransactionProxy
67  *</p>
68  * <p>
69  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70  * shards will be executed.
71  * </p>
72  */
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
74
75     private final TransactionChainProxy transactionChainProxy;
76
77
78
79     public enum TransactionType {
80         READ_ONLY,
81         WRITE_ONLY,
82         READ_WRITE
83     }
84
85     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
86                                                                           Throwable, Throwable>() {
87         @Override
88         public Throwable apply(Throwable failure) {
89             return failure;
90         }
91     };
92
93     private static final AtomicLong counter = new AtomicLong();
94
95     private static final Logger
96         LOG = LoggerFactory.getLogger(TransactionProxy.class);
97
98
99     /**
100      * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
101      * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
102      * trickery to clean up its internal thread when the bundle is unloaded.
103      */
104     private static final FinalizableReferenceQueue phantomReferenceQueue =
105                                                                   new FinalizableReferenceQueue();
106
107     /**
108      * This stores the TransactionProxyCleanupPhantomReference instances statically. This is
109      * necessary because PhantomReferences need a hard reference so they're not garbage collected.
110      * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
111      * and thus becomes eligible for garbage collection.
112      */
113     private static final Map<TransactionProxyCleanupPhantomReference,
114                              TransactionProxyCleanupPhantomReference> phantomReferenceCache =
115                                                                         new ConcurrentHashMap<>();
116
117     /**
118      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
119      * garbage collected. This is used for read-only transactions as they're not explicitly closed
120      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
121      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
122      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
123      * to the old generation space and thus should be cleaned up in a timely manner as the GC
124      * runs on the young generation (eden, swap1...) space much more frequently.
125      */
126     private static class TransactionProxyCleanupPhantomReference
127                                            extends FinalizablePhantomReference<TransactionProxy> {
128
129         private final List<ActorSelection> remoteTransactionActors;
130         private final AtomicBoolean remoteTransactionActorsMB;
131         private final ActorContext actorContext;
132         private final TransactionIdentifier identifier;
133
134         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
135             super(referent, phantomReferenceQueue);
136
137             // Note we need to cache the relevant fields from the TransactionProxy as we can't
138             // have a hard reference to the TransactionProxy instance itself.
139
140             remoteTransactionActors = referent.remoteTransactionActors;
141             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
142             actorContext = referent.actorContext;
143             identifier = referent.identifier;
144         }
145
146         @Override
147         public void finalizeReferent() {
148             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
149                     remoteTransactionActors.size(), identifier);
150
151             phantomReferenceCache.remove(this);
152
153             // Access the memory barrier volatile to ensure all previous updates to the
154             // remoteTransactionActors list are visible to this thread.
155
156             if(remoteTransactionActorsMB.get()) {
157                 for(ActorSelection actor : remoteTransactionActors) {
158                     LOG.trace("Sending CloseTransaction to {}", actor);
159                     actorContext.sendRemoteOperationAsync(actor,
160                             new CloseTransaction().toSerializable());
161                 }
162             }
163         }
164     }
165
166     /**
167      * Stores the remote Tx actors for each requested data store path to be used by the
168      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
169      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
170      * remoteTransactionActors list so they will be visible to the thread accessing the
171      * PhantomReference.
172      */
173     private List<ActorSelection> remoteTransactionActors;
174     private AtomicBoolean remoteTransactionActorsMB;
175
176     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
177
178     private final TransactionType transactionType;
179     private final ActorContext actorContext;
180     private final TransactionIdentifier identifier;
181     private final SchemaContext schemaContext;
182     private boolean inReadyState;
183
184     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
185         this(actorContext, transactionType, null);
186     }
187
188     @VisibleForTesting
189     List<Future<Object>> getRecordedOperationFutures() {
190         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
191         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
192             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
193         }
194
195         return recordedOperationFutures;
196     }
197
198     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
199         this.actorContext = Preconditions.checkNotNull(actorContext,
200             "actorContext should not be null");
201         this.transactionType = Preconditions.checkNotNull(transactionType,
202             "transactionType should not be null");
203         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
204             "schemaContext should not be null");
205         this.transactionChainProxy = transactionChainProxy;
206
207         String memberName = actorContext.getCurrentMemberName();
208         if(memberName == null){
209             memberName = "UNKNOWN-MEMBER";
210         }
211
212         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
213             counter.getAndIncrement()).build();
214
215         if(transactionType == TransactionType.READ_ONLY) {
216             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
217             // to close the remote Tx's when this instance is no longer in use and is garbage
218             // collected.
219
220             remoteTransactionActors = Lists.newArrayList();
221             remoteTransactionActorsMB = new AtomicBoolean();
222
223             TransactionProxyCleanupPhantomReference cleanup =
224                 new TransactionProxyCleanupPhantomReference(this);
225             phantomReferenceCache.put(cleanup, cleanup);
226         }
227         if(LOG.isDebugEnabled()) {
228             LOG.debug("Created txn {} of type {}", identifier, transactionType);
229         }
230     }
231
232     @Override
233     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
234             final YangInstanceIdentifier path) {
235
236         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
237                 "Read operation on write-only transaction is not allowed");
238
239         if(LOG.isDebugEnabled()) {
240             LOG.debug("Tx {} read {}", identifier, path);
241         }
242         createTransactionIfMissing(actorContext, path);
243
244         return transactionContext(path).readData(path);
245     }
246
247     @Override
248     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
249
250         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
251                 "Exists operation on write-only transaction is not allowed");
252
253         if(LOG.isDebugEnabled()) {
254             LOG.debug("Tx {} exists {}", identifier, path);
255         }
256         createTransactionIfMissing(actorContext, path);
257
258         return transactionContext(path).dataExists(path);
259     }
260
261     private void checkModificationState() {
262         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
263                 "Modification operation on read-only transaction is not allowed");
264         Preconditions.checkState(!inReadyState,
265                 "Transaction is sealed - further modifications are not allowed");
266     }
267
268     @Override
269     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
270
271         checkModificationState();
272
273         if(LOG.isDebugEnabled()) {
274             LOG.debug("Tx {} write {}", identifier, path);
275         }
276         createTransactionIfMissing(actorContext, path);
277
278         transactionContext(path).writeData(path, data);
279     }
280
281     @Override
282     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
283
284         checkModificationState();
285
286         if(LOG.isDebugEnabled()) {
287             LOG.debug("Tx {} merge {}", identifier, path);
288         }
289         createTransactionIfMissing(actorContext, path);
290
291         transactionContext(path).mergeData(path, data);
292     }
293
294     @Override
295     public void delete(YangInstanceIdentifier path) {
296
297         checkModificationState();
298         if(LOG.isDebugEnabled()) {
299             LOG.debug("Tx {} delete {}", identifier, path);
300         }
301         createTransactionIfMissing(actorContext, path);
302
303         transactionContext(path).deleteData(path);
304     }
305
306     @Override
307     public DOMStoreThreePhaseCommitCohort ready() {
308
309         checkModificationState();
310
311         inReadyState = true;
312
313         if(LOG.isDebugEnabled()) {
314             LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
315                 remoteTransactionPaths.size());
316         }
317         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
318
319         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
320
321             if(LOG.isDebugEnabled()) {
322                 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
323                     transactionContext.getShardName());
324             }
325             cohortPathFutures.add(transactionContext.readyTransaction());
326         }
327
328         if(transactionChainProxy != null){
329             transactionChainProxy.onTransactionReady(cohortPathFutures);
330         }
331
332         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
333                 identifier.toString());
334     }
335
336     @Override
337     public Object getIdentifier() {
338         return this.identifier;
339     }
340
341     @Override
342     public void close() {
343         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
344             transactionContext.closeTransaction();
345         }
346
347         remoteTransactionPaths.clear();
348
349         if(transactionType == TransactionType.READ_ONLY) {
350             remoteTransactionActors.clear();
351             remoteTransactionActorsMB.set(true);
352         }
353     }
354
355     private TransactionContext transactionContext(YangInstanceIdentifier path){
356         String shardName = shardNameFromIdentifier(path);
357         return remoteTransactionPaths.get(shardName);
358     }
359
360     private String shardNameFromIdentifier(YangInstanceIdentifier path){
361         return ShardStrategyFactory.getStrategy(path).findShard(path);
362     }
363
364     private void createTransactionIfMissing(ActorContext actorContext,
365         YangInstanceIdentifier path) {
366
367         if(transactionChainProxy != null){
368             transactionChainProxy.waitTillCurrentTransactionReady();
369         }
370
371         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
372
373         TransactionContext transactionContext =
374             remoteTransactionPaths.get(shardName);
375
376         if (transactionContext != null) {
377             // A transaction already exists with that shard
378             return;
379         }
380
381         try {
382             Object response = actorContext.executeShardOperation(shardName,
383                 new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
384                     getTransactionChainId()).toSerializable());
385             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
386                 CreateTransactionReply reply =
387                     CreateTransactionReply.fromSerializable(response);
388
389                 String transactionPath = reply.getTransactionPath();
390
391                 if(LOG.isDebugEnabled()) {
392                     LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
393                 }
394                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
395
396                 if (transactionType == TransactionType.READ_ONLY) {
397                     // Add the actor to the remoteTransactionActors list for access by the
398                     // cleanup PhantonReference.
399                     remoteTransactionActors.add(transactionActor);
400
401                     // Write to the memory barrier volatile to publish the above update to the
402                     // remoteTransactionActors list for thread visibility.
403                     remoteTransactionActorsMB.set(true);
404                 }
405
406                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
407                     transactionActor, identifier, actorContext, schemaContext);
408
409                 remoteTransactionPaths.put(shardName, transactionContext);
410             } else {
411                 throw new IllegalArgumentException(String.format(
412                     "Invalid reply type {} for CreateTransaction", response.getClass()));
413             }
414         } catch (Exception e) {
415             if(LOG.isDebugEnabled()) {
416                 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
417             }
418             remoteTransactionPaths
419                 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
420         }
421     }
422
423     public String getTransactionChainId() {
424         if(transactionChainProxy == null){
425             return "";
426         }
427         return transactionChainProxy.getTransactionChainId();
428     }
429
430
431     private interface TransactionContext {
432         String getShardName();
433
434         void closeTransaction();
435
436         Future<ActorPath> readyTransaction();
437
438         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
439
440         void deleteData(YangInstanceIdentifier path);
441
442         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
443
444         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
445                 final YangInstanceIdentifier path);
446
447         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
448
449         List<Future<Object>> getRecordedOperationFutures();
450     }
451
452     private static abstract class AbstractTransactionContext implements TransactionContext {
453
454         protected final TransactionIdentifier identifier;
455         protected final String shardName;
456         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
457
458         AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
459             this.shardName = shardName;
460             this.identifier = identifier;
461         }
462
463         @Override
464         public String getShardName() {
465             return shardName;
466         }
467
468         @Override
469         public List<Future<Object>> getRecordedOperationFutures() {
470             return recordedOperationFutures;
471         }
472     }
473
474     private static class TransactionContextImpl extends AbstractTransactionContext {
475         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
476
477         private final ActorContext actorContext;
478         private final SchemaContext schemaContext;
479         private final String actorPath;
480         private final ActorSelection actor;
481
482         private TransactionContextImpl(String shardName, String actorPath,
483                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
484                 SchemaContext schemaContext) {
485             super(shardName, identifier);
486             this.actorPath = actorPath;
487             this.actor = actor;
488             this.actorContext = actorContext;
489             this.schemaContext = schemaContext;
490         }
491
492         private ActorSelection getActor() {
493             return actor;
494         }
495
496         private String getResolvedCohortPath(String cohortPath) {
497             return actorContext.resolvePath(actorPath, cohortPath);
498         }
499
500         @Override
501         public void closeTransaction() {
502             if(LOG.isDebugEnabled()) {
503                 LOG.debug("Tx {} closeTransaction called", identifier);
504             }
505             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
506         }
507
508         @Override
509         public Future<ActorPath> readyTransaction() {
510             if(LOG.isDebugEnabled()) {
511                 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
512                     identifier, recordedOperationFutures.size());
513             }
514             // Send the ReadyTransaction message to the Tx actor.
515
516             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
517                     new ReadyTransaction().toSerializable());
518
519             // Combine all the previously recorded put/merge/delete operation reply Futures and the
520             // ReadyTransactionReply Future into one Future. If any one fails then the combined
521             // Future will fail. We need all prior operations and the ready operation to succeed
522             // in order to attempt commit.
523
524             List<Future<Object>> futureList =
525                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
526             futureList.addAll(recordedOperationFutures);
527             futureList.add(replyFuture);
528
529             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
530                     actorContext.getActorSystem().dispatcher());
531
532             // Transform the combined Future into a Future that returns the cohort actor path from
533             // the ReadyTransactionReply. That's the end result of the ready operation.
534
535             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
536                 @Override
537                 public ActorPath apply(Iterable<Object> notUsed) {
538                     if(LOG.isDebugEnabled()) {
539                         LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
540                             identifier);
541                     }
542                     // At this point all the Futures succeeded and we need to extract the cohort
543                     // actor path from the ReadyTransactionReply. For the recorded operations, they
544                     // don't return any data so we're only interested that they completed
545                     // successfully. We could be paranoid and verify the correct reply types but
546                     // that really should never happen so it's not worth the overhead of
547                     // de-serializing each reply.
548
549                     // Note the Future get call here won't block as it's complete.
550                     Object serializedReadyReply = replyFuture.value().get().get();
551                     if(serializedReadyReply.getClass().equals(
552                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
553                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
554                                 actorContext.getActorSystem(), serializedReadyReply);
555
556                         String resolvedCohortPath = getResolvedCohortPath(
557                                 reply.getCohortPath().toString());
558
559                         if(LOG.isDebugEnabled()) {
560                             LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
561                                 identifier, resolvedCohortPath);
562                         }
563                         return actorContext.actorFor(resolvedCohortPath);
564                     } else {
565                         // Throwing an exception here will fail the Future.
566
567                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
568                                 serializedReadyReply.getClass()));
569                     }
570                 }
571             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
572         }
573
574         @Override
575         public void deleteData(YangInstanceIdentifier path) {
576             if(LOG.isDebugEnabled()) {
577                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
578             }
579             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
580                     new DeleteData(path).toSerializable() ));
581         }
582
583         @Override
584         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
585             if(LOG.isDebugEnabled()) {
586                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
587             }
588             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
589                     new MergeData(path, data, schemaContext).toSerializable()));
590         }
591
592         @Override
593         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
594             if(LOG.isDebugEnabled()) {
595                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
596             }
597             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
598                     new WriteData(path, data, schemaContext).toSerializable()));
599         }
600
601         @Override
602         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
603                 final YangInstanceIdentifier path) {
604
605             if(LOG.isDebugEnabled()) {
606                 LOG.debug("Tx {} readData called path = {}", identifier, path);
607             }
608             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
609
610             // If there were any previous recorded put/merge/delete operation reply Futures then we
611             // must wait for them to successfully complete. This is necessary to honor the read
612             // uncommitted semantics of the public API contract. If any one fails then fail the read.
613
614             if(recordedOperationFutures.isEmpty()) {
615                 finishReadData(path, returnFuture);
616             } else {
617                 if(LOG.isDebugEnabled()) {
618                     LOG.debug("Tx {} readData: verifying {} previous recorded operations",
619                         identifier, recordedOperationFutures.size());
620                 }
621                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
622                 // Futures#sequence accesses the passed List on a different thread, as
623                 // recordedOperationFutures is not synchronized.
624
625                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
626                         Lists.newArrayList(recordedOperationFutures),
627                         actorContext.getActorSystem().dispatcher());
628                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
629                     @Override
630                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
631                             throws Throwable {
632                         if(failure != null) {
633                             if(LOG.isDebugEnabled()) {
634                                 LOG.debug("Tx {} readData: a recorded operation failed: {}",
635                                     identifier, failure);
636                             }
637                             returnFuture.setException(new ReadFailedException(
638                                     "The read could not be performed because a previous put, merge,"
639                                     + "or delete operation failed", failure));
640                         } else {
641                             finishReadData(path, returnFuture);
642                         }
643                     }
644                 };
645
646                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
647             }
648
649             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
650         }
651
652         private void finishReadData(final YangInstanceIdentifier path,
653                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
654
655             if(LOG.isDebugEnabled()) {
656                 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
657             }
658             OnComplete<Object> onComplete = new OnComplete<Object>() {
659                 @Override
660                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
661                     if(failure != null) {
662                         if(LOG.isDebugEnabled()) {
663                             LOG.debug("Tx {} read operation failed: {}", identifier, failure);
664                         }
665                         returnFuture.setException(new ReadFailedException(
666                                 "Error reading data for path " + path, failure));
667
668                     } else {
669                         if(LOG.isDebugEnabled()) {
670                             LOG.debug("Tx {} read operation succeeded", identifier, failure);
671                         }
672                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
673                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
674                                     path, readResponse);
675                             if (reply.getNormalizedNode() == null) {
676                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
677                             } else {
678                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
679                                         reply.getNormalizedNode()));
680                             }
681                         } else {
682                             returnFuture.setException(new ReadFailedException(
683                                     "Invalid response reading data for path " + path));
684                         }
685                     }
686                 }
687             };
688
689             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
690                     new ReadData(path).toSerializable());
691             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
692         }
693
694         @Override
695         public CheckedFuture<Boolean, ReadFailedException> dataExists(
696                 final YangInstanceIdentifier path) {
697
698             if(LOG.isDebugEnabled()) {
699                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
700             }
701             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
702
703             // If there were any previous recorded put/merge/delete operation reply Futures then we
704             // must wait for them to successfully complete. This is necessary to honor the read
705             // uncommitted semantics of the public API contract. If any one fails then fail this
706             // request.
707
708             if(recordedOperationFutures.isEmpty()) {
709                 finishDataExists(path, returnFuture);
710             } else {
711                 if(LOG.isDebugEnabled()) {
712                     LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
713                         identifier, recordedOperationFutures.size());
714                 }
715                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
716                 // Futures#sequence accesses the passed List on a different thread, as
717                 // recordedOperationFutures is not synchronized.
718
719                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
720                         Lists.newArrayList(recordedOperationFutures),
721                         actorContext.getActorSystem().dispatcher());
722                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
723                     @Override
724                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
725                             throws Throwable {
726                         if(failure != null) {
727                             if(LOG.isDebugEnabled()) {
728                                 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
729                                     identifier, failure);
730                             }
731                             returnFuture.setException(new ReadFailedException(
732                                     "The data exists could not be performed because a previous "
733                                     + "put, merge, or delete operation failed", failure));
734                         } else {
735                             finishDataExists(path, returnFuture);
736                         }
737                     }
738                 };
739
740                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
741             }
742
743             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
744         }
745
746         private void finishDataExists(final YangInstanceIdentifier path,
747                 final SettableFuture<Boolean> returnFuture) {
748
749             if(LOG.isDebugEnabled()) {
750                 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
751             }
752             OnComplete<Object> onComplete = new OnComplete<Object>() {
753                 @Override
754                 public void onComplete(Throwable failure, Object response) throws Throwable {
755                     if(failure != null) {
756                         if(LOG.isDebugEnabled()) {
757                             LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
758                         }
759                         returnFuture.setException(new ReadFailedException(
760                                 "Error checking data exists for path " + path, failure));
761                     } else {
762                         if(LOG.isDebugEnabled()) {
763                             LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
764                         }
765                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
766                             returnFuture.set(Boolean.valueOf(DataExistsReply.
767                                         fromSerializable(response).exists()));
768                         } else {
769                             returnFuture.setException(new ReadFailedException(
770                                     "Invalid response checking exists for path " + path));
771                         }
772                     }
773                 }
774             };
775
776             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
777                     new DataExists(path).toSerializable());
778             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
779         }
780     }
781
782     private static class NoOpTransactionContext extends AbstractTransactionContext {
783
784         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
785
786         private final Exception failure;
787
788         public NoOpTransactionContext(String shardName, Exception failure,
789                 TransactionIdentifier identifier){
790             super(shardName, identifier);
791             this.failure = failure;
792         }
793
794         @Override
795         public void closeTransaction() {
796             if(LOG.isDebugEnabled()) {
797                 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
798             }
799         }
800
801         @Override
802         public Future<ActorPath> readyTransaction() {
803             if(LOG.isDebugEnabled()) {
804                 LOG.debug("Tx {} readyTransaction called", identifier);
805             }
806             return akka.dispatch.Futures.failed(failure);
807         }
808
809         @Override
810         public void deleteData(YangInstanceIdentifier path) {
811             if(LOG.isDebugEnabled()) {
812                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
813             }
814         }
815
816         @Override
817         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
818             if(LOG.isDebugEnabled()) {
819                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
820             }
821         }
822
823         @Override
824         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
825             if(LOG.isDebugEnabled()) {
826                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
827             }
828         }
829
830         @Override
831         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
832             YangInstanceIdentifier path) {
833             if(LOG.isDebugEnabled()) {
834                 LOG.debug("Tx {} readData called path = {}", identifier, path);
835             }
836             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
837                     "Error reading data for path " + path, failure));
838         }
839
840         @Override
841         public CheckedFuture<Boolean, ReadFailedException> dataExists(
842             YangInstanceIdentifier path) {
843             if(LOG.isDebugEnabled()) {
844                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
845             }
846             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
847                     "Error checking exists for path " + path, failure));
848         }
849     }
850 }