Performance optimizations for simple transactions 55/15955/3
authorMoiz Raja <moraja@cisco.com>
Tue, 3 Mar 2015 00:32:31 +0000 (16:32 -0800)
committerMoiz Raja <moraja@cisco.com>
Mon, 9 Mar 2015 16:11:10 +0000 (09:11 -0700)
- Cache the ActorSelection for the primary shard in ActorContext
- Use isInstance to check for type of message instead of equals in ActorContext
  ShardManager and Shard
- Change the order in which we check for transaction type to create in Shard
- Create ShardTransactionIdentifier using the ShardTransactionIdentifier
  constructor instead of the Builder

These optimizations are aimed at making simple transactions faster in the dsBenchMark. This
set of optimizations reduced the amount of time that test takes from 20s to 18s on my Mac.

Change-Id: I1a4328d90839ef80041ae4f3bb7dccd45ac7fb97
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java

index a5abd2fc69059f4af377ab85ac161372de15cbed..52b4652de6e7d464d9d54c67ebea37ca233de862 100644 (file)
@@ -132,6 +132,10 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
+    private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
+            Serialization.serializedActorPath(getSelf()));
+
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -265,17 +269,17 @@ public class Shard extends RaftActor {
         }
 
         try {
-            if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+            if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCreateTransaction(message);
             } else if (message instanceof ForwardedReadyTransaction) {
                 handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
-            } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+            } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
-            } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+            } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCommitTransaction(CommitTransaction.fromSerializable(message));
-            } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+            } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleAbortTransaction(AbortTransaction.fromSerializable(message));
-            } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+            } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
                 registerChangeListener((RegisterChangeListener) message);
@@ -457,17 +461,21 @@ public class Shard extends RaftActor {
         // node. In that case, the subsequent 3-phase commit messages won't contain the
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
-        ActorRef replyActorPath = self();
         if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-            replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
-        }
 
-        ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
-                Serialization.serializedActorPath(replyActorPath));
-        getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                readyTransactionReply, getSelf());
+            ReadyTransactionReply readyTransactionReply =
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+            getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                    readyTransactionReply, getSelf());
+
+        } else {
+
+            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
+                    READY_TRANSACTION_REPLY, getSelf());
+        }
     }
 
     private void handleAbortTransaction(final AbortTransaction abort) {
@@ -550,11 +558,11 @@ public class Shard extends RaftActor {
             throw new IllegalStateException("SchemaContext is not set");
         }
 
-        if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+        if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
 
-            shardMBean.incrementReadOnlyTransactionCount();
+            shardMBean.incrementWriteOnlyTransactionCount();
 
-            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
+            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
 
         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
@@ -562,11 +570,12 @@ public class Shard extends RaftActor {
 
             return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
 
-        } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+        } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
 
-            shardMBean.incrementWriteOnlyTransactionCount();
+            shardMBean.incrementReadOnlyTransactionCount();
+
+            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
 
-            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -601,10 +610,8 @@ public class Shard extends RaftActor {
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
             String transactionChainId, short clientVersion) {
 
-        ShardTransactionIdentifier transactionId =
-            ShardTransactionIdentifier.builder()
-                .remoteTransactionId(remoteTransactionId)
-                .build();
+
+        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
index d836a347c514b434db26a32cbb172d1671e99253..c441afb49787fb7d5ae946c7fc0e0e91ec7137ad 100644 (file)
@@ -150,7 +150,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
             findPrimary(FindPrimary.fromSerializable(message));
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
index dd04afcb0b9f4091e73908ba8ef63d262f609353..d1f9495d862770aec58b90ad43e91f5ce1cf2a6f 100644 (file)
@@ -13,15 +13,11 @@ import com.google.common.base.Preconditions;
 public class ShardTransactionIdentifier {
     private final String remoteTransactionId;
 
-    private ShardTransactionIdentifier(String remoteTransactionId) {
+    public ShardTransactionIdentifier(String remoteTransactionId) {
         this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
                 "remoteTransactionId should not be null");
     }
 
-    public static Builder builder(){
-        return new Builder();
-    }
-
     public String getRemoteTransactionId() {
         return remoteTransactionId;
     }
@@ -55,17 +51,4 @@ public class ShardTransactionIdentifier {
         return sb.toString();
     }
 
-    public static class Builder {
-        private String remoteTransactionId;
-
-        public Builder remoteTransactionId(String remoteTransactionId){
-            this.remoteTransactionId = remoteTransactionId;
-            return this;
-        }
-
-        public ShardTransactionIdentifier build(){
-            return new ShardTransactionIdentifier(remoteTransactionId);
-        }
-
-    }
 }
index 7eede29b65690db530fd4b9cfb9acb130365fcb6..0fb09d8231903bbc9b530f488039adc6b8672b90 100644 (file)
@@ -15,15 +15,19 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
@@ -95,6 +99,7 @@ public class ActorContext {
     private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
     private final Dispatchers dispatchers;
+    private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
@@ -116,6 +121,14 @@ public class ActorContext {
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
         setCachedProperties();
+        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+                .build();
+
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+                TimeUnit.SECONDS));
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -204,6 +217,10 @@ public class ActorContext {
     }
 
     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+        Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+        if(ret != null){
+            return ret;
+        }
         Future<Object> future = executeOperationAsync(shardManager,
                 new FindPrimary(shardName, true).toSerializable(),
                 datastoreContext.getShardInitializationTimeout());
@@ -211,11 +228,13 @@ public class ActorContext {
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
             public ActorSelection checkedApply(Object response) throws Exception {
-                if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+                if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
                     PrimaryFound found = PrimaryFound.fromSerializable(response);
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
-                    return actorSystem.actorSelection(found.getPrimaryPath());
+                    ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
+                    primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
+                    return actorSelection;
                 } else if(response instanceof ActorNotInitialized) {
                     throw new NotInitializedException(
                             String.format("Found primary shard %s but it's not initialized yet. " +
@@ -325,7 +344,7 @@ public class ActorContext {
         Preconditions.checkArgument(message != null, "message must not be null");
 
         LOG.debug("Sending message {} to {}", message.getClass(), actor);
-        return ask(actor, message, timeout);
+        return doAsk(actor, message, timeout);
     }
 
     /**
@@ -361,7 +380,7 @@ public class ActorContext {
 
         LOG.debug("Sending message {} to {}", message.getClass(), actor);
 
-        return ask(actor, message, timeout);
+        return doAsk(actor, message, timeout);
     }
 
     /**
@@ -555,4 +574,16 @@ public class ActorContext {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
+    protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+        return ask(actorRef, message, timeout);
+    }
+
+    protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+        return ask(actorRef, message, timeout);
+    }
+
+    @VisibleForTesting
+    Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
+        return primaryShardActorSelectionCache;
+    }
 }
index fd41c49390b484fd0d4343befa2f920d542e2f74..6bd732e038a00055bb2407ccc416c7f192059405 100644 (file)
@@ -2,7 +2,10 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
@@ -11,9 +14,13 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
+import akka.dispatch.Futures;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
@@ -23,12 +30,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 public class ActorContextTest extends AbstractActorTest{
 
@@ -278,6 +291,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
@@ -311,6 +325,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
@@ -327,6 +342,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
 
@@ -365,4 +381,122 @@ public class ActorContextTest extends AbstractActorTest{
                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
         }};
     }
+
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+            ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+            assertNotNull(actual);
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+            assertEquals(cachedSelection, actual);
+
+            // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+            cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryNotFound("foobar"));
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected PrimaryNotFoundException");
+            } catch(PrimaryNotFoundException e){
+
+            }
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new ActorNotInitialized());
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected NotInitializedException");
+            } catch(NotInitializedException e){
+
+            }
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
 }