From: Moiz Raja Date: Fri, 24 Apr 2015 10:02:16 +0000 (+0000) Subject: Merge "Return local shard's DataTree from ActorContext#findPrimaryShardAsync" X-Git-Tag: release/lithium~226 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e8746b7ae6620d9e0dc159f2a13d3385d6197c56;hp=3d460a8bcbc24eeb969319feb9c7bf16bff496c1 Merge "Return local shard's DataTree from ActorContext#findPrimaryShardAsync" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java index f1f33bfc93..9a800c1659 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java @@ -7,9 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import java.util.List; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +59,7 @@ final class ChainedTransactionProxy extends TransactionProxy { * previous Tx's ready operations haven't completed yet. */ @Override - protected Future sendFindPrimaryShardAsync(final String shardName) { + protected Future sendFindPrimaryShardAsync(final String shardName) { // Check if there are any previous ready Futures, otherwise let the super class handle it. if(previousReadyFutures.isEmpty()) { return super.sendFindPrimaryShardAsync(shardName); @@ -75,7 +75,7 @@ final class ChainedTransactionProxy extends TransactionProxy { previousReadyFutures, getActorContext().getClientDispatcher()); // Add a callback for completion of the combined Futures. - final Promise returnPromise = akka.dispatch.Futures.promise(); + final Promise returnPromise = akka.dispatch.Futures.promise(); OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 361a221dd5..e397ab501c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; @@ -477,7 +478,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction sendFindPrimaryShardAsync(String shardName) { + protected Future sendFindPrimaryShardAsync(String shardName) { return actorContext.findPrimaryShardAsync(shardName); } @@ -497,20 +498,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); - findPrimaryFuture.onComplete(new OnComplete() { + findPrimaryFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { + public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { if(failure != null) { newTxFutureCallback.createTransactionContext(failure, null); } else { - newTxFutureCallback.setPrimaryShard(primaryShard); + newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor()); } } }, actorContext.getClientDispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java new file mode 100644 index 0000000000..bbeb1aa84b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorSelection; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * Local message DTO that contains information about the primary shard. + * + * @author Thomas Pantelis + */ +public class PrimaryShardInfo { + private final ActorSelection primaryShardActor; + private final Optional localShardDataTree; + + public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional localShardDataTree) { + this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor); + this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); + } + + /** + * Returns an ActorSelection representing the primary shard actor. + */ + public @Nonnull ActorSelection getPrimaryShardActor() { + return primaryShardActor; + } + + /** + * Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local + * to the caller. Otherwise the Optional value is absent. + */ + public @Nonnull Optional getLocalShardDataTree() { + return localShardDataTree; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 17d988005f..afa773b461 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -45,8 +45,10 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.reporting.MetricsReporter; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +98,7 @@ public class ActorContext { private Timeout transactionCommitOperationTimeout; private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; - private Cache> primaryShardActorSelectionCache; + private Cache> primaryShardInfoCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -141,7 +143,7 @@ public class ActorContext { shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); - primaryShardActorSelectionCache = CacheBuilder.newBuilder() + primaryShardInfoCache = CacheBuilder.newBuilder() .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) .build(); } @@ -196,24 +198,25 @@ public class ActorContext { return schemaContext; } - public Future findPrimaryShardAsync(final String shardName) { - Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardInfoCache.getIfPresent(shardName); if(ret != null){ return ret; } Future future = executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout); - return future.transform(new Mapper() { + return future.transform(new Mapper() { @Override - public ActorSelection checkedApply(Object response) throws Exception { + public PrimaryShardInfo checkedApply(Object response) throws Exception { if(response instanceof PrimaryFound) { PrimaryFound found = (PrimaryFound)response; LOG.debug("Primary found {}", found.getPrimaryPath()); ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); - primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); - return actorSelection; + PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.absent()); + primaryShardInfoCache.put(shardName, Futures.successful(info)); + return info; } else if(response instanceof NotInitializedException) { throw (NotInitializedException)response; } else if(response instanceof PrimaryNotFoundException) { @@ -387,15 +390,15 @@ public class ActorContext { public void broadcast(final Object message){ for(final String shardName : configuration.getAllShardNames()){ - Future primaryFuture = findPrimaryShardAsync(shardName); - primaryFuture.onComplete(new OnComplete() { + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { + public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { if(failure != null) { LOG.warn("broadcast failed to send message {} to shard {}: {}", message.getClass().getSimpleName(), shardName, failure); } else { - primaryShard.tell(message, ActorRef.noSender()); + primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender()); } } }, getClientDispatcher()); @@ -553,7 +556,7 @@ public class ActorContext { } @VisibleForTesting - Cache> getPrimaryShardActorSelectionCache() { - return primaryShardActorSelectionCache; + Cache> getPrimaryShardInfoCache() { + return primaryShardInfoCache; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index abe7f7678c..a64a5802b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -54,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionR import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; @@ -71,6 +72,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -309,6 +311,11 @@ public abstract class AbstractTransactionProxyTest { return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD); } + protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) { + return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()), + Optional.absent())); + } + protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); log.info("Created mock shard actor {}", actorRef); @@ -316,7 +323,7 @@ public abstract class AbstractTransactionProxyTest { doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(actorSystem, actorRef)). when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 93c6ddbe73..844feb2f47 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -130,7 +130,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { if (exToThrow instanceof PrimaryNotFoundException) { doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); } else { - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(anyString()); } @@ -209,7 +209,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext). actorSelection(actorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( @@ -834,7 +834,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { when(mockActorContext).actorSelection(shardActorRef.path().toString()); if(shardFound) { - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(actorSystem, shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } else { doReturn(Futures.failed(new PrimaryNotFoundException("test"))) @@ -1399,7 +1399,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6b4f633778..bc80937897 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -42,6 +42,7 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -411,32 +412,36 @@ public class ActorContextTest extends AbstractActorTest{ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + return Futures.successful((Object) new PrimaryFound(expPrimaryPath)); } }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); - ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); assertNotNull(actual); + assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); - assertEquals(cachedSelection, actual); + assertEquals(cachedInfo, actual); // Wait for 200 Milliseconds. The cached entry should have been removed. Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); @@ -461,7 +466,7 @@ public class ActorContextTest extends AbstractActorTest{ }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); try { Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); @@ -470,7 +475,7 @@ public class ActorContextTest extends AbstractActorTest{ } - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); } @@ -494,7 +499,7 @@ public class ActorContextTest extends AbstractActorTest{ }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); try { Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); @@ -503,7 +508,7 @@ public class ActorContextTest extends AbstractActorTest{ } - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); }