From: Tom Pantelis Date: Mon, 9 Mar 2015 23:51:39 +0000 (+0000) Subject: Merge "Performance optimizations for simple transactions" X-Git-Tag: release/lithium~437 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=744be1c648a6f90c3bc26bd0e48f3a826de2e0ba;hp=68837dc673653f47995e792f897870cc3cb5004c Merge "Performance optimizations for simple transactions" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a5abd2fc69..52b4652de6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -132,6 +132,10 @@ public class Shard extends RaftActor { private final MessageTracker appendEntriesReplyTracker; + private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply( + Serialization.serializedActorPath(getSelf())); + + /** * Coordinates persistence recovery on startup. */ @@ -265,17 +269,17 @@ public class Shard extends RaftActor { } try { - if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { + if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCreateTransaction(message); } else if (message instanceof ForwardedReadyTransaction) { handleForwardedReadyTransaction((ForwardedReadyTransaction) message); - } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { + } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); - } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { + } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCommitTransaction(CommitTransaction.fromSerializable(message)); - } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { + } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleAbortTransaction(AbortTransaction.fromSerializable(message)); - } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) { + } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); @@ -457,17 +461,21 @@ public class Shard extends RaftActor { // node. In that case, the subsequent 3-phase commit messages won't contain the // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. - ActorRef replyActorPath = self(); if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); - replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); - } - ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply( - Serialization.serializedActorPath(replyActorPath)); - getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, getSelf()); + ReadyTransactionReply readyTransactionReply = + new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath)); + getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : + readyTransactionReply, getSelf()); + + } else { + + getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() : + READY_TRANSACTION_REPLY, getSelf()); + } } private void handleAbortTransaction(final AbortTransaction abort) { @@ -550,11 +558,11 @@ public class Shard extends RaftActor { throw new IllegalStateException("SchemaContext is not set"); } - if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { + if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { - shardMBean.incrementReadOnlyTransactionCount(); + shardMBean.incrementWriteOnlyTransactionCount(); - return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); + return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { @@ -562,11 +570,12 @@ public class Shard extends RaftActor { return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion); - } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { + } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { - shardMBean.incrementWriteOnlyTransactionCount(); + shardMBean.incrementReadOnlyTransactionCount(); + + return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); - return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -601,10 +610,8 @@ public class Shard extends RaftActor { private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId, short clientVersion) { - ShardTransactionIdentifier transactionId = - ShardTransactionIdentifier.builder() - .remoteTransactionId(remoteTransactionId) - .build(); + + ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId); if(LOG.isDebugEnabled()) { LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index d836a347c5..c441afb497 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -150,7 +150,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { - if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java index dd04afcb0b..d1f9495d86 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java @@ -13,15 +13,11 @@ import com.google.common.base.Preconditions; public class ShardTransactionIdentifier { private final String remoteTransactionId; - private ShardTransactionIdentifier(String remoteTransactionId) { + public ShardTransactionIdentifier(String remoteTransactionId) { this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null"); } - public static Builder builder(){ - return new Builder(); - } - public String getRemoteTransactionId() { return remoteTransactionId; } @@ -55,17 +51,4 @@ public class ShardTransactionIdentifier { return sb.toString(); } - public static class Builder { - private String remoteTransactionId; - - public Builder remoteTransactionId(String remoteTransactionId){ - this.remoteTransactionId = remoteTransactionId; - return this; - } - - public ShardTransactionIdentifier build(){ - return new ShardTransactionIdentifier(remoteTransactionId); - } - - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 7eede29b65..0fb09d8231 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -15,15 +15,19 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.PoisonPill; +import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; @@ -95,6 +99,7 @@ public class ActorContext { private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; private final Dispatchers dispatchers; + private final Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -116,6 +121,14 @@ public class ActorContext { this.dispatchers = new Dispatchers(actorSystem.dispatchers()); setCachedProperties(); + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); + + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), + TimeUnit.SECONDS)); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -204,6 +217,10 @@ public class ActorContext { } public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + if(ret != null){ + return ret; + } Future future = executeOperationAsync(shardManager, new FindPrimary(shardName, true).toSerializable(), datastoreContext.getShardInitializationTimeout()); @@ -211,11 +228,13 @@ public class ActorContext { return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { + if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { PrimaryFound found = PrimaryFound.fromSerializable(response); LOG.debug("Primary found {}", found.getPrimaryPath()); - return actorSystem.actorSelection(found.getPrimaryPath()); + ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); + primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); + return actorSelection; } else if(response instanceof ActorNotInitialized) { throw new NotInitializedException( String.format("Found primary shard %s but it's not initialized yet. " + @@ -325,7 +344,7 @@ public class ActorContext { Preconditions.checkArgument(message != null, "message must not be null"); LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -361,7 +380,7 @@ public class ActorContext { LOG.debug("Sending message {} to {}", message.getClass(), actor); - return ask(actor, message, timeout); + return doAsk(actor, message, timeout); } /** @@ -555,4 +574,16 @@ public class ActorContext { return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); } + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + protected Future doAsk(ActorSelection actorRef, Object message, Timeout timeout){ + return ask(actorRef, message, timeout); + } + + @VisibleForTesting + Cache> getPrimaryShardActorSelectionCache() { + return primaryShardActorSelectionCache; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index fd41c49390..6bd732e038 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,7 +2,10 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; @@ -11,9 +14,13 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.dispatch.Futures; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; @@ -23,12 +30,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ @@ -278,6 +291,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), @@ -311,6 +325,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), @@ -327,6 +342,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); @@ -365,4 +381,122 @@ public class ActorContextTest extends AbstractActorTest{ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); }}; } + + @Test + public void testFindPrimaryShardAsyncPrimaryFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + + assertNotNull(actual); + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedSelection, actual); + + // Wait for 200 Milliseconds. The cached entry should have been removed. + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + + cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + + @Test + public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryNotFound("foobar")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected PrimaryNotFoundException"); + } catch(PrimaryNotFoundException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + + @Test + public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new ActorNotInitialized()); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected NotInitializedException"); + } catch(NotInitializedException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + }