private final MessageTracker appendEntriesReplyTracker;
+ private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
+ Serialization.serializedActorPath(getSelf()));
+
+
/**
* Coordinates persistence recovery on startup.
*/
}
try {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
- } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
- }
- ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
- Serialization.serializedActorPath(replyActorPath));
- getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
+
+ } else {
+
+ getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
+ READY_TRANSACTION_REPLY, getSelf());
+ }
}
private void handleAbortTransaction(final AbortTransaction abort) {
throw new IllegalStateException("SchemaContext is not set");
}
- if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
- shardMBean.incrementReadOnlyTransactionCount();
+ shardMBean.incrementWriteOnlyTransactionCount();
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
+ return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
- } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
- shardMBean.incrementWriteOnlyTransactionCount();
+ shardMBean.incrementReadOnlyTransactionCount();
+
+ return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
String transactionChainId, short clientVersion) {
- ShardTransactionIdentifier transactionId =
- ShardTransactionIdentifier.builder()
- .remoteTransactionId(remoteTransactionId)
- .build();
+
+ ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
@Override
public void handleCommand(Object message) throws Exception {
- if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
findPrimary(FindPrimary.fromSerializable(message));
} else if(message instanceof FindLocalShard){
findLocalShard((FindLocalShard) message);
public class ShardTransactionIdentifier {
private final String remoteTransactionId;
- private ShardTransactionIdentifier(String remoteTransactionId) {
+ public ShardTransactionIdentifier(String remoteTransactionId) {
this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
"remoteTransactionId should not be null");
}
- public static Builder builder(){
- return new Builder();
- }
-
public String getRemoteTransactionId() {
return remoteTransactionId;
}
return sb.toString();
}
- public static class Builder {
- private String remoteTransactionId;
-
- public Builder remoteTransactionId(String remoteTransactionId){
- this.remoteTransactionId = remoteTransactionId;
- return this;
- }
-
- public ShardTransactionIdentifier build(){
- return new ShardTransactionIdentifier(remoteTransactionId);
- }
-
- }
}
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
private final Dispatchers dispatchers;
+ private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
setCachedProperties();
+ primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+ .build();
+
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+ TimeUnit.SECONDS));
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+ Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ if(ret != null){
+ return ret;
+ }
Future<Object> future = executeOperationAsync(shardManager,
new FindPrimary(shardName, true).toSerializable(),
datastoreContext.getShardInitializationTimeout());
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
PrimaryFound found = PrimaryFound.fromSerializable(response);
LOG.debug("Primary found {}", found.getPrimaryPath());
- return actorSystem.actorSelection(found.getPrimaryPath());
+ ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
+ primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
+ return actorSelection;
} else if(response instanceof ActorNotInitialized) {
throw new NotInitializedException(
String.format("Found primary shard %s but it's not initialized yet. " +
Preconditions.checkArgument(message != null, "message must not be null");
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return ask(actor, message, timeout);
+ return doAsk(actor, message, timeout);
}
/**
return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
}
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ @VisibleForTesting
+ Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
+ return primaryShardActorSelectionCache;
+ }
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
+import akka.dispatch.Futures;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
public class ActorContextTest extends AbstractActorTest{
doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
}};
}
+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+ assertNotNull(actual);
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedSelection, actual);
+
+ // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+ cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new PrimaryNotFound("foobar"));
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected PrimaryNotFoundException");
+ } catch(PrimaryNotFoundException e){
+
+ }
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new ActorNotInitialized());
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected NotInitializedException");
+ } catch(NotInitializedException e){
+
+ }
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+
}