From: Tom Pantelis Date: Wed, 25 Mar 2015 04:07:39 +0000 (-0400) Subject: Bug 2194: Find primary shard on remote ShardManager X-Git-Tag: release/lithium~349^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c9d0ae2a8ce525d150f360cf49f21b7c0187cfbf Bug 2194: Find primary shard on remote ShardManager If there is no local shard, the ShardManager now forwards the FindPrimary message to another ShardManager member. I changed the FindPrimary and PrimaryFound messages to Serializable. Previously they implemented SerializableMessage but they didn't have equivalent protobuff messages so they couldn't be sent over the wire. These are simple messages and we don't need protobuff. I also obsoleted the ActorNotInitialized and PrimaryNotFound messages as we also have equivalent exception classes (which are inhently Serializable) so the former messages are redundant. This avoids translation in ActorContext#findPrimaryShardAsync. Change-Id: I5762ec1dafb9aa12ee8230efe245b154b0bb9b72 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index bc4c825351..55a86ceeea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -43,19 +43,19 @@ import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; @@ -96,6 +96,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // A data store could be of type config/operational private final String type; + private final String shardManagerIdentifierString; + private final ClusterWrapper cluster; private final Configuration configuration; @@ -122,6 +124,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.datastoreContext = datastoreContext; this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); + this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; @@ -158,8 +161,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { - if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { - findPrimary(FindPrimary.fromSerializable(message)); + if (message instanceof FindPrimary) { + findPrimary((FindPrimary)message); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { @@ -203,13 +206,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInfo = message.getShardInfo(); LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), - shardInfo.getShardId()); + shardInfo.getShardName()); shardInfo.removeOnShardInitialized(message.getOnShardInitialized()); if(!shardInfo.isShardInitialized()) { - message.getSender().tell(new ActorNotInitialized(), getSelf()); + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName()); + message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf()); } else { + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName()); message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); } } @@ -297,7 +302,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markShardAsInitialized(String shardName) { - LOG.debug("Initializing shard [{}]", shardName); + LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName); ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { @@ -367,6 +372,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInformation.addOnShardInitialized(onShardInitialized); + LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( datastoreContext.getShardInitializationTimeout().duration(), getSelf(), new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), @@ -375,8 +382,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onShardInitialized.setTimeoutSchedule(timeoutSchedule); } else if (!shardInformation.isShardInitialized()) { - getSender().tell(new ActorNotInitialized(), getSelf()); + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), + shardInformation.getShardName()); + getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf()); } else { + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), + shardInformation.getShardName()); getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); } @@ -392,13 +403,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { "recovering and a leader is being elected. Try again later.", shardId)); } + private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { + return new NotInitializedException(String.format( + "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { + String memberName = message.member().roles().head(); + + LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + memberNameToAddress.remove(message.member().roles().head()); } private void memberUp(ClusterEvent.MemberUp message) { String memberName = message.member().roles().head(); + LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + memberNameToAddress.put(memberName, message.member().address()); for(ShardInformation info : localShards.values()){ @@ -461,6 +485,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } + @VisibleForTesting + protected ClusterWrapper getCluster() { + return cluster; + } + @VisibleForTesting protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { return getContext().actorOf(Shard.props(info.getShardId(), @@ -469,6 +498,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findPrimary(FindPrimary message) { + LOG.debug("{}: In findPrimary: {}", persistenceId(), message); + final String shardName = message.getShardName(); // First see if the there is a local replica for the shard @@ -477,10 +508,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable(); + Object found = new PrimaryFound(info.getSerializedLeaderActor()); if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found primary for {}: {}", shardName, found); + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); } return found; @@ -490,38 +521,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - List members = configuration.getMembersFromShardName(shardName); + for(Map.Entry entry: memberNameToAddress.entrySet()) { + if(!cluster.getCurrentMemberName().equals(entry.getKey())) { + String path = getShardManagerActorPathBuilder(entry.getValue()).toString(); - if(cluster.getCurrentMemberName() != null) { - members.remove(cluster.getCurrentMemberName()); - } + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), + shardName, path); - /** - * FIXME: Instead of sending remote shard actor path back to sender, - * forward FindPrimary message to remote shard manager - */ - // There is no way for us to figure out the primary (for now) so assume - // that one of the remote nodes is a primary - for(String memberName : members) { - Address address = memberNameToAddress.get(memberName); - if(address != null){ - String path = - getShardActorPath(shardName, memberName); - getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + getContext().actorSelection(path).forward(message, getContext()); return; } } - getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); + + LOG.debug("{}: No shard found for {}", persistenceId(), shardName); + + getSender().tell(new PrimaryNotFoundException( + String.format("No primary shard found for %s.", shardName)), getSelf()); + } + + private StringBuilder getShardManagerActorPathBuilder(Address address) { + StringBuilder builder = new StringBuilder(); + builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString); + return builder; } private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { - StringBuilder builder = new StringBuilder(); - builder.append(address.toString()) - .append("/user/") - .append(ShardManagerIdentifier.builder().type(type).build().toString()) - .append("/") + StringBuilder builder = getShardManagerActorPathBuilder(address); + builder.append("/") .append(getShardIdentifier(memberName, shardName)); return builder.toString(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java deleted file mode 100644 index 576010f916..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.messages; - -import java.io.Serializable; - -public class ActorNotInitialized implements Serializable { - private static final long serialVersionUID = 1L; -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java index d51d6800a2..2c18eaa86f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java @@ -9,13 +9,14 @@ package org.opendaylight.controller.cluster.datastore.messages; import com.google.common.base.Preconditions; +import java.io.Serializable; /** * The FindPrimary message is used to locate the primary of any given shard * */ -public class FindPrimary implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = FindPrimary.class; +public class FindPrimary implements Serializable { + private static final long serialVersionUID = 1L; private final String shardName; private final boolean waitUntilReady; @@ -36,15 +37,6 @@ public class FindPrimary implements SerializableMessage{ return waitUntilReady; } - @Override - public Object toSerializable() { - return this; - } - - public static FindPrimary fromSerializable(Object message){ - return (FindPrimary) message; - } - @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java index a5565020ed..4c154d43ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java @@ -8,56 +8,48 @@ package org.opendaylight.controller.cluster.datastore.messages; +import java.io.Serializable; -public class PrimaryFound implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = PrimaryFound.class; - private final String primaryPath; +public class PrimaryFound implements Serializable { + private static final long serialVersionUID = 1L; - public PrimaryFound(final String primaryPath) { - this.primaryPath = primaryPath; - } + private final String primaryPath; - public String getPrimaryPath() { - return primaryPath; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; + public PrimaryFound(final String primaryPath) { + this.primaryPath = primaryPath; } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryFound that = (PrimaryFound) o; - if (!primaryPath.equals(that.primaryPath)) { - return false; + public String getPrimaryPath() { + return primaryPath; } - return true; - } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } - @Override - public int hashCode() { - return primaryPath.hashCode(); - } + PrimaryFound that = (PrimaryFound) o; - @Override - public String toString() { - return "PrimaryFound{" + - "primaryPath='" + primaryPath + '\'' + - '}'; - } + if (!primaryPath.equals(that.primaryPath)) { + return false; + } + return true; + } - @Override - public Object toSerializable() { - return this; - } + @Override + public int hashCode() { + return primaryPath.hashCode(); + } - public static PrimaryFound fromSerializable(final Object message){ - return (PrimaryFound) message; - } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java deleted file mode 100644 index b47c91b6e5..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore.messages; - -import com.google.common.base.Preconditions; - -public class PrimaryNotFound implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = PrimaryNotFound.class; - - private final String shardName; - - public PrimaryNotFound(final String shardName){ - - Preconditions.checkNotNull(shardName, "shardName should not be null"); - - this.shardName = shardName; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryNotFound that = (PrimaryNotFound) o; - - if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return shardName != null ? shardName.hashCode() : 0; - } - - @Override - public Object toSerializable() { - return this; - } - - public static PrimaryNotFound fromSerializable(final Object message){ - return (PrimaryNotFound) message; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 6f9bb7fc9f..b6250fc1cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -41,13 +41,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -209,25 +207,22 @@ public class ActorContext { return ret; } Future future = executeOperationAsync(shardManager, - new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout); + new FindPrimary(shardName, true), shardInitializationTimeout); return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { - PrimaryFound found = PrimaryFound.fromSerializable(response); + if(response instanceof PrimaryFound) { + PrimaryFound found = (PrimaryFound)response; LOG.debug("Primary found {}", found.getPrimaryPath()); ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); return actorSelection; - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found primary shard %s but it's not initialized yet. " + - "Please try again later", shardName)); - } else if(response instanceof PrimaryNotFound) { - throw new PrimaryNotFoundException( - String.format("No primary shard found for %S.", shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; + } else if(response instanceof PrimaryNotFoundException) { + throw (PrimaryNotFoundException)response; } else if(response instanceof NoShardLeaderException) { throw (NoShardLeaderException)response; } @@ -274,10 +269,8 @@ public class ActorContext { LocalShardFound found = (LocalShardFound)response; LOG.debug("Local shard found {}", found.getPath()); return found.getPath(); - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found local shard for %s but it's not initialized yet.", - shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; } else if(response instanceof LocalShardNotFound) { throw new LocalShardNotFoundException( String.format("Local shard for %s does not exist.", shardName)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index f6c8f07f6b..fac597003e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -7,10 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore; -import static org.mockito.Mockito.any; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -28,7 +28,7 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -173,7 +173,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new ActorNotInitialized()); + reply(new NotInitializedException("not initialized")); new Within(duration("1 seconds")) { @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index ae7a4f96c5..95b1b78a19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -9,16 +9,23 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AddressFromURIString; import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -35,15 +42,16 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; @@ -75,6 +83,11 @@ public class ShardManagerTest extends AbstractActorTest { private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); + } + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -100,21 +113,22 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newPropsShardMgrWithMockShardActor() { + return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), + new MockConfiguration()); + } + + private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, + final ClusterWrapper clusterWrapper, final Configuration config) { Creator creator = new Creator() { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.build(), ready) { - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - return mockShardActor; - } - }; + return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), + ready, name, shardActor); } }; - return Props.create(new DelegatingShardManagerCreator(creator)); + return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } @Test @@ -124,9 +138,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary("non-existent", false), getRef()); - expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable()); + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; } @@ -146,9 +160,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name())), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; @@ -170,9 +184,9 @@ public class ShardManagerTest extends AbstractActorTest { RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); }}; @@ -183,9 +197,9 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; } @@ -197,7 +211,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; @@ -215,15 +229,15 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; @@ -238,7 +252,7 @@ public class ShardManagerTest extends AbstractActorTest { // We're passing waitUntilInitialized = true to FindPrimary so the response should be // delayed until we send ActorInitialized and RoleChangeNotification. - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); @@ -254,7 +268,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); @@ -269,9 +283,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); - expectMsgClass(duration("2 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("2 seconds"), NotInitializedException.class); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -289,7 +303,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, RaftState.Candidate.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; @@ -303,12 +317,78 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } + @Test + public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + + shardManager2.underlyingActor().verifyFindPrimary(); + + Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForMemberRemoved(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -348,7 +428,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; } @@ -371,42 +451,6 @@ public class ShardManagerTest extends AbstractActorTest { }}; } - @Test - public void testOnReceiveMemberUp() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"), - PrimaryFound.SERIALIZABLE_CLASS)); - String path = found.getPrimaryPath(); - assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config")); - }}; - } - - @Test - public void testOnReceiveMemberDown() throws Exception { - - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); - - MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); - }}; - } - @Test public void testOnRecoveryJournalIsCleaned() { InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( @@ -804,4 +848,69 @@ public class ShardManagerTest extends AbstractActorTest { return delegate.create(); } } + + private static class ForwardingShardManager extends ShardManager { + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private final ActorRef shardActor; + private final String name; + + protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + ActorRef shardActor) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + this.shardActor = shardActor; + this.name = name; + } + + @Override + public void handleCommand(Object message) throws Exception { + try{ + super.handleCommand(message); + } finally { + if(message instanceof FindPrimary) { + findPrimaryMessageReceived.countDown(); + } else if(message instanceof ClusterEvent.MemberUp) { + String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUpReceived.countDown(); + } + } else if(message instanceof ClusterEvent.MemberRemoved) { + String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberRemovedReceived.countDown(); + } + } + } + } + + @Override + public String persistenceId() { + return name; + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return shardActor; + } + + void waitForMemberUp() { + assertEquals("MemberUp received", true, + Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); + memberUpReceived = new CountDownLatch(1); + } + + void waitForMemberRemoved() { + assertEquals("MemberRemoved received", true, + Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS)); + memberRemovedReceived = new CountDownLatch(1); + } + + void verifyFindPrimary() { + assertEquals("FindPrimary received", true, + Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); + findPrimaryMessageReceived = new CountDownLatch(1); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 2746bcf982..6b4f633778 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -37,13 +37,11 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -458,7 +456,7 @@ public class ActorContextTest extends AbstractActorTest{ mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryNotFound("foobar")); + return Futures.successful((Object) new PrimaryNotFoundException("not found")); } }; @@ -491,7 +489,7 @@ public class ActorContextTest extends AbstractActorTest{ mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new ActorNotInitialized()); + return Futures.successful((Object) new NotInitializedException("not iniislized")); } }; @@ -518,8 +516,8 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); - shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable()); - shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable()); + shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); + shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); Configuration mockConfig = mock(Configuration.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index fe40aa0fd4..810b270cfc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -14,14 +14,22 @@ import akka.actor.AddressFromURIString; import akka.cluster.ClusterEvent; import akka.cluster.MemberStatus; import akka.cluster.UniqueAddress; -import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import scala.collection.JavaConversions; import java.util.HashSet; import java.util.Set; +import org.opendaylight.controller.cluster.datastore.ClusterWrapper; +import scala.collection.JavaConversions; public class MockClusterWrapper implements ClusterWrapper{ private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550); + private String currentMemberName = "member-1"; + + public MockClusterWrapper() { + } + + public MockClusterWrapper(String currentMemberName) { + this.currentMemberName = currentMemberName; + } @Override public void subscribeToMemberEvents(ActorRef actorRef) { @@ -29,7 +37,7 @@ public class MockClusterWrapper implements ClusterWrapper{ @Override public String getCurrentMemberName() { - return "member-1"; + return currentMemberName; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java index 4ef7d65857..0bc561f1bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore.utils; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -18,11 +20,23 @@ import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; public class MockConfiguration implements Configuration{ - @Override public List getMemberShardNames(final String memberName) { - return Arrays.asList("default"); + private Map> shardMembers = ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build(); + + public MockConfiguration() { + } + + public MockConfiguration(Map> shardMembers) { + this.shardMembers = shardMembers; } - @Override public Optional getModuleNameFromNameSpace( + @Override + public List getMemberShardNames(final String memberName) { + return new ArrayList<>(shardMembers.keySet()); + } + @Override + public Optional getModuleNameFromNameSpace( final String nameSpace) { return Optional.absent(); } @@ -44,7 +58,8 @@ public class MockConfiguration implements Configuration{ return Arrays.asList("member-2", "member-3"); } - return Collections.emptyList(); + List members = shardMembers.get(shardName); + return members != null ? members : Collections.emptyList(); } @Override public Set getAllShardNames() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index badec6f831..03634627d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -34,3 +34,105 @@ bounded-mailbox { mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } + +Member1 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loglevel = "DEBUG" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2558 + } + } + + cluster { + auto-down-unreachable-after = 100s + + roles = [ + "member-1" + ] + } + } +} + +Member2 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2559 + } + } + + cluster { + auto-down-unreachable-after = 100s + + roles = [ + "member-2" + ] + } + } +}