Refactor MessageCollectorActor and DoNothingActor
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index 4301a72d180273d79e868b0c8de2acf19f916ff9..a5625637f4b5720d2d8084dcd415ab94b31a0c7a 100644 (file)
@@ -1,3 +1,11 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
@@ -19,6 +27,7 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -34,6 +43,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -47,8 +57,8 @@ import org.opendaylight.controller.cluster.datastore.modification.DeleteModifica
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -138,7 +148,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         }
 
         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), any());
+                any(ActorSelection.class), any(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -216,7 +226,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
-            eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
+            eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
+            any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -334,7 +345,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
                 eq(getSystem().actorSelection(actorRef.path())),
-                eqCreateTransaction(memberName, READ_WRITE));
+                eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
@@ -799,7 +810,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
-        throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+        throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
+                mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
     }
 
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
@@ -837,13 +849,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
-                setTransactionId("txn-1").setTransactionActorPath(actorPath).
-                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
-        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
-                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(memberName, READ_WRITE));
+        doReturn(incompleteFuture()).when(mockActorContext).
+        executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
@@ -890,7 +899,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(memberName, READ_WRITE));
+                        eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
@@ -902,7 +911,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         long end = System.nanoTime();
 
-        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
                 expected, (end-start)), (end - start) <= expected);
     }
@@ -925,12 +934,12 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         long end = System.nanoTime();
 
-        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
         Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
                 expected, (end-start)), (end - start) <= expected);
     }
 
-    private Optional<DataTree> createDataTree(){
+    private static Optional<DataTree> createDataTree(){
         DataTree dataTree = mock(DataTree.class);
         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
@@ -942,7 +951,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         return dataTreeOptional;
     }
 
-    private Optional<DataTree> createDataTree(NormalizedNode readResponse){
+    private static Optional<DataTree> createDataTree(NormalizedNode readResponse){
         DataTree dataTree = mock(DataTree.class);
         Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
@@ -1310,9 +1319,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
                 expectBatchedModifications(1);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), any(ReadyTransaction.class));
-
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.ready();
@@ -1341,7 +1347,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 // Now ready should block for both transaction contexts
                 transactionProxy.ready();
             }
-        }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
+        }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
     }
 
     private void testModificationOperationBatching(TransactionType type) throws Exception {
@@ -1573,7 +1579,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(memberName, TransactionType.READ_ONLY));
+                        eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
 
         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));