BUG 2676 : Use custom client-dispatcher when configured
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextImpl.java
index ce2c99ef52b2f6d76ee005accc60f4a9475e3253..03d1b3a6d736541dca754e49fa6f35ea67b7ed2b 100644 (file)
@@ -12,7 +12,6 @@ import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
@@ -30,7 +29,6 @@ import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializa
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -62,7 +60,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+        operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
         return operationFuture;
     }
 
@@ -107,7 +105,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         futureList.add(replyFuture);
 
         Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
-                actorContext.getActorSystem().dispatcher());
+                actorContext.getClientDispatcher());
 
         // Transform the combined Future into a Future that returns the cohort actor path from
         // the ReadyTransactionReply. That's the end result of the ready operation.
@@ -154,7 +152,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                             serializedReadyReply.getClass()));
                 }
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -179,13 +177,11 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path) {
+    public void readData(
+            final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
 
         LOG.debug("Tx {} readData called path = {}", identifier, path);
 
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
-
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail the read.
@@ -202,7 +198,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                     Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getActorSystem().dispatcher());
+                    actorContext.getClientDispatcher());
 
             OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                 @Override
@@ -220,10 +216,9 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                 }
             };
 
-            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
         }
 
-        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     private void finishReadData(final YangInstanceIdentifier path,
@@ -260,17 +255,14 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         Future<Object> readFuture = executeOperationAsync(new ReadData(path));
 
-        readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+        readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     @Override
-    public CheckedFuture<Boolean, ReadFailedException> dataExists(
-            final YangInstanceIdentifier path) {
+    public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
 
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
 
-        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail this
@@ -288,7 +280,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                     Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getActorSystem().dispatcher());
+                    actorContext.getClientDispatcher());
             OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                 @Override
                 public void onComplete(Throwable failure, Iterable<Object> notUsed)
@@ -305,10 +297,8 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                 }
             };
 
-            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
         }
-
-        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     private void finishDataExists(final YangInstanceIdentifier path,
@@ -342,6 +332,6 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         Future<Object> future = executeOperationAsync(new DataExists(path));
 
-        future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+        future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
-}
\ No newline at end of file
+}