import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
- new RemoveShardReplica(shardName, memberName));
+ new RemoveShardReplica(shardName, MemberName.forName(memberName)));
Futures.addCallback(future, new FutureCallback<Success>() {
@Override
public void onSuccess(Success success) {
Function<String, Object> messageSupplier = new Function<String, Object>() {
@Override
public Object apply(String shardName) {
- return new RemoveShardReplica(shardName, memberName);
+ return new RemoveShardReplica(shardName, MemberName.forName(memberName));
}
};
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.MemberNode;
* @author Thomas Pantelis
*/
public class ClusterAdminRpcServiceTest {
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+ private static final MemberName MEMBER_3 = MemberName.forName("member-3");
private final List<MemberNode> memberNodes = new ArrayList<>();
@Before
moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
- "pets", null, Arrays.asList("member-1"));
+ "pets", null, Arrays.asList(MEMBER_1));
leaderNode1.configDataStore().getActorContext().getShardManager().tell(
new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
leaderNode1.kit().expectMsgClass(Success.class);
newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
- "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null),
+ "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
newReplicaNode2.kit().getRef());
newReplicaNode2.kit().expectMsgClass(Success.class);
verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
- "pets", null, Arrays.asList("member-1", "member-2", "member-3"));
+ "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
leaderNode1.configDataStore().getActorContext().getShardManager().tell(
new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
leaderNode1.kit().expectMsgClass(Success.class);
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
*/
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
+ private static final MemberName UNKNOWN_MEMBER = MemberName.forName("UNKNOWN-MEMBER");
protected static final AtomicLong TX_COUNTER = new AtomicLong();
}
}
- protected String getMemberName() {
- String memberName = getActorContext().getCurrentMemberName();
- if (memberName == null) {
- memberName = "UNKNOWN-MEMBER";
- }
-
- return memberName;
+ protected MemberName getMemberName() {
+ final MemberName ret = getActorContext().getCurrentMemberName();
+ return ret == null ? UNKNOWN_MEMBER : ret;
}
/**
import akka.actor.ActorRef;
import akka.actor.Address;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public interface ClusterWrapper {
void subscribeToMemberEvents(ActorRef actorRef);
- String getCurrentMemberName();
+ MemberName getCurrentMemberName();
Address getSelfAddress();
}
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public class ClusterWrapperImpl implements ClusterWrapper {
private final Cluster cluster;
- private final String currentMemberName;
+ private final MemberName currentMemberName;
private final Address selfAddress;
public ClusterWrapperImpl(ActorSystem actorSystem){
"member-3 here would be the name of the member"
);
- currentMemberName = cluster.getSelfRoles().iterator().next();
+ currentMemberName = MemberName.forName(cluster.getSelfRoles().iterator().next());
selfAddress = cluster.selfAddress();
}
}
@Override
- public String getCurrentMemberName() {
+ public MemberName getCurrentMemberName() {
return currentMemberName;
}
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
public interface Configuration {
/**
* Returns all the shard names that belong on the member by the given name.
*/
- @Nonnull Collection<String> getMemberShardNames(@Nonnull String memberName);
+ @Nonnull Collection<String> getMemberShardNames(@Nonnull MemberName memberName);
/**
* Returns the module name for the given namespace name or null if not found.
/**
* Returns the member replicas for the given shard name.
*/
- @Nonnull Collection<String> getMembersFromShardName(@Nonnull String shardName);
+ @Nonnull Collection<MemberName> getMembersFromShardName(@Nonnull String shardName);
/**
* Returns the ShardStrategy for the given module name or null if the module is not found.
/**
* Returns a unique set of all member names configured for all shards.
*/
- Collection<String> getUniqueMemberNamesForAllShards();
+ Collection<MemberName> getUniqueMemberNamesForAllShards();
/*
* Verifies if the given module shard in available in the cluster
/**
* Adds the given member as the new replica for the given shardName
*/
- void addMemberReplicaForShard (String shardName, String memberName);
+ void addMemberReplicaForShard (String shardName, MemberName memberName);
/**
* Removes the given member as a replica for the given shardName
*/
- void removeMemberReplicaForShard (String shardName, String memberName);
+ void removeMemberReplicaForShard (String shardName, MemberName memberName);
}
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
}
@Override
- public Collection<String> getMemberShardNames(final String memberName){
+ public Collection<String> getMemberShardNames(final MemberName memberName){
Preconditions.checkNotNull(memberName, "memberName should not be null");
List<String> shards = new ArrayList<>();
- for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
- for(ShardConfig shardConfig: moduleConfig.getShardConfigs()) {
+ for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
+ for (ShardConfig shardConfig: moduleConfig.getShardConfigs()) {
if(shardConfig.getReplicas().contains(memberName)) {
shards.add(shardConfig.getName());
}
}
@Override
- public Collection<String> getMembersFromShardName(final String shardName) {
+ public Collection<MemberName> getMembersFromShardName(final String shardName) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
}
@Override
- public Collection<String> getUniqueMemberNamesForAllShards() {
- Set<String> allNames = new HashSet<>();
+ public Collection<MemberName> getUniqueMemberNamesForAllShards() {
+ Set<MemberName> allNames = new HashSet<>();
for(String shardName: getAllShardNames()) {
allNames.addAll(getMembersFromShardName(shardName));
}
}
@Override
- public void addMemberReplicaForShard (String shardName, String newMemberName) {
+ public void addMemberReplicaForShard (String shardName, MemberName newMemberName) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
if(shardConfig != null) {
- Set<String> replicas = new HashSet<>(shardConfig.getReplicas());
+ Set<MemberName> replicas = new HashSet<>(shardConfig.getReplicas());
replicas.add(newMemberName);
updateModuleConfigMap(ModuleConfig.builder(moduleConfig).shardConfig(shardName, replicas).build());
return;
}
@Override
- public void removeMemberReplicaForShard (String shardName, String newMemberName) {
+ public void removeMemberReplicaForShard (String shardName, MemberName newMemberName) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
if(shardConfig != null) {
- Set<String> replicas = new HashSet<>(shardConfig.getReplicas());
+ Set<MemberName> replicas = new HashSet<>(shardConfig.getReplicas());
replicas.remove(newMemberName);
updateModuleConfigMap(ModuleConfig.builder(moduleConfig).shardConfig(shardName, replicas).build());
return;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
for(ConfigObject shard : shardsConfigObjectList){
String shardName = shard.get("name").unwrapped().toString();
- List<String> replicas = shard.toConfig().getStringList("replicas");
+ List<MemberName> replicas = shard.toConfig().getStringList("replicas").stream()
+ .map(MemberName::forName).collect(Collectors.toList());
builder.shardConfig(shardName, replicas);
}
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
/**
return this;
}
- public Builder shardConfig(String name, Collection<String> replicas) {
+ public Builder shardConfig(String name, Collection<MemberName> replicas) {
shardConfigs.put(name, new ShardConfig(name, replicas));
return this;
}
import java.util.Collection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
/**
* Encapsulates information for adding a new module shard configuration.
private final String moduleName;
private final String shardName;
private final String shardStrategyName;
- private final Collection<String> shardMemberNames;
+ private final Collection<MemberName> shardMemberNames;
/**
* Constructs a new instance.
* @param shardMemberNames the names of the shard's member replicas.
*/
public ModuleShardConfiguration(@Nonnull URI namespace, @Nonnull String moduleName, @Nonnull String shardName,
- @Nullable String shardStrategyName, @Nonnull Collection<String> shardMemberNames) {
+ @Nullable String shardStrategyName, @Nonnull Collection<MemberName> shardMemberNames) {
this.namespace = Preconditions.checkNotNull(namespace, "nameSpace should not be null");
this.moduleName = Preconditions.checkNotNull(moduleName, "moduleName should not be null");
this.shardName = Preconditions.checkNotNull(shardName, "shardName should not be null");
return shardStrategyName;
}
- public Collection<String> getShardMemberNames() {
+ public Collection<MemberName> getShardMemberNames() {
return shardMemberNames;
}
import java.util.Collection;
import java.util.Set;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
/**
* Encapsulated configuration for a shard.
*/
public class ShardConfig {
private final String name;
- private final Set<String> replicas;
+ private final Set<MemberName> replicas;
- public ShardConfig(@Nonnull final String name, @Nonnull final Collection<String> replicas) {
+ public ShardConfig(@Nonnull final String name, @Nonnull final Collection<MemberName> replicas) {
this.name = Preconditions.checkNotNull(name);
this.replicas = ImmutableSet.copyOf(Preconditions.checkNotNull(replicas));
}
}
@Nonnull
- public Set<String> getReplicas() {
+ public Set<MemberName> getReplicas() {
return replicas;
}
}
\ No newline at end of file
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
ActorRef shardManagerActor = context.getShardManager();
Configuration configuration = context.getConfiguration();
- Collection<String> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
+ Collection<MemberName> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
CreateShard createShard = new CreateShard(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
"entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames),
newShardBuilder(context, strategyConfig), null);
return Optional.absent();
}
- String localMemberName = context.getCurrentMemberName();
+ MemberName localMemberName = context.getCurrentMemberName();
Optional<DataContainerChild<? extends PathArgument, ?>> ownerLeaf = entity.getChild(ENTITY_OWNER_NODE_ID);
String owner = ownerLeaf.isPresent() ? ownerLeaf.get().getValue().toString() : null;
boolean hasOwner = !Strings.isNullOrEmpty(owner);
- boolean isOwner = hasOwner && localMemberName.equals(owner);
+ boolean isOwner = hasOwner && localMemberName.getName().equals(owner);
return Optional.of(new EntityOwnershipState(isOwner, hasOwner));
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.base.Verify;
import java.util.Collection;
import java.util.Objects;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
private final String localMemberName;
private final EntityOwnershipListenerSupport listenerSupport;
- EntityOwnerChangeListener(final String localMemberName, final EntityOwnershipListenerSupport listenerSupport) {
- this.localMemberName = Preconditions.checkNotNull(localMemberName);
+ EntityOwnerChangeListener(final MemberName localMemberName, final EntityOwnershipListenerSupport listenerSupport) {
+ this.localMemberName = Verify.verifyNotNull(localMemberName.getName());
this.listenerSupport = Preconditions.checkNotNull(listenerSupport);
}
LOG.debug("{}: New owner: {}, Original owner: {}", logId(), newOwner, origOwner);
- if(!Objects.equals(origOwner, newOwner)) {
- boolean isOwner = Objects.equals(localMemberName, newOwner);
- boolean wasOwner = Objects.equals(localMemberName, origOwner);
+ if (!Objects.equals(origOwner, newOwner)) {
+ boolean isOwner = localMemberName.equals(newOwner);
+ boolean wasOwner = localMemberName.equals(origOwner);
boolean hasOwner = !Strings.isNullOrEmpty(newOwner);
Entity entity = createEntity(change.getRootPath());
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
* @author Thomas Pantelis
*/
class EntityOwnershipShard extends Shard {
- private final String localMemberName;
+ private final MemberName localMemberName;
private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private final EntityOwnershipListenerSupport listenerSupport;
- private final Set<String> downPeerMemberNames = new HashSet<>();
- private final Map<String, String> peerIdToMemberNames = new HashMap<>();
+ private final Set<MemberName> downPeerMemberNames = new HashSet<>();
+ private final Map<String, MemberName> peerIdToMemberNames = new HashMap<>();
private final EntityOwnerSelectionStrategyConfig strategyConfig;
private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
private final EntityOwnershipStatistics entityOwnershipStatistics;
listenerSupport.setHasCandidateForEntity(registerCandidate.getEntity());
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
- registerCandidate.getEntity().getId(), localMemberName);
+ registerCandidate.getEntity().getId(), localMemberName.getName());
commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
getSender().tell(SuccessReply.INSTANCE, getSelf());
Entity entity = unregisterCandidate.getEntity();
listenerSupport.unsetHasCandidateForEntity(entity);
- YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
+ YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName.getName());
commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
getSender().tell(SuccessReply.INSTANCE, getSelf());
getSender().tell(SuccessReply.INSTANCE, getSelf());
- searchForEntities(new EntityWalker() {
- @Override
- public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
- String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
- if (registerListener.getEntityType().equals(entityType)) {
- final boolean hasOwner;
- final boolean isOwner;
-
- Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
- if (possibleOwner.isPresent()) {
- isOwner = localMemberName.equals(possibleOwner.get().getValue().toString());
- hasOwner = true;
- } else {
- isOwner = false;
- hasOwner = false;
- }
-
- Entity entity = new Entity(entityType,
- (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
-
- listenerSupport.notifyEntityOwnershipListener(entity, false, isOwner, hasOwner,
- registerListener.getListener());
+ searchForEntities((entityTypeNode, entityNode) -> {
+ Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+ String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+ if (registerListener.getEntityType().equals(entityType)) {
+ final boolean hasOwner;
+ final boolean isOwner;
+
+ Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
+ if (possibleOwner.isPresent()) {
+ isOwner = localMemberName.getName().equals(possibleOwner.get().getValue().toString());
+ hasOwner = true;
+ } else {
+ isOwner = false;
+ hasOwner = false;
}
+
+ Entity entity = new Entity(entityType,
+ (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+
+ listenerSupport.notifyEntityOwnershipListener(entity, false, isOwner, hasOwner,
+ registerListener.getListener());
}
});
}
}
private void notifyAllListeners() {
- searchForEntities(new EntityWalker() {
- @Override
- public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
- if (possibleType.isPresent()) {
- final boolean hasOwner;
- final boolean isOwner;
-
- Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
- if (possibleOwner.isPresent()) {
- isOwner = localMemberName.equals(possibleOwner.get().getValue().toString());
- hasOwner = true;
- } else {
- isOwner = false;
- hasOwner = false;
- }
-
- Entity entity = new Entity(possibleType.get().getValue().toString(),
- (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
-
- listenerSupport.notifyEntityOwnershipListeners(entity, isOwner, isOwner, hasOwner);
+ searchForEntities((entityTypeNode, entityNode) -> {
+ Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+ if (possibleType.isPresent()) {
+ final boolean hasOwner;
+ final boolean isOwner;
+
+ Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
+ if (possibleOwner.isPresent()) {
+ isOwner = localMemberName.getName().equals(possibleOwner.get().getValue().toString());
+ hasOwner = true;
+ } else {
+ isOwner = false;
+ hasOwner = false;
}
+
+ Entity entity = new Entity(possibleType.get().getValue().toString(),
+ (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+
+ listenerSupport.notifyEntityOwnershipListeners(entity, isOwner, isOwner, hasOwner);
}
});
}
LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
newLeader, isLeader);
- if(isLeader) {
+ if (isLeader) {
// Clear all existing strategies so that they get re-created when we call createStrategy again
// This allows the strategies to be re-initialized with existing statistics maintained by
// Remove the candidates for all members that are known to be down. In a cluster which has greater than
// 3 nodes it is possible for a some node beside the leader being down when the leadership transitions
// it makes sense to use this event to remove all the candidates for those downed nodes
- for(String downPeerName : downPeerMemberNames){
+ for (MemberName downPeerName : downPeerMemberNames) {
removeCandidateFromEntities(downPeerName);
}
} else {
// and the leader removed our candidate since the leader can't tell the difference between a
// temporary network partition and a node's process actually restarted. So, in that case, re-add
// our candidate.
- if(localMemberName.equals(message.getRemovedCandidate()) &&
+ if(localMemberName.getName().equals(message.getRemovedCandidate()) &&
listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
" - adding back local candidate", message.getEntityPath());
commitCoordinator.commitModification(new MergeModification(
- candidatePath(message.getEntityPath(), localMemberName),
- candidateMapEntry(localMemberName)), this);
+ candidatePath(message.getEntityPath(), localMemberName.getName()),
+ candidateMapEntry(localMemberName.getName())), this);
}
}
}
private void onPeerDown(PeerDown peerDown) {
LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
- String downMemberName = peerDown.getMemberName();
+ MemberName downMemberName = peerDown.getMemberName();
if(downPeerMemberNames.add(downMemberName) && isLeader()) {
// Remove the down peer as a candidate from all entities.
removeCandidateFromEntities(downMemberName);
commitCoordinator.onStateChanged(this, isLeader());
}
- private void removeCandidateFromEntities(final String owner) {
+ private void removeCandidateFromEntities(final MemberName owner) {
final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
- searchForEntities(new EntityWalker() {
- @Override
- public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- if (hasCandidate(entityNode, owner)) {
- YangInstanceIdentifier entityId =
- (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
- YangInstanceIdentifier candidatePath = candidatePath(
- entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
- entityId, owner);
-
- LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
- owner, candidatePath);
-
- modifications.addModification(new DeleteModification(candidatePath));
- }
+ searchForEntities((entityTypeNode, entityNode) -> {
+ if (hasCandidate(entityNode, owner)) {
+ YangInstanceIdentifier entityId =
+ (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+ YangInstanceIdentifier candidatePath = candidatePath(
+ entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
+ entityId, owner.getName());
+
+ LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
+ owner, candidatePath);
+
+ modifications.addModification(new DeleteModification(candidatePath));
}
});
commitCoordinator.commitModifications(modifications, this);
}
- private static boolean hasCandidate(MapEntryNode entity, String candidateName) {
- return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
+ private static boolean hasCandidate(MapEntryNode entity, MemberName candidateName) {
+ return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName.getName()))
+ .isPresent();
}
private void searchForEntities(EntityWalker walker) {
}
static class Builder extends Shard.AbstractBuilder<Builder, EntityOwnershipShard> {
- private String localMemberName;
+ private MemberName localMemberName;
private EntityOwnerSelectionStrategyConfig ownerSelectionStrategyConfig;
protected Builder() {
super(EntityOwnershipShard.class);
}
- Builder localMemberName(String localMemberName) {
+ Builder localMemberName(MemberName localMemberName) {
checkSealed();
this.localMemberName = localMemberName;
return this;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
private final Logger log;
private int transactionIDCounter = 0;
- private final String localMemberName;
+ private final MemberName localMemberName;
private final Queue<Modification> pendingModifications = new LinkedList<>();
private BatchedModifications inflightCommit;
private Cancellable retryCommitSchedule;
- EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
+ EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
this.localMemberName = localMemberName;
this.log = log;
}
import com.google.common.base.Preconditions;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public class ShardIdentifier {
// This pattern needs to remain in sync with toString(), which produces
private static final Pattern PATTERN = Pattern.compile("(\\S+)-shard-(\\S+)-(\\S+)");
private final String shardName;
- private final String memberName;
+ private final MemberName memberName;
private final String type;
private final String fullName;
- public ShardIdentifier(String shardName, String memberName, String type) {
+ public ShardIdentifier(String shardName, MemberName memberName, String type) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
Preconditions.checkNotNull(memberName, "memberName should not be null");
this.memberName = memberName;
this.type = type;
- fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-")
+ fullName = new StringBuilder(memberName.getName()).append("-shard-").append(shardName).append("-")
.append(type).toString();
}
return shardName;
}
- public String getMemberName() {
+ public MemberName getMemberName() {
return memberName;
}
public static class Builder {
private String shardName;
- private String memberName;
+ private MemberName memberName;
private String type;
public ShardIdentifier build(){
return this;
}
- public Builder memberName(String memberName){
+ public Builder memberName(MemberName memberName){
this.memberName = memberName;
return this;
}
Matcher matcher = PATTERN.matcher(shardId);
if (matcher.matches()) {
- memberName = matcher.group(1);
+ memberName = MemberName.forName(matcher.group(1));
shardName = matcher.group(2);
type = matcher.group(3);
}
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public class TransactionChainIdentifier {
private final AtomicLong txnCounter = new AtomicLong();
private final Supplier<String> stringRepresentation;
- private final String memberName;
+ private final MemberName memberName;
- public TransactionChainIdentifier(final String memberName, final long counter) {
+ public TransactionChainIdentifier(final MemberName memberName, final long counter) {
this.memberName = memberName;
- stringRepresentation = Suppliers.memoize(new Supplier<String>() {
- @Override
- public String get() {
- final StringBuilder sb = new StringBuilder();
- sb.append(memberName).append(CHAIN_SEPARATOR);
- sb.append(counter);
- return sb.toString();
- }
+ stringRepresentation = Suppliers.memoize(() -> {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(memberName.getName()).append(CHAIN_SEPARATOR);
+ sb.append(counter);
+ return sb.toString();
});
}
@Override
return new ChainedTransactionIdentifier(this, txnCounter.incrementAndGet());
}
- public String getMemberName() {
+ public MemberName getMemberName() {
return memberName;
}
}
package org.opendaylight.controller.cluster.datastore.identifiers;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public class TransactionIdentifier {
protected static final String TX_SEPARATOR = "-txn-";
- private final String memberName;
+ private final MemberName memberName;
private final long counter;
private final long timestamp;
private String stringRepresentation;
- public TransactionIdentifier(String memberName, long counter) {
+ public TransactionIdentifier(MemberName memberName, long counter) {
this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
this.counter = counter;
this.timestamp = System.currentTimeMillis();
return "";
}
- protected String getMemberName() {
+ protected MemberName getMemberName() {
return memberName;
}
return timestamp;
}
- public static TransactionIdentifier create(String memberName, long counter) {
+ public static TransactionIdentifier create(MemberName memberName, long counter) {
return new TransactionIdentifier(memberName, counter);
}
@Override
public String toString() {
if(stringRepresentation == null) {
- stringRepresentation = new StringBuilder(memberName.length() + TX_SEPARATOR.length() + 21).
- append(memberName).append(TX_SEPARATOR).append(counter).append('-').append(timestamp).toString();
+ stringRepresentation = new StringBuilder(memberName.getName().length() + TX_SEPARATOR.length() + 21).
+ append(memberName.getName()).append(TX_SEPARATOR).append(counter).append('-').append(timestamp).toString();
}
return stringRepresentation;
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+
/**
* Message sent to a shard actor indicating one of its peers is down.
*
* @author Thomas Pantelis
*/
public class PeerDown {
- private final String memberName;
+ private final MemberName memberName;
private final String peerId;
- public PeerDown(String memberName, String peerId) {
+ public PeerDown(MemberName memberName, String peerId) {
this.memberName = memberName;
this.peerId = peerId;
}
- public String getMemberName() {
+ public MemberName getMemberName() {
return memberName;
}
@Override
public String toString() {
- return "PeerDown [memberName=" + memberName + ", peerId=" + peerId + "]";
+ return "PeerDown [memberName=" + memberName.getName() + ", peerId=" + peerId + "]";
}
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+
/**
* Message sent to a shard actor indicating one of its peers is up.
*
* @author Thomas Pantelis
*/
public class PeerUp {
- private final String memberName;
+ private final MemberName memberName;
private final String peerId;
- public PeerUp(String memberName, String peerId) {
+ public PeerUp(MemberName memberName, String peerId) {
this.memberName = memberName;
this.peerId = peerId;
}
- public String getMemberName() {
+ public MemberName getMemberName() {
return memberName;
}
-
public String getPeerId() {
return peerId;
}
@Override
public String toString() {
- return "PeerUp [memberName=" + memberName + ", peerId=" + peerId + "]";
+ return "PeerUp [memberName=" + memberName.getName() + ", peerId=" + peerId + "]";
}
}
\ No newline at end of file
import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
/**
* A message sent to the ShardManager to dynamically remove a local shard
public class RemoveShardReplica {
private final String shardName;
- private final String memberName;
+ private final MemberName memberName;
/**
* Constructor.
*
* @param shardName name of the local shard that is to be dynamically removed.
*/
- public RemoveShardReplica (@Nonnull String shardName, @Nonnull String memberName) {
+ public RemoveShardReplica (@Nonnull String shardName, @Nonnull MemberName memberName) {
this.shardName = Preconditions.checkNotNull(shardName, "shardName should not be null");
this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
}
return shardName;
}
- public String getMemberName() {
+ public MemberName getMemberName() {
return memberName;
}
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
notifyOnShardInitializedCallbacks();
}
- void peerDown(String memberName, String peerId, ActorRef sender) {
+ void peerDown(MemberName memberName, String peerId, ActorRef sender) {
if(actor != null) {
actor.tell(new PeerDown(memberName, peerId), sender);
}
}
- void peerUp(String memberName, String peerId, ActorRef sender) {
+ void peerUp(MemberName memberName, String peerId, ActorRef sender) {
if(actor != null) {
actor.tell(new PeerUp(memberName, peerId), sender);
}
import akka.actor.PoisonPill;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
- @Override
- public Object get() {
- return new LocalShardFound(shardInformation.getActor());
- }
- });
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
}
private void sendResponse(ShardInformation shardInformation, boolean doWait,
final ActorRef sender = getSender();
final ActorRef self = self();
- Runnable replyRunnable = new Runnable() {
- @Override
- public void run() {
- sender.tell(messageSupplier.get(), self);
- }
- };
+ Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
new OnShardInitialized(replyRunnable);
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
}
+ @VisibleForTesting
+ static MemberName memberToName(final Member member) {
+ return MemberName.forName(member.roles().iterator().next());
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberExited(ClusterEvent.MemberExited message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberUp(ClusterEvent.MemberUp message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
checkReady();
}
- private void addPeerAddress(String memberName, Address address) {
+ private void addPeerAddress(MemberName memberName, Address address) {
peerAddressResolver.addPeerAddress(memberName, address);
for(ShardInformation info : localShards.values()){
}
private void memberReachable(ClusterEvent.ReachableMember message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
addPeerAddress(memberName, message.member().address());
}
private void memberUnreachable(ClusterEvent.UnreachableMember message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
markMemberUnavailable(memberName);
}
- private void markMemberUnavailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberUnavailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
}
}
- private void markMemberAvailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberAvailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null && info.isActiveMember()) {
- sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
- @Override
- public Object get() {
- String primaryPath = info.getSerializedLeaderActor();
- Object found = canReturnLocalShardState && info.isLeader() ?
- new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
- new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
- }
-
- return found;
- }
+ sendResponse(info, message.isWaitUntilReady(), true, () -> {
+ String primaryPath = info.getSerializedLeaderActor();
+ Object found = canReturnLocalShardState && info.isLeader() ?
+ new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ }
+
+ return found;
});
return;
* @param shardName
* @return
*/
- private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
return peerAddressResolver.getShardIdentifier(memberName, shardName);
}
*
*/
private void createLocalShards() {
- String memberName = this.cluster.getCurrentMemberName();
+ MemberName memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
* @param shardName
*/
private Map<String, String> getPeerAddresses(String shardName) {
- Collection<String> members = configuration.getMembersFromShardName(shardName);
+ Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
Map<String, String> peerAddresses = new HashMap<>();
- String currentMemberName = this.cluster.getCurrentMemberName();
+ MemberName currentMemberName = this.cluster.getCurrentMemberName();
- for(String memberName : members) {
- if(!currentMemberName.equals(memberName)) {
+ for (MemberName memberName : members) {
+ if (!currentMemberName.equals(memberName)) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
peerAddresses.put(shardId.toString(), address);
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(10, Duration.create("1 minute"),
- new Function<Throwable, SupervisorStrategy.Directive>() {
- @Override
- public SupervisorStrategy.Directive apply(Throwable t) {
- LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
- return SupervisorStrategy.resume();
- }
- }
+ (Function<Throwable, Directive>) t -> {
+ LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+ return SupervisorStrategy.resume();
+ }
);
}
LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
- String currentMember = cluster.getCurrentMemberName();
+ final MemberName currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
for (String shard : currentSnapshot.getShardList()) {
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
private static final long ASK_TIMEOUT_MILLIS = 5000;
private final ActorRef shardManager;
- private final String memberName;
+ private final MemberName memberName;
private volatile boolean syncStatus = false;
- ShardManagerInfo(final ActorRef shardManager, final String memberName, final String name,
+ ShardManagerInfo(final ActorRef shardManager, final MemberName memberName, final String name,
final String mxBeanType) {
super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
this.shardManager = Preconditions.checkNotNull(shardManager);
@Override
public String getMemberName() {
- return memberName;
+ return memberName.getName();
}
private void requestSwitchShardState(final ShardIdentifier shardId, final String newState, final long term) {
package org.opendaylight.controller.cluster.datastore.shardmanager;
import akka.actor.Address;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
class ShardPeerAddressResolver implements PeerAddressResolver {
// Stores a mapping between a member name and the address of the member. The map is concurrent as it
// will be accessed by multiple threads via the public resolve method.
- private final ConcurrentMap<String, Address> memberNameToAddress = new ConcurrentHashMap<>();
+ private final ConcurrentMap<MemberName, Address> memberNameToAddress = new ConcurrentHashMap<>();
private final String shardManagerIdentifier;
private final String shardManagerType;
- private final String localMemberName;
+ private final MemberName localMemberName;
- public ShardPeerAddressResolver(String shardManagerType, String localMemberName) {
+ public ShardPeerAddressResolver(String shardManagerType, MemberName localMemberName) {
this.shardManagerIdentifier = ShardManagerIdentifier.builder().type(shardManagerType).build().toString();
this.shardManagerType = shardManagerType;
- this.localMemberName = localMemberName;
+ this.localMemberName = Preconditions.checkNotNull(localMemberName);
}
- void addPeerAddress(String memberName, Address address) {
+ void addPeerAddress(MemberName memberName, Address address) {
memberNameToAddress.put(memberName, address);
}
- void removePeerAddress(String memberName) {
+ void removePeerAddress(MemberName memberName) {
memberNameToAddress.remove(memberName);
}
- Address getPeerAddress(String memberName) {
+ Address getPeerAddress(MemberName memberName) {
return memberNameToAddress.get(memberName);
}
Collection<String> getShardManagerPeerActorAddresses() {
Collection<String> peerAddresses = new ArrayList<>();
- for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+ for(Map.Entry<MemberName, Address> entry: memberNameToAddress.entrySet()) {
if(!localMemberName.equals(entry.getKey())) {
peerAddresses.add(getShardManagerActorPathBuilder(entry.getValue()).toString());
}
return peerAddresses;
}
- ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(shardManagerType).build();
}
- String getShardActorAddress(String shardName, String memberName) {
+ String getShardActorAddress(String shardName, MemberName memberName) {
Address memberAddress = memberNameToAddress.get(memberName);
if(memberAddress != null) {
return getShardManagerActorPathBuilder(memberAddress).append("/").append(
import com.google.common.base.Strings;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
return clusterWrapper;
}
- public String getCurrentMemberName(){
+ public MemberName getCurrentMemberName(){
return clusterWrapper.getCurrentMemberName();
}
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
- protected final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
+ protected final ShardIdentifier shardID = ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
.shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
final CountDownLatch recoveryComplete = new CountDownLatch(1);
@SuppressWarnings("serial")
- final
- Creator<Shard> creator = new Creator<Shard>() {
+ final Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
return new Shard(newShardBuilder()) {
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
- doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+ doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
import scala.concurrent.Future;
List<Future<Object>> expCohortFutures = new ArrayList<>();
doReturn(expCohortFutures).when(mockDelegate).getCohortFutures();
- TransactionIdentifier transactionId = TransactionIdentifier.create("1", 1);
+ TransactionIdentifier transactionId = TransactionIdentifier.create(MemberName.forName("1"), 1);
Throwable debugContext = new RuntimeException("mock");
DebugThreePhaseCommitCohort cohort = new DebugThreePhaseCommitCohort(transactionId , mockDelegate , debugContext );
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
String... peerMemberNames) throws Exception {
final Set<String> peerIds = Sets.newHashSet();
for(String p: peerMemberNames) {
- peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
+ peerIds.add(ShardIdentifier.builder().memberName(MemberName.forName(p)).shardName(shardName).
type(datastore.getActorContext().getDataStoreName()).build().toString());
}
node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
- String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
+ String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
true, schemaContext, waitForshardLeader);
import static org.junit.Assert.assertEquals;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
@Test
public void testOnComplete() throws Exception {
int permits = 10;
- OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1);
+ OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier(MemberName.forName("foo"), 1), permits, 1);
limiter.acquire(permits);
int availablePermits = 0;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
@Test
public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
- ShardIdentifier peerID = ShardIdentifier.builder().memberName("member-2")
+ ShardIdentifier peerID = ShardIdentifier.builder().memberName(MemberName.forName("member-2"))
.shardName("inventory").type("config").build();
final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
public void testClusteredDataChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
String testName = "testClusteredDataChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
- actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+ final ShardIdentifier followerShardID = ShardIdentifier.builder()
+ .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-follower")))
+ .shardName("inventory").type("config").build();
- final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
- actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder()
+ .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-leader")))
+ .shardName("inventory").type("config").build();
final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
Shard.builder().id(followerShardID).
public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
String testName = "testClusteredDataTreeChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
- actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+ final ShardIdentifier followerShardID = ShardIdentifier.builder()
+ .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-follower")))
+ .shardName("inventory").type("config").build();
- final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
- actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder()
+ .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-leader")))
+ .shardName("inventory").type("config").build();
final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
Shard.builder().id(followerShardID).
import akka.testkit.TestActorRef;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private static final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
private static final ShardIdentifier SHARD_IDENTIFIER =
- ShardIdentifier.builder().memberName("member-1")
+ ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
.shardName("inventory").type("operational").build();
private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
private static final TransactionType WO = TransactionType.WRITE_ONLY;
private static final ShardIdentifier SHARD_IDENTIFIER =
- ShardIdentifier.builder().memberName("member-1")
+ ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
.shardName("inventory").type("config").build();
private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
- doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+ doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
public class ConfigurationImplTest {
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+ private static final MemberName MEMBER_3 = MemberName.forName("member-3");
+ private static final MemberName MEMBER_4 = MemberName.forName("member-4");
+ private static final MemberName MEMBER_5 = MemberName.forName("member-5");
+ private static final MemberName MEMBER_100 = MemberName.forName("member-100");
private ConfigurationImpl configuration;
@Test
public void testGetMemberShardNames(){
- Collection<String> memberShardNames = configuration.getMemberShardNames("member-1");
+ Collection<String> memberShardNames = configuration.getMemberShardNames(MEMBER_1);
assertEquals("getMemberShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"),
ImmutableSortedSet.copyOf(memberShardNames));
- memberShardNames = configuration.getMemberShardNames("member-2");
+ memberShardNames = configuration.getMemberShardNames(MEMBER_2);
assertEquals("getMemberShardNames", ImmutableSortedSet.of("default"),
ImmutableSortedSet.copyOf(memberShardNames));
- memberShardNames = configuration.getMemberShardNames("member-100");
+ memberShardNames = configuration.getMemberShardNames(MEMBER_100);
assertEquals("getMemberShardNames size", 0, memberShardNames.size());
}
@Test
public void testGetMembersFromShardName(){
- Collection<String> members = configuration.getMembersFromShardName("default");
- assertEquals("getMembersFromShardName", ImmutableSortedSet.of("member-1", "member-2", "member-3"),
+ Collection<MemberName> members = configuration.getMembersFromShardName("default");
+ assertEquals("getMembersFromShardName", ImmutableSortedSet.of(MEMBER_1, MEMBER_2, MEMBER_3),
ImmutableSortedSet.copyOf(members));
members = configuration.getMembersFromShardName("cars-1");
- assertEquals("getMembersFromShardName", ImmutableSortedSet.of("member-1"),
+ assertEquals("getMembersFromShardName", ImmutableSortedSet.of(MEMBER_1),
ImmutableSortedSet.copyOf(members));
// Try to find a shard which is not present
String moduleName = "oven";
String shardName = "oven-shard";
String shardStrategyName = ModuleShardStrategy.NAME;
- Collection<String> shardMemberNames = ImmutableSortedSet.of("member-1", "member-4", "member-5");
+ Collection<MemberName> shardMemberNames = ImmutableSortedSet.of(MEMBER_1, MEMBER_4, MEMBER_5);
configuration.addModuleShardConfiguration(new ModuleShardConfiguration(namespace, moduleName, shardName,
shardStrategyName, shardMemberNames));
assertEquals("getMemberShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default", shardName),
- ImmutableSortedSet.copyOf(configuration.getMemberShardNames("member-1")));
+ ImmutableSortedSet.copyOf(configuration.getMemberShardNames(MEMBER_1)));
assertEquals("getMemberShardNames", ImmutableSortedSet.of(shardName),
- ImmutableSortedSet.copyOf(configuration.getMemberShardNames("member-4")));
+ ImmutableSortedSet.copyOf(configuration.getMemberShardNames(MEMBER_4)));
assertEquals("getMemberShardNames", ImmutableSortedSet.of(shardName),
- ImmutableSortedSet.copyOf(configuration.getMemberShardNames("member-5")));
+ ImmutableSortedSet.copyOf(configuration.getMemberShardNames(MEMBER_5)));
assertEquals("getMembersFromShardName", shardMemberNames,
ImmutableSortedSet.copyOf(configuration.getMembersFromShardName(shardName)));
assertEquals("getShardNameForModule", shardName, configuration.getShardNameForModule(moduleName));
@Test
public void testGetUniqueMemberNamesForAllShards() {
- assertEquals("getUniqueMemberNamesForAllShards", Sets.newHashSet("member-1", "member-2", "member-3"),
+ assertEquals("getUniqueMemberNamesForAllShards", Sets.newHashSet(MEMBER_1, MEMBER_2, MEMBER_3),
configuration.getUniqueMemberNamesForAllShards());
}
@Test
public void testAddMemberReplicaForShard() {
- configuration.addMemberReplicaForShard("people-1", "member-2");
+ configuration.addMemberReplicaForShard("people-1", MEMBER_2);
String shardName = configuration.getShardNameForModule("people");
assertEquals("ModuleShardName", "people-1", shardName);
ShardStrategy shardStrategy = configuration.getStrategyForModule("people");
assertEquals("ModuleStrategy", ModuleShardStrategy.class, shardStrategy.getClass());
- Collection<String> members = configuration.getMembersFromShardName("people-1");
- assertEquals("Members", ImmutableSortedSet.of("member-1", "member-2"),
+ Collection<MemberName> members = configuration.getMembersFromShardName("people-1");
+ assertEquals("Members", ImmutableSortedSet.of(MEMBER_1, MEMBER_2),
ImmutableSortedSet.copyOf(members));
- configuration.addMemberReplicaForShard("non-existent", "member-2");
+ configuration.addMemberReplicaForShard("non-existent", MEMBER_2);
Set<String> shardNames = configuration.getAllShardNames();
assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"),
ImmutableSortedSet.copyOf(shardNames));
@Test
public void testRemoveMemberReplicaForShard() {
- configuration.removeMemberReplicaForShard("default", "member-2");
+ configuration.removeMemberReplicaForShard("default", MEMBER_2);
String shardName = configuration.getShardNameForModule("default");
assertEquals("ModuleShardName", "default", shardName);
ShardStrategy shardStrategy = configuration.getStrategyForModule("default");
assertNull("ModuleStrategy", shardStrategy);
- Collection<String> members = configuration.getMembersFromShardName("default");
- assertEquals("Members", ImmutableSortedSet.of("member-1", "member-3"),
+ Collection<MemberName> members = configuration.getMembersFromShardName("default");
+ assertEquals("Members", ImmutableSortedSet.of(MEMBER_1, MEMBER_3),
ImmutableSortedSet.copyOf(members));
- configuration.removeMemberReplicaForShard("non-existent", "member-2");
+ configuration.removeMemberReplicaForShard("non-existent", MEMBER_2);
Set<String> shardNames = configuration.getAllShardNames();
assertEquals("ShardNames", ImmutableSortedSet.of("people-1", "cars-1", "test-1", "default"),
ImmutableSortedSet.copyOf(shardNames));
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
Configuration configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider()) {
@Override
- public Collection<String> getUniqueMemberNamesForAllShards() {
- return Sets.newHashSet("member-1");
+ public Collection<MemberName> getUniqueMemberNamesForAllShards() {
+ return Sets.newHashSet(MemberName.forName("member-1"));
}
};
verifyRegisterCandidateLocal(service, entity);
verifyEntityOwnershipCandidateRegistration(entity, reg);
verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE, entityId,
- dataStore.getActorContext().getCurrentMemberName());
+ dataStore.getActorContext().getCurrentMemberName().getName());
// Register the same entity - should throw exception
verifyRegisterCandidateLocal(service, entity2);
verifyEntityOwnershipCandidateRegistration(entity2, reg2);
verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE2, entityId,
- dataStore.getActorContext().getCurrentMemberName());
+ dataStore.getActorContext().getCurrentMemberName().getName());
service.close();
}
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
@Before
public void setup() {
- listener = new EntityOwnerChangeListener(LOCAL_MEMBER_NAME, mockListenerSupport);
+ listener = new EntityOwnerChangeListener(MemberName.forName(LOCAL_MEMBER_NAME), mockListenerSupport);
listener.init(shardDataTree);
}
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
// Send PeerDown and PeerUp with no entities
- leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
- leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+ leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+ leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
// Add candidates for entity1 with the local leader as the owner
kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
kit.unwatch(peer2);
- leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+ leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
// Send PeerDown again - should be noop
- leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
- peer1.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+ leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+ peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
peer2 = actorFactory.createTestActor(newShardProps(peerId2,
ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
- leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+ leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
// Send PeerUp again - should be noop
- leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
- peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+ leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+ peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
// Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
- leader.tell(new PeerDown(peerMemberName1, peerId1.toString()), ActorRef.noSender());
+ leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
peer1 = actorFactory.createTestActor(newShardProps(peerId1,
ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
- leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
+ leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
// the entities (1 and 3) previously owned by the local leader member.
peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
- peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
- peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
+ peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+ peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
- peer2.tell(new PeerDown(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
+ peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
peer2.tell(ElectionTimeout.INSTANCE, peer2);
ShardTestKit.waitUntilLeader(peer2);
// Simulate a replicated commit from the leader to remove the local candidate that would occur after a
// network partition is healed.
- leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
+ leader.tell(new PeerDown(localId.getMemberName(), localId.toString()), ActorRef.noSender());
verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false));
return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, strategyConfig);
}
- private Props newShardProps(Map<String,String> peers) {
+ private Props newShardProps(Map<String, String> peers) {
return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build());
}
EntityOwnerSelectionStrategyConfig config) {
return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).
datastoreContext(dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).
- localMemberName(memberName).ownerSelectionStrategyConfig(config).props().withDispatcher(Dispatchers.DefaultDispatcherId());
+ localMemberName(MemberName.forName(memberName)).ownerSelectionStrategyConfig(config).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId());
}
private static ShardIdentifier newShardId(String memberName) {
- return ShardIdentifier.builder().memberName(memberName).shardName("entity-ownership").
+ return ShardIdentifier.builder().memberName(MemberName.forName(memberName)).shardName("entity-ownership").
type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
}
TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
DatastoreContext datastoreContext) {
super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext).
- schemaContext(SCHEMA_CONTEXT).localMemberName(LOCAL_MEMBER_NAME));
+ schemaContext(SCHEMA_CONTEXT).localMemberName(MemberName.forName(LOCAL_MEMBER_NAME)));
}
@Override
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public class ChainedTransactionIdentifierTest {
@Test
public void testToString(){
- TransactionChainIdentifier chainId = new TransactionChainIdentifier("member-1", 99);
+ TransactionChainIdentifier chainId = new TransactionChainIdentifier(MemberName.forName("member-1"), 99);
ChainedTransactionIdentifier chainedTransactionIdentifier = new ChainedTransactionIdentifier(chainId, 100);
String txnId = chainedTransactionIdentifier.toString();
package org.opendaylight.controller.cluster.datastore.identifiers;
-import org.junit.Test;
-
import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public class ShardIdentifierTest {
@Test
public void testBasic(){
- ShardIdentifier id = ShardIdentifier.builder().memberName("member-1")
+ ShardIdentifier id = ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
.shardName("inventory").type("config").build();
assertEquals("member-1-shard-inventory-config", id.toString());
ShardIdentifier id = ShardIdentifier.builder().fromShardIdString(shardIdStr).build();
- assertEquals("member-1", id.getMemberName());
+ assertEquals("member-1", id.getMemberName().getName());
assertEquals("inventory", id.getShardName());
assertEquals("config", id.getType());
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
public class TransactionChainIdentifierTest {
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+
@Test
public void testToString(){
- TransactionChainIdentifier transactionChainIdentifier = new TransactionChainIdentifier("member-1", 99);
+ TransactionChainIdentifier transactionChainIdentifier = new TransactionChainIdentifier(MEMBER_1, 99);
String id = transactionChainIdentifier.toString();
@Test
public void testNewTransactionIdentifier(){
- TransactionChainIdentifier transactionChainIdentifier = new TransactionChainIdentifier("member-1", 99);
+ TransactionChainIdentifier transactionChainIdentifier = new TransactionChainIdentifier(MEMBER_1, 99);
TransactionIdentifier txId1 = transactionChainIdentifier.newTransactionIdentifier();
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
* @author Thomas Pantelis
*/
public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+
private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
@After
kit.watch(replyActor);
byte[] shard1Snapshot = new byte[]{1,2,3};
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
shardName("shard1").build().toString(), shard1Snapshot), ActorRef.noSender());
byte[] shard2Snapshot = new byte[]{4,5,6};
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
shardName("shard2").build().toString(), shard2Snapshot), ActorRef.noSender());
kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
byte[] shard3Snapshot = new byte[]{7,8,9};
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
shardName("shard3").build().toString(), shard3Snapshot), ActorRef.noSender());
DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
kit.watch(replyActor);
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName("member-1").type("config").
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
shardName("shard1").build().toString(), new byte[]{1,2,3}), ActorRef.noSender());
replyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender());
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
public class ShardManagerTest extends AbstractActorTest {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+ private static final MemberName MEMBER_3 = MemberName.forName("member-3");
private static int ID_COUNTER = 1;
InMemorySnapshotStore.clear();
if(mockShardActor == null) {
- mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config");
+ mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, MEMBER_1, "config");
mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
mockShardName.toString());
}
}
private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
- String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ String name = new ShardIdentifier(shardName, MemberName.forName(memberName), "config").toString();
if(system == getSystem()) {
return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
}
final MockConfiguration mockConfig = new MockConfiguration() {
@Override
- public Collection<String> getMemberShardNames(String memberName) {
+ public Collection<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "topology");
}
@Override
- public Collection<String> getMembersFromShardName(String shardName) {
- return Arrays.asList("member-1");
+ public Collection<MemberName> getMembersFromShardName(String shardName) {
+ return members("member-1");
}
};
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
shardManager1.underlyingActor().waitForMemberUp();
-
shardManager1.tell(new FindPrimary("astronauts", false), getRef());
RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
shardManager1.underlyingActor().waitForUnreachableMember();
PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
- assertEquals("getMemberName", "member-2", peerDown.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(MockClusterWrapper.
shardManager1.underlyingActor().waitForReachableMember();
PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
- assertEquals("getMemberName", "member-2", peerUp.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(new FindPrimary("default", true), getRef());
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
- public List<String> getMemberShardNames(String memberName) {
+ public List<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "astronauts");
}
}));
}};
}
+ private static List<MemberName> members(String... names) {
+ return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
+ }
+
@Test
public void testOnCreateShard() {
LOG.info("testOnCreateShard starting");
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
+ "foo", null, members("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
getPeerAddressResolver() instanceof ShardPeerAddressResolver);
- assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
- new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
- shardBuilder.getPeerAddresses().keySet());
- assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
+ assertEquals("peerMembers", Sets.newHashSet(
+ new ShardIdentifier("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
+ new ShardIdentifier("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
+ shardBuilder.getPeerAddresses().keySet());
+ assertEquals("ShardIdentifier", new ShardIdentifier("foo", MEMBER_1, shardMrgIDSuffix),
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-5", "member-6"));
+ "foo", null, members("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1"));
+ "foo", null, members("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
- Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
- @Override
- public String apply(ShardSnapshot s) {
- return s.getName();
- }
- };
+ Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
+ String name = new ShardIdentifier("astronauts", MEMBER_2, "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
TestActorRef.create(system2, Props.create(MockRespondActor.class).
withDispatcher(Dispatchers.DefaultDispatcherId()), name);
ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
- shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
assertEquals("Failure obtained", true,
(resp.cause() instanceof PrimaryNotFoundException));
RaftState.Leader.name())), respondActor);
respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
- shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
- assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
+ assertEquals(new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Success.class);
}};
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
+ String name = new ShardIdentifier("default", MEMBER_2, shardMrgIDSuffix).toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
//construct a mock response message
RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
mockShardLeaderActor.underlyingActor().updateResponse(response);
- newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
+ newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
RemoveServer.class);
- String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString();
+ String removeServerId = new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Status.Success.class);
}};
@Test
public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
- testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"),
- RemoveServer.class, new RemoveShardReplica("astronauts", "member-3"));
+ testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
+ RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
}
@Test
public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
- AddServer.class, new RemoveShardReplica("astronauts", "member-2"));
+ AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
}
// Removed the default shard replica from member-1
ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
- ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
+ ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build();
shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
+ String shardId = ShardIdentifier.builder().shardName("default").memberName(MEMBER_1).
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
MessageCollectorActor.props(), shardId);
put("shard1", Arrays.asList("member-1")).
put("shard2", Arrays.asList("member-1")).build());
- String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
+ String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName(MEMBER_1).
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
- String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
+ String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName(MEMBER_1).
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
}
}
+ private void countDownIfOther(final Member member, CountDownLatch latch) {
+ if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
+ latch.countDown();
+ }
+ }
+
@Override
public void handleCommand(Object message) throws Exception {
try{
if(message instanceof FindPrimary) {
findPrimaryMessageReceived.countDown();
} else if(message instanceof ClusterEvent.MemberUp) {
- String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUpReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived);
} else if(message instanceof ClusterEvent.MemberRemoved) {
- String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberRemovedReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived);
} else if(message instanceof ClusterEvent.UnreachableMember) {
- String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUnreachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived);
} else if(message instanceof ClusterEvent.ReachableMember) {
- String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberReachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived);
}
}
}
import com.google.common.collect.Sets;
import java.util.Collection;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
/**
* @author Thomas Pantelis
*/
public class ShardPeerAddressResolverTest {
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+ private static final MemberName MEMBER_3 = MemberName.forName("member-3");
@Test
public void testGetShardActorAddress() {
- ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", "member-1");
+ ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);
- assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", "member-2"));
+ assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", MEMBER_2));
Address address2 = new Address("tcp", "system2");
- resolver.addPeerAddress("member-2", address2);
- assertEquals("getPeerAddress", address2, resolver.getPeerAddress("member-2"));
+ resolver.addPeerAddress(MEMBER_2, address2);
+ assertEquals("getPeerAddress", address2, resolver.getPeerAddress(MEMBER_2));
Address address3 = new Address("tcp", "system3");
- resolver.addPeerAddress("member-3", address3);
- assertEquals("getPeerAddress", address3, resolver.getPeerAddress("member-3"));
+ resolver.addPeerAddress(MEMBER_3, address3);
+ assertEquals("getPeerAddress", address3, resolver.getPeerAddress(MEMBER_3));
assertEquals("getShardActorAddress", address2.toString() +
"/user/shardmanager-config/member-2-shard-default-config",
- resolver.getShardActorAddress("default", "member-2"));
+ resolver.getShardActorAddress("default", MEMBER_2));
assertEquals("getShardActorAddress", address3.toString() +
"/user/shardmanager-config/member-3-shard-default-config",
- resolver.getShardActorAddress("default", "member-3"));
+ resolver.getShardActorAddress("default", MEMBER_3));
assertEquals("getShardActorAddress", address2.toString() +
"/user/shardmanager-config/member-2-shard-topology-config",
- resolver.getShardActorAddress("topology", "member-2"));
+ resolver.getShardActorAddress("topology", MEMBER_2));
- resolver.removePeerAddress("member-2");
- assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", "member-2"));
- assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("topology", "member-2"));
+ resolver.removePeerAddress(MEMBER_2);
+ assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", MEMBER_2));
+ assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("topology", MEMBER_2));
assertEquals("getShardActorAddress", address3.toString() +
"/user/shardmanager-config/member-3-shard-default-config",
- resolver.getShardActorAddress("default", "member-3"));
+ resolver.getShardActorAddress("default", MEMBER_3));
}
@Test
public void testResolve() {
String type = "config";
- ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, "member-1");
+ ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1);
- String memberName = "member-2";
- String peerId = ShardIdentifier.builder().memberName(memberName ).shardName("default").
+ MemberName memberName = MEMBER_2;
+ String peerId = ShardIdentifier.builder().memberName(memberName).shardName("default").
type(type).build().toString();
assertEquals("resolve", null, resolver.resolve(peerId));
String shardAddress = resolver.getShardActorAddress("default", memberName);
assertEquals("getShardActorAddress", address.toString() +
- "/user/shardmanager-" + type + "/" + memberName + "-shard-default-" + type, shardAddress);
+ "/user/shardmanager-" + type + "/" + memberName.getName() + "-shard-default-" + type, shardAddress);
assertEquals("resolve", shardAddress, resolver.resolve(peerId));
}
@Test
public void testGetShardManagerPeerActorAddresses() {
- ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", "member-1");
+ ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);
- resolver.addPeerAddress("member-1", new Address("tcp", "system1"));
+ resolver.addPeerAddress(MEMBER_1, new Address("tcp", "system1"));
Address address2 = new Address("tcp", "system2");
- resolver.addPeerAddress("member-2", address2);
+ resolver.addPeerAddress(MEMBER_2, address2);
Address address3 = new Address("tcp", "system3");
- resolver.addPeerAddress("member-3", address3);
+ resolver.addPeerAddress(MEMBER_3, address3);
Collection<String> peerAddresses = resolver.getShardManagerPeerActorAddresses();
assertEquals("getShardManagerPeerActorAddresses", Sets.newHashSet(
import akka.cluster.UniqueAddress;
import java.util.HashSet;
import java.util.Set;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import scala.collection.JavaConversions;
public class MockClusterWrapper implements ClusterWrapper{
private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
- private String currentMemberName = "member-1";
+ private final MemberName currentMemberName;
public MockClusterWrapper() {
+ this("member-1");
}
public MockClusterWrapper(String currentMemberName) {
- this.currentMemberName = currentMemberName;
+ this.currentMemberName = MemberName.forName(currentMemberName);
}
@Override
}
@Override
- public String getCurrentMemberName() {
+ public MemberName getCurrentMemberName() {
return currentMemberName;
}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
-import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
public class MockConfiguration extends ConfigurationImpl {
public MockConfiguration() {
}
public MockConfiguration(final Map<String, List<String>> shardMembers) {
- super(new ModuleShardConfigProvider() {
- @Override
- public Map<String, ModuleConfig.Builder> retrieveModuleConfigs(Configuration configuration) {
- Map<String, ModuleConfig.Builder> retMap = new HashMap<>();
- for(Map.Entry<String, List<String>> e : shardMembers.entrySet()) {
- String shardName = e.getKey();
- retMap.put(shardName, ModuleConfig.builder(shardName).shardConfig(shardName, e.getValue()));
- }
-
- return retMap;
+ super(configuration -> {
+ Map<String, ModuleConfig.Builder> retMap = new HashMap<>();
+ for(Map.Entry<String, List<String>> e : shardMembers.entrySet()) {
+ String shardName = e.getKey();
+ retMap.put(shardName,
+ ModuleConfig.builder(shardName).shardConfig(
+ shardName, e.getValue().stream().map(MemberName::forName).collect(Collectors.toList())));
}
+
+ return retMap;
});
}
}