From 76601bed132933632ab6d397e71cc2aa533bf82f Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Mon, 2 Mar 2015 16:32:31 -0800 Subject: [PATCH] Performance optimizations for simple transactions - Cache the ActorSelection for the primary shard in ActorContext - Use isInstance to check for type of message instead of equals in ActorContext ShardManager and Shard - Change the order in which we check for transaction type to create in Shard - Create ShardTransactionIdentifier using the ShardTransactionIdentifier constructor instead of the Builder These optimizations are aimed at making simple transactions faster in the dsBenchMark. This set of optimizations reduced the amount of time that test takes from 20s to 18s on my Mac. Change-Id: I1a4328d90839ef80041ae4f3bb7dccd45ac7fb97 Signed-off-by: Moiz Raja --- .../controller/cluster/datastore/Shard.java | 51 ++++--- .../cluster/datastore/ShardManager.java | 2 +- .../ShardTransactionIdentifier.java | 19 +-- .../cluster/datastore/utils/ActorContext.java | 39 ++++- .../datastore/utils/ActorContextTest.java | 134 ++++++++++++++++++ 5 files changed, 200 insertions(+), 45 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a5abd2fc69..52b4652de6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -132,6 +132,10 @@ public class Shard extends RaftActor { private final MessageTracker appendEntriesReplyTracker; + private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply( + Serialization.serializedActorPath(getSelf())); + + /** * Coordinates persistence recovery on startup. */ @@ -265,17 +269,17 @@ public class Shard extends RaftActor { } try { - if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { + if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCreateTransaction(message); } else if (message instanceof ForwardedReadyTransaction) { handleForwardedReadyTransaction((ForwardedReadyTransaction) message); - } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { + } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); - } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { + } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCommitTransaction(CommitTransaction.fromSerializable(message)); - } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { + } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleAbortTransaction(AbortTransaction.fromSerializable(message)); - } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) { + } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); @@ -457,17 +461,21 @@ public class Shard extends RaftActor { // node. In that case, the subsequent 3-phase commit messages won't contain the // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. - ActorRef replyActorPath = self(); if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); - replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); - } - ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply( - Serialization.serializedActorPath(replyActorPath)); - getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, getSelf()); + ReadyTransactionReply readyTransactionReply = + new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath)); + getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : + readyTransactionReply, getSelf()); + + } else { + + getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() : + READY_TRANSACTION_REPLY, getSelf()); + } } private void handleAbortTransaction(final AbortTransaction abort) { @@ -550,11 +558,11 @@ public class Shard extends RaftActor { throw new IllegalStateException("SchemaContext is not set"); } - if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { + if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { - shardMBean.incrementReadOnlyTransactionCount(); + shardMBean.incrementWriteOnlyTransactionCount(); - return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); + return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { @@ -562,11 +570,12 @@ public class Shard extends RaftActor { return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion); - } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { + } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { - shardMBean.incrementWriteOnlyTransactionCount(); + shardMBean.incrementReadOnlyTransactionCount(); + + return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); - return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -601,10 +610,8 @@ public class Shard extends RaftActor { private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId, short clientVersion) { - ShardTransactionIdentifier transactionId = - ShardTransactionIdentifier.builder() - .remoteTransactionId(remoteTransactionId) - .build(); + + ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId); if(LOG.isDebugEnabled()) { LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index d836a347c5..c441afb497 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -150,7 +150,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { - if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java index dd04afcb0b..d1f9495d86 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java @@ -13,15 +13,11 @@ import com.google.common.base.Preconditions; public class ShardTransactionIdentifier { private final String remoteTransactionId; - private ShardTransactionIdentifier(String remoteTransactionId) { + public ShardTransactionIdentifier(String remoteTransactionId) { this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null"); } - public static Builder builder(){ - return new Builder(); - } - public String getRemoteTransactionId() { return remoteTransactionId; } @@ -55,17 +51,4 @@ public class ShardTransactionIdentifier { return sb.toString(); } - public static class Builder { - private String remoteTransactionId; - - public Builder remoteTransactionId(String remoteTransactionId){ - this.remoteTransactionId = remoteTransactionId; - return this; - } - - public ShardTransactionIdentifier build(){ - return new ShardTransactionIdentifier(remoteTransactionId); - } - - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 7eede29b65..0fb09d8231 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -15,15 +15,19 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.PoisonPill; +import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; @@ -95,6 +99,7 @@ public class ActorContext { private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; private final Dispatchers dispatchers; + private final Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -116,6 +121,14 @@ public class ActorContext { this.dispatchers = new Dispatchers(actorSystem.dispatchers()); setCachedProperties(); + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); + + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), + TimeUnit.SECONDS)); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -204,6 +217,10 @@ public class ActorContext { } public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + if(ret != null){ + return ret; + } Future future = executeOperationAsync(shardManager, new FindPrimary(shardName, true).toSerializable(), datastoreContext.getShardInitializationTimeout()); @@ -211,11 +228,13 @@ public class ActorContext { return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { + if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { PrimaryFound found = PrimaryFound.fromSerializable(response); LOG.debug("Primary found {}", found.getPrimaryPath()); - return actorSystem.actorSelection(found.getPrimaryPath()); + ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); + primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); + return actorSelection; } else if(response instanceof ActorNotInitialized) { throw new NotInitializedException( String.format("Found primary shard %s but it's not initialized yet. " + @@ -325,7 +344,7 @@ public class ActorContext { Preconditions.checkArgument(message != null, "message must not be null"); LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -361,7 +380,7 @@ public class ActorContext { LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -555,4 +574,16 @@ public class ActorContext { return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); } + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + protected Future doAsk(ActorSelection actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + @VisibleForTesting + Cache> getPrimaryShardActorSelectionCache() { + return primaryShardActorSelectionCache; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index fd41c49390..6bd732e038 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,7 +2,10 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; @@ -11,9 +14,13 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.dispatch.Futures; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; @@ -23,12 +30,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ @@ -278,6 +291,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), @@ -311,6 +325,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), @@ -327,6 +342,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); @@ -365,4 +381,122 @@ public class ActorContextTest extends AbstractActorTest{ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); }}; } + + @Test + public void testFindPrimaryShardAsyncPrimaryFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + + assertNotNull(actual); + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedSelection, actual); + + // Wait for 200 Milliseconds. The cached entry should have been removed. + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + + cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + + @Test + public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryNotFound("foobar")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected PrimaryNotFoundException"); + } catch(PrimaryNotFoundException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + + @Test + public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new ActorNotInitialized()); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected NotInitializedException"); + } catch(NotInitializedException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + } -- 2.36.6