BUG 2296 : TransactionProxy should support the ability to accept a local TPC actor...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Future;
50 import scala.concurrent.Promise;
51 import scala.concurrent.duration.FiniteDuration;
52
53 import javax.annotation.concurrent.GuardedBy;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicLong;
61
62 /**
63  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
64  * <p>
65  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
66  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
67  * be created on each of those shards by the TransactionProxy
68  *</p>
69  * <p>
70  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
71  * shards will be executed.
72  * </p>
73  */
74 public class TransactionProxy implements DOMStoreReadWriteTransaction {
75
76     public static enum TransactionType {
77         READ_ONLY,
78         WRITE_ONLY,
79         READ_WRITE
80     }
81
82     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
83                                                               new Mapper<Throwable, Throwable>() {
84         @Override
85         public Throwable apply(Throwable failure) {
86             return failure;
87         }
88     };
89
90     private static final AtomicLong counter = new AtomicLong();
91
92     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
93
94     /**
95      * Time interval in between transaction create retries.
96      */
97     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
98             FiniteDuration.create(1, TimeUnit.SECONDS);
99
100     /**
101      * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
102      * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
103      * trickery to clean up its internal thread when the bundle is unloaded.
104      */
105     private static final FinalizableReferenceQueue phantomReferenceQueue =
106                                                                   new FinalizableReferenceQueue();
107
108     /**
109      * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
110      * necessary because PhantomReferences need a hard reference so they're not garbage collected.
111      * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
112      * and thus becomes eligible for garbage collection.
113      */
114     private static final Map<TransactionProxyCleanupPhantomReference,
115                              TransactionProxyCleanupPhantomReference> phantomReferenceCache =
116                                                                         new ConcurrentHashMap<>();
117
118     /**
119      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
120      * garbage collected. This is used for read-only transactions as they're not explicitly closed
121      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
122      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
123      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
124      * to the old generation space and thus should be cleaned up in a timely manner as the GC
125      * runs on the young generation (eden, swap1...) space much more frequently.
126      */
127     private static class TransactionProxyCleanupPhantomReference
128                                            extends FinalizablePhantomReference<TransactionProxy> {
129
130         private final List<ActorSelection> remoteTransactionActors;
131         private final AtomicBoolean remoteTransactionActorsMB;
132         private final ActorContext actorContext;
133         private final TransactionIdentifier identifier;
134
135         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
136             super(referent, phantomReferenceQueue);
137
138             // Note we need to cache the relevant fields from the TransactionProxy as we can't
139             // have a hard reference to the TransactionProxy instance itself.
140
141             remoteTransactionActors = referent.remoteTransactionActors;
142             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
143             actorContext = referent.actorContext;
144             identifier = referent.identifier;
145         }
146
147         @Override
148         public void finalizeReferent() {
149             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
150                     remoteTransactionActors.size(), identifier);
151
152             phantomReferenceCache.remove(this);
153
154             // Access the memory barrier volatile to ensure all previous updates to the
155             // remoteTransactionActors list are visible to this thread.
156
157             if(remoteTransactionActorsMB.get()) {
158                 for(ActorSelection actor : remoteTransactionActors) {
159                     LOG.trace("Sending CloseTransaction to {}", actor);
160                     actorContext.sendOperationAsync(actor,
161                             new CloseTransaction().toSerializable());
162                 }
163             }
164         }
165     }
166
167     /**
168      * Stores the remote Tx actors for each requested data store path to be used by the
169      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
170      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
171      * remoteTransactionActors list so they will be visible to the thread accessing the
172      * PhantomReference.
173      */
174     private List<ActorSelection> remoteTransactionActors;
175     private AtomicBoolean remoteTransactionActorsMB;
176
177     /**
178      * Stores the create transaction results per shard.
179      */
180     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
181
182     private final TransactionType transactionType;
183     private final ActorContext actorContext;
184     private final TransactionIdentifier identifier;
185     private final TransactionChainProxy transactionChainProxy;
186     private final SchemaContext schemaContext;
187     private boolean inReadyState;
188
189     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
190         this(actorContext, transactionType, null);
191     }
192
193     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
194             TransactionChainProxy transactionChainProxy) {
195         this.actorContext = Preconditions.checkNotNull(actorContext,
196             "actorContext should not be null");
197         this.transactionType = Preconditions.checkNotNull(transactionType,
198             "transactionType should not be null");
199         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
200             "schemaContext should not be null");
201         this.transactionChainProxy = transactionChainProxy;
202
203         String memberName = actorContext.getCurrentMemberName();
204         if(memberName == null){
205             memberName = "UNKNOWN-MEMBER";
206         }
207
208         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
209             counter.getAndIncrement()).build();
210
211         if(transactionType == TransactionType.READ_ONLY) {
212             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
213             // to close the remote Tx's when this instance is no longer in use and is garbage
214             // collected.
215
216             remoteTransactionActors = Lists.newArrayList();
217             remoteTransactionActorsMB = new AtomicBoolean();
218
219             TransactionProxyCleanupPhantomReference cleanup =
220                 new TransactionProxyCleanupPhantomReference(this);
221             phantomReferenceCache.put(cleanup, cleanup);
222         }
223
224         LOG.debug("Created txn {} of type {}", identifier, transactionType);
225     }
226
227     @VisibleForTesting
228     List<Future<Object>> getRecordedOperationFutures() {
229         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
230         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
231             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
232             if(transactionContext != null) {
233                 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
234             }
235         }
236
237         return recordedOperationFutures;
238     }
239
240     @Override
241     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
242             final YangInstanceIdentifier path) {
243
244         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
245                 "Read operation on write-only transaction is not allowed");
246
247         LOG.debug("Tx {} read {}", identifier, path);
248
249         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
250         TransactionContext transactionContext = txFutureCallback.getTransactionContext();
251
252         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future;
253         if(transactionContext != null) {
254             future = transactionContext.readData(path);
255         } else {
256             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
257             // callback to be executed after the Tx is created.
258             final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
259             txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
260                 @Override
261                 public void invoke(TransactionContext transactionContext) {
262                     Futures.addCallback(transactionContext.readData(path),
263                         new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
264                             @Override
265                             public void onSuccess(Optional<NormalizedNode<?, ?>> data) {
266                                 proxyFuture.set(data);
267                             }
268
269                             @Override
270                             public void onFailure(Throwable t) {
271                                 proxyFuture.setException(t);
272                             }
273                         });
274                 }
275             });
276
277             future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
278         }
279
280         return future;
281     }
282
283     @Override
284     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
285
286         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
287                 "Exists operation on write-only transaction is not allowed");
288
289         LOG.debug("Tx {} exists {}", identifier, path);
290
291         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
292         TransactionContext transactionContext = txFutureCallback.getTransactionContext();
293
294         CheckedFuture<Boolean, ReadFailedException> future;
295         if(transactionContext != null) {
296             future = transactionContext.dataExists(path);
297         } else {
298             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
299             // callback to be executed after the Tx is created.
300             final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
301             txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
302                 @Override
303                 public void invoke(TransactionContext transactionContext) {
304                     Futures.addCallback(transactionContext.dataExists(path),
305                         new FutureCallback<Boolean>() {
306                             @Override
307                             public void onSuccess(Boolean exists) {
308                                 proxyFuture.set(exists);
309                             }
310
311                             @Override
312                             public void onFailure(Throwable t) {
313                                 proxyFuture.setException(t);
314                             }
315                         });
316                 }
317             });
318
319             future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
320         }
321
322         return future;
323     }
324
325     private void checkModificationState() {
326         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
327                 "Modification operation on read-only transaction is not allowed");
328         Preconditions.checkState(!inReadyState,
329                 "Transaction is sealed - further modifications are not allowed");
330     }
331
332     @Override
333     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
334
335         checkModificationState();
336
337         LOG.debug("Tx {} write {}", identifier, path);
338
339         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
340         TransactionContext transactionContext = txFutureCallback.getTransactionContext();
341         if(transactionContext != null) {
342             transactionContext.writeData(path, data);
343         } else {
344             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
345             // callback to be executed after the Tx is created.
346             txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
347                 @Override
348                 public void invoke(TransactionContext transactionContext) {
349                     transactionContext.writeData(path, data);
350                 }
351             });
352         }
353     }
354
355     @Override
356     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
357
358         checkModificationState();
359
360         LOG.debug("Tx {} merge {}", identifier, path);
361
362         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
363         TransactionContext transactionContext = txFutureCallback.getTransactionContext();
364         if(transactionContext != null) {
365             transactionContext.mergeData(path, data);
366         } else {
367             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
368             // callback to be executed after the Tx is created.
369             txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
370                 @Override
371                 public void invoke(TransactionContext transactionContext) {
372                     transactionContext.mergeData(path, data);
373                 }
374             });
375         }
376     }
377
378     @Override
379     public void delete(final YangInstanceIdentifier path) {
380
381         checkModificationState();
382
383         LOG.debug("Tx {} delete {}", identifier, path);
384
385         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
386         TransactionContext transactionContext = txFutureCallback.getTransactionContext();
387         if(transactionContext != null) {
388             transactionContext.deleteData(path);
389         } else {
390             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
391             // callback to be executed after the Tx is created.
392             txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
393                 @Override
394                 public void invoke(TransactionContext transactionContext) {
395                     transactionContext.deleteData(path);
396                 }
397             });
398         }
399     }
400
401     @Override
402     public DOMStoreThreePhaseCommitCohort ready() {
403
404         checkModificationState();
405
406         inReadyState = true;
407
408         LOG.debug("Tx {} Readying {} transactions for commit", identifier,
409                     txFutureCallbackMap.size());
410
411         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
412
413         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
414
415             LOG.debug("Tx {} Readying transaction for shard {}", identifier,
416                         txFutureCallback.getShardName());
417
418             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
419             if(transactionContext != null) {
420                 cohortFutures.add(transactionContext.readyTransaction());
421             } else {
422                 // The shard Tx hasn't been created yet so create a promise to ready the Tx later
423                 // after it's created.
424                 final Promise<ActorSelection> cohortPromise = akka.dispatch.Futures.promise();
425                 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
426                     @Override
427                     public void invoke(TransactionContext transactionContext) {
428                         cohortPromise.completeWith(transactionContext.readyTransaction());
429                     }
430                 });
431
432                 cohortFutures.add(cohortPromise.future());
433             }
434         }
435
436         if(transactionChainProxy != null){
437             transactionChainProxy.onTransactionReady(cohortFutures);
438         }
439
440         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
441                 identifier.toString());
442     }
443
444     @Override
445     public Object getIdentifier() {
446         return this.identifier;
447     }
448
449     @Override
450     public void close() {
451         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
452             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
453             if(transactionContext != null) {
454                 transactionContext.closeTransaction();
455             } else {
456                 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
457                     @Override
458                     public void invoke(TransactionContext transactionContext) {
459                         transactionContext.closeTransaction();
460                     }
461                 });
462             }
463         }
464
465         txFutureCallbackMap.clear();
466
467         if(transactionType == TransactionType.READ_ONLY) {
468             remoteTransactionActors.clear();
469             remoteTransactionActorsMB.set(true);
470         }
471     }
472
473     private String shardNameFromIdentifier(YangInstanceIdentifier path){
474         return ShardStrategyFactory.getStrategy(path).findShard(path);
475     }
476
477     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
478         String shardName = shardNameFromIdentifier(path);
479         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
480         if(txFutureCallback == null) {
481             Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
482
483             final TransactionFutureCallback newTxFutureCallback =
484                     new TransactionFutureCallback(shardName);
485
486             txFutureCallback = newTxFutureCallback;
487             txFutureCallbackMap.put(shardName, txFutureCallback);
488
489             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
490                 @Override
491                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
492                     if(failure != null) {
493                         newTxFutureCallback.onComplete(failure, null);
494                     } else {
495                         newTxFutureCallback.setPrimaryShard(primaryShard);
496                     }
497                 }
498             }, actorContext.getActorSystem().dispatcher());
499         }
500
501         return txFutureCallback;
502     }
503
504     public String getTransactionChainId() {
505         if(transactionChainProxy == null){
506             return "";
507         }
508         return transactionChainProxy.getTransactionChainId();
509     }
510
511     /**
512      * Interface for a transaction operation to be invoked later.
513      */
514     private static interface TransactionOperation {
515         void invoke(TransactionContext transactionContext);
516     }
517
518     /**
519      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
520      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
521      * retry task after a short delay.
522      * <p>
523      * The end result from a completed CreateTransaction message is a TransactionContext that is
524      * used to perform transaction operations. Transaction operations that occur before the
525      * CreateTransaction completes are cache and executed once the CreateTransaction completes,
526      * successfully or not.
527      */
528     private class TransactionFutureCallback extends OnComplete<Object> {
529
530         /**
531          * The list of transaction operations to execute once the CreateTransaction completes.
532          */
533         @GuardedBy("txOperationsOnComplete")
534         private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
535
536         /**
537          * The TransactionContext resulting from the CreateTransaction reply.
538          */
539         private volatile TransactionContext transactionContext;
540
541         /**
542          * The target primary shard.
543          */
544         private volatile ActorSelection primaryShard;
545
546         private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
547                 getShardLeaderElectionTimeout().duration().toMillis() /
548                 CREATE_TX_TRY_INTERVAL.toMillis());
549
550         private final String shardName;
551
552         TransactionFutureCallback(String shardName) {
553             this.shardName = shardName;
554         }
555
556         String getShardName() {
557             return shardName;
558         }
559
560         TransactionContext getTransactionContext() {
561             return transactionContext;
562         }
563
564
565         /**
566          * Sets the target primary shard and initiates a CreateTransaction try.
567          */
568         void setPrimaryShard(ActorSelection primaryShard) {
569             LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
570
571             this.primaryShard = primaryShard;
572             tryCreateTransaction();
573         }
574
575         /**
576          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
577          */
578         void addTxOperationOnComplete(TransactionOperation operation) {
579             synchronized(txOperationsOnComplete) {
580                 if(transactionContext == null) {
581                     LOG.debug("Tx {} Adding operation on complete {}", identifier);
582
583                     txOperationsOnComplete.add(operation);
584                 } else {
585                     operation.invoke(transactionContext);
586                 }
587             }
588         }
589
590         /**
591          * Performs a CreateTransaction try async.
592          */
593         private void tryCreateTransaction() {
594             Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
595                     new CreateTransaction(identifier.toString(),
596                             TransactionProxy.this.transactionType.ordinal(),
597                             getTransactionChainId()).toSerializable());
598
599             createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
600         }
601
602         @Override
603         public void onComplete(Throwable failure, Object response) {
604             if(failure instanceof NoShardLeaderException) {
605                 // There's no leader for the shard yet - schedule and try again, unless we're out
606                 // of retries. Note: createTxTries is volatile as it may be written by different
607                 // threads however not concurrently, therefore decrementing it non-atomically here
608                 // is ok.
609                 if(--createTxTries > 0) {
610                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
611                             identifier, shardName);
612
613                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
614                             new Runnable() {
615                                 @Override
616                                 public void run() {
617                                     tryCreateTransaction();
618                                 }
619                             }, actorContext.getActorSystem().dispatcher());
620                     return;
621                 }
622             }
623
624             // Create the TransactionContext from the response or failure and execute delayed
625             // TransactionOperations. This entire section is done atomically (ie synchronized) with
626             // respect to #addTxOperationOnComplete to handle timing issues and ensure no
627             // TransactionOperation is missed and that they are processed in the order they occurred.
628             synchronized(txOperationsOnComplete) {
629                 if(failure != null) {
630                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
631                             failure.getMessage());
632
633                     transactionContext = new NoOpTransactionContext(failure, identifier);
634                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
635                     createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
636                 } else {
637                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
638                         "Invalid reply type %s for CreateTransaction", response.getClass()));
639
640                     transactionContext = new NoOpTransactionContext(exception, identifier);
641                 }
642
643                 for(TransactionOperation oper: txOperationsOnComplete) {
644                     oper.invoke(transactionContext);
645                 }
646
647                 txOperationsOnComplete.clear();
648             }
649         }
650
651         private void createValidTransactionContext(CreateTransactionReply reply) {
652             String transactionPath = reply.getTransactionPath();
653
654             LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
655
656             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
657
658             if (transactionType == TransactionType.READ_ONLY) {
659                 // Add the actor to the remoteTransactionActors list for access by the
660                 // cleanup PhantonReference.
661                 remoteTransactionActors.add(transactionActor);
662
663                 // Write to the memory barrier volatile to publish the above update to the
664                 // remoteTransactionActors list for thread visibility.
665                 remoteTransactionActorsMB.set(true);
666             }
667
668             // TxActor is always created where the leader of the shard is.
669             // Check if TxActor is created in the same node
670             boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
671
672             transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
673                 actorContext, schemaContext, isTxActorLocal, reply.getVersion());
674         }
675     }
676
677     private interface TransactionContext {
678         void closeTransaction();
679
680         Future<ActorSelection> readyTransaction();
681
682         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
683
684         void deleteData(YangInstanceIdentifier path);
685
686         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
687
688         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
689                 final YangInstanceIdentifier path);
690
691         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
692
693         List<Future<Object>> getRecordedOperationFutures();
694     }
695
696     private static abstract class AbstractTransactionContext implements TransactionContext {
697
698         protected final TransactionIdentifier identifier;
699         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
700
701         AbstractTransactionContext(TransactionIdentifier identifier) {
702             this.identifier = identifier;
703         }
704
705         @Override
706         public List<Future<Object>> getRecordedOperationFutures() {
707             return recordedOperationFutures;
708         }
709     }
710
711     private static class TransactionContextImpl extends AbstractTransactionContext {
712         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
713
714         private final ActorContext actorContext;
715         private final SchemaContext schemaContext;
716         private final String transactionPath;
717         private final ActorSelection actor;
718         private final boolean isTxActorLocal;
719         private final int remoteTransactionVersion;
720
721         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
722                 ActorContext actorContext, SchemaContext schemaContext,
723                 boolean isTxActorLocal, int remoteTransactionVersion) {
724             super(identifier);
725             this.transactionPath = transactionPath;
726             this.actor = actor;
727             this.actorContext = actorContext;
728             this.schemaContext = schemaContext;
729             this.isTxActorLocal = isTxActorLocal;
730             this.remoteTransactionVersion = remoteTransactionVersion;
731         }
732
733         private ActorSelection getActor() {
734             return actor;
735         }
736
737         @Override
738         public void closeTransaction() {
739             LOG.debug("Tx {} closeTransaction called", identifier);
740
741             actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
742         }
743
744         @Override
745         public Future<ActorSelection> readyTransaction() {
746             LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
747                     identifier, recordedOperationFutures.size());
748
749             // Send the ReadyTransaction message to the Tx actor.
750
751             ReadyTransaction readyTransaction = new ReadyTransaction();
752             final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
753                 isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
754
755             // Combine all the previously recorded put/merge/delete operation reply Futures and the
756             // ReadyTransactionReply Future into one Future. If any one fails then the combined
757             // Future will fail. We need all prior operations and the ready operation to succeed
758             // in order to attempt commit.
759
760             List<Future<Object>> futureList =
761                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
762             futureList.addAll(recordedOperationFutures);
763             futureList.add(replyFuture);
764
765             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
766                     actorContext.getActorSystem().dispatcher());
767
768             // Transform the combined Future into a Future that returns the cohort actor path from
769             // the ReadyTransactionReply. That's the end result of the ready operation.
770
771             return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
772                 @Override
773                 public ActorSelection checkedApply(Iterable<Object> notUsed) {
774                     LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
775                             identifier);
776
777                     // At this point all the Futures succeeded and we need to extract the cohort
778                     // actor path from the ReadyTransactionReply. For the recorded operations, they
779                     // don't return any data so we're only interested that they completed
780                     // successfully. We could be paranoid and verify the correct reply types but
781                     // that really should never happen so it's not worth the overhead of
782                     // de-serializing each reply.
783
784                     // Note the Future get call here won't block as it's complete.
785                     Object serializedReadyReply = replyFuture.value().get().get();
786                     if (serializedReadyReply instanceof ReadyTransactionReply) {
787                         return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
788
789                     } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
790                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
791                         String cohortPath = reply.getCohortPath();
792
793                         // In Helium we used to return the local path of the actor which represented
794                         // a remote ThreePhaseCommitCohort. The local path would then be converted to
795                         // a remote path using this resolvePath method. To maintain compatibility with
796                         // a Helium node we need to continue to do this conversion.
797                         // At some point in the future when upgrades from Helium are not supported
798                         // we could remove this code to resolvePath and just use the cohortPath as the
799                         // resolved cohortPath
800                         if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
801                             cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
802                         }
803
804                         return actorContext.actorSelection(cohortPath);
805
806                     } else {
807                         // Throwing an exception here will fail the Future.
808                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
809                                 serializedReadyReply.getClass()));
810                     }
811                 }
812             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
813         }
814
815         @Override
816         public void deleteData(YangInstanceIdentifier path) {
817             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
818
819             DeleteData deleteData = new DeleteData(path);
820             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
821                 isTxActorLocal ? deleteData : deleteData.toSerializable()));
822         }
823
824         @Override
825         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
826             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
827
828             MergeData mergeData = new MergeData(path, data, schemaContext);
829             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
830                 isTxActorLocal ? mergeData : mergeData.toSerializable()));
831         }
832
833         @Override
834         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
835             LOG.debug("Tx {} writeData called path = {}", identifier, path);
836
837             WriteData writeData = new WriteData(path, data, schemaContext);
838             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
839                 isTxActorLocal ? writeData : writeData.toSerializable()));
840         }
841
842         @Override
843         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
844                 final YangInstanceIdentifier path) {
845
846             LOG.debug("Tx {} readData called path = {}", identifier, path);
847
848             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
849
850             // If there were any previous recorded put/merge/delete operation reply Futures then we
851             // must wait for them to successfully complete. This is necessary to honor the read
852             // uncommitted semantics of the public API contract. If any one fails then fail the read.
853
854             if(recordedOperationFutures.isEmpty()) {
855                 finishReadData(path, returnFuture);
856             } else {
857                 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
858                         identifier, recordedOperationFutures.size());
859
860                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
861                 // Futures#sequence accesses the passed List on a different thread, as
862                 // recordedOperationFutures is not synchronized.
863
864                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
865                         Lists.newArrayList(recordedOperationFutures),
866                         actorContext.getActorSystem().dispatcher());
867
868                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
869                     @Override
870                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
871                             throws Throwable {
872                         if(failure != null) {
873                             LOG.debug("Tx {} readData: a recorded operation failed: {}",
874                                     identifier, failure);
875                             returnFuture.setException(new ReadFailedException(
876                                     "The read could not be performed because a previous put, merge,"
877                                     + "or delete operation failed", failure));
878                         } else {
879                             finishReadData(path, returnFuture);
880                         }
881                     }
882                 };
883
884                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
885             }
886
887             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
888         }
889
890         private void finishReadData(final YangInstanceIdentifier path,
891                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
892
893             LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
894
895             OnComplete<Object> onComplete = new OnComplete<Object>() {
896                 @Override
897                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
898                     if(failure != null) {
899                         LOG.debug("Tx {} read operation failed: {}", identifier, failure);
900                         returnFuture.setException(new ReadFailedException(
901                                 "Error reading data for path " + path, failure));
902
903                     } else {
904                         LOG.debug("Tx {} read operation succeeded", identifier, failure);
905
906                         if (readResponse instanceof ReadDataReply) {
907                             ReadDataReply reply = (ReadDataReply) readResponse;
908                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
909
910                         } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
911                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
912                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
913
914                         } else {
915                             returnFuture.setException(new ReadFailedException(
916                                 "Invalid response reading data for path " + path));
917                         }
918                     }
919                 }
920             };
921
922             ReadData readData = new ReadData(path);
923             Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
924                 isTxActorLocal ? readData : readData.toSerializable());
925
926             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
927         }
928
929         @Override
930         public CheckedFuture<Boolean, ReadFailedException> dataExists(
931                 final YangInstanceIdentifier path) {
932
933             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
934
935             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
936
937             // If there were any previous recorded put/merge/delete operation reply Futures then we
938             // must wait for them to successfully complete. This is necessary to honor the read
939             // uncommitted semantics of the public API contract. If any one fails then fail this
940             // request.
941
942             if(recordedOperationFutures.isEmpty()) {
943                 finishDataExists(path, returnFuture);
944             } else {
945                 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
946                         identifier, recordedOperationFutures.size());
947
948                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
949                 // Futures#sequence accesses the passed List on a different thread, as
950                 // recordedOperationFutures is not synchronized.
951
952                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
953                         Lists.newArrayList(recordedOperationFutures),
954                         actorContext.getActorSystem().dispatcher());
955                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
956                     @Override
957                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
958                             throws Throwable {
959                         if(failure != null) {
960                             LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
961                                     identifier, failure);
962                             returnFuture.setException(new ReadFailedException(
963                                     "The data exists could not be performed because a previous "
964                                     + "put, merge, or delete operation failed", failure));
965                         } else {
966                             finishDataExists(path, returnFuture);
967                         }
968                     }
969                 };
970
971                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
972             }
973
974             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
975         }
976
977         private void finishDataExists(final YangInstanceIdentifier path,
978                 final SettableFuture<Boolean> returnFuture) {
979
980             LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
981
982             OnComplete<Object> onComplete = new OnComplete<Object>() {
983                 @Override
984                 public void onComplete(Throwable failure, Object response) throws Throwable {
985                     if(failure != null) {
986                         LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
987                         returnFuture.setException(new ReadFailedException(
988                                 "Error checking data exists for path " + path, failure));
989                     } else {
990                         LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
991
992                         if (response instanceof DataExistsReply) {
993                             returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
994
995                         } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
996                             returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
997
998                         } else {
999                             returnFuture.setException(new ReadFailedException(
1000                                     "Invalid response checking exists for path " + path));
1001                         }
1002                     }
1003                 }
1004             };
1005
1006             DataExists dataExists = new DataExists(path);
1007             Future<Object> future = actorContext.executeOperationAsync(getActor(),
1008                 isTxActorLocal ? dataExists : dataExists.toSerializable());
1009
1010             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
1011         }
1012     }
1013
1014     private static class NoOpTransactionContext extends AbstractTransactionContext {
1015
1016         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
1017
1018         private final Throwable failure;
1019
1020         public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
1021             super(identifier);
1022             this.failure = failure;
1023         }
1024
1025         @Override
1026         public void closeTransaction() {
1027             LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
1028         }
1029
1030         @Override
1031         public Future<ActorSelection> readyTransaction() {
1032             LOG.debug("Tx {} readyTransaction called", identifier);
1033             return akka.dispatch.Futures.failed(failure);
1034         }
1035
1036         @Override
1037         public void deleteData(YangInstanceIdentifier path) {
1038             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
1039         }
1040
1041         @Override
1042         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1043             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
1044         }
1045
1046         @Override
1047         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1048             LOG.debug("Tx {} writeData called path = {}", identifier, path);
1049         }
1050
1051         @Override
1052         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
1053                 YangInstanceIdentifier path) {
1054             LOG.debug("Tx {} readData called path = {}", identifier, path);
1055             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1056                     "Error reading data for path " + path, failure));
1057         }
1058
1059         @Override
1060         public CheckedFuture<Boolean, ReadFailedException> dataExists(
1061                 YangInstanceIdentifier path) {
1062             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
1063             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1064                     "Error checking exists for path " + path, failure));
1065         }
1066     }
1067 }