<groupId>org.opendaylight.mdsal</groupId>
<artifactId>mdsal-eos-dom-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-dom-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-dom-inmemory-datastore</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-dom-broker</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-core-spi</artifactId>
import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
+ private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
}
- private DatastoreContext(DatastoreContext other) {
+ private DatastoreContext(final DatastoreContext other) {
this.dataStoreProperties = other.dataStoreProperties;
this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
this.operationTimeoutInMillis = other.operationTimeoutInMillis;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreName = other.dataStoreName;
this.logicalStoreType = other.logicalStoreType;
+ this.storeRoot = other.storeRoot;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
return logicalStoreType;
}
+ public YangInstanceIdentifier getStoreRoot() {
+ return storeRoot;
+ }
+
public long getTransactionCreationInitialRateLimit() {
return transactionCreationInitialRateLimit;
}
return this;
}
+ public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
+ datastoreContext.storeRoot = storeRoot;
+ return this;
+ }
+
public Builder dataStoreName(String dataStoreName) {
datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
return this;
}
- public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+ public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
return this;
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.yangtools.concepts.Registration;
+
+/**
+ * A factory that handles addition of new clustered shard's based on a prefix. This factory is a QoL class that handles
+ * all the boilerplate that comes with registration of a new clustered shard into the system and creating the backend
+ * shard/replicas that come along with it.
+ */
+public interface DistributedShardFactory {
+
+ /**
+ * Register a new shard that is rooted at the desired prefix with replicas on the provided members.
+ * Note to register a shard without replicas you still need to provide at least one Member for the shard.
+ *
+ * @param prefix Shard root
+ * @param replicaMembers Members that this shard is replicated on, has to have at least one Member even if the shard
+ * should not be replicated.
+ * @return ShardRegistration that should be closed if the shard should be destroyed
+ * @throws DOMDataTreeShardingConflictException If the prefix already has a shard registered
+ * @throws DOMDataTreeProducerException in case there is a problem closing the initial producer that is used to
+ * register the shard into the ShardingService
+ */
+ DistributedShardRegistration createDistributedShard(DOMDataTreeIdentifier prefix,
+ Collection<MemberName> replicaMembers)
+ throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException;
+
+ interface DistributedShardRegistration extends Registration {
+ @Override
+ void close();
+ }
+}
\ No newline at end of file
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public interface Configuration {
*/
@Nullable String getShardNameForModule(@Nonnull String moduleName);
+ /**
+ * Return the shard name corresponding to the prefix, or null if none is configured.
+ */
+ @Nullable String getShardNameForPrefix(@Nonnull YangInstanceIdentifier prefix);
+
/**
* Returns the member replicas for the given shard name.
*/
*/
void addModuleShardConfiguration(@Nonnull ModuleShardConfiguration config);
+ /**
+ * Adds a new configuration for a shard based on prefix.
+ */
+ void addPrefixShardConfiguration(@Nonnull PrefixShardConfiguration config);
+
/**
* Returns a unique set of all member names configured for all shards.
*/
* Removes the given member as a replica for the given shardName.
*/
void removeMemberReplicaForShard(String shardName, MemberName memberName);
+
+ /**
+ * Returns the ShardStrategy for the given prefix or null if the prefix is not found.
+ */
+ @Nullable ShardStrategy getStrategyForPrefix(@Nonnull YangInstanceIdentifier prefix);
}
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+// TODO clean this up once we get rid of module based configuration, prefix one should be alot simpler
public class ConfigurationImpl implements Configuration {
private volatile Map<String, ModuleConfig> moduleConfigMap;
+ // TODO should this be initialized with something? on restart we should restore the shards from configuration?
+ private volatile Map<YangInstanceIdentifier, PrefixShardConfiguration> prefixConfigMap = Collections.emptyMap();
+
// Look up maps to speed things up
private volatile Map<String, String> namespaceToModuleName;
return !shardConfigs.isEmpty() ? shardConfigs.iterator().next().getName() : null;
}
+ @Nullable
+ @Override
+ public String getShardNameForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+ Preconditions.checkNotNull(prefix, "prefix should not be null");
+
+ Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
+ new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+
+ for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+ if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
+ > bestMatchEntry.getKey().getPathArguments().size()) {
+ bestMatchEntry = entry;
+ }
+ }
+
+ //TODO we really should have mapping based on prefix instead of Strings
+ return ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier());
+ }
+
@Override
public Collection<MemberName> getMembersFromShardName(final String shardName) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
}
}
+ for (final PrefixShardConfiguration prefixConfig : prefixConfigMap.values()) {
+ if (shardName.equals(ClusterUtils.getCleanShardName(prefixConfig.getPrefix().getRootIdentifier()))) {
+ return prefixConfig.getShardMemberNames();
+ }
+ }
+
return Collections.emptyList();
}
allShardNames = ImmutableSet.<String>builder().addAll(allShardNames).add(config.getShardName()).build();
}
+ @Override
+ public void addPrefixShardConfiguration(@Nonnull final PrefixShardConfiguration config) {
+ Preconditions.checkNotNull(config, "PrefixShardConfiguration cannot be null");
+ updatePrefixConfigMap(config);
+ allShardNames = ImmutableSet.<String>builder().addAll(allShardNames)
+ .add(ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())).build();
+ }
+
+ private void updatePrefixConfigMap(final PrefixShardConfiguration config) {
+ final Map<YangInstanceIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
+ newPrefixConfigMap.put(config.getPrefix().getRootIdentifier(), config);
+ prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap);
+ }
+
private ShardStrategy createShardStrategy(String moduleName, String shardStrategyName) {
return ShardStrategyFactory.newShardStrategyInstance(moduleName, shardStrategyName, this);
}
}
}
- private void updateModuleConfigMap(ModuleConfig moduleConfig) {
- Map<String, ModuleConfig> newModuleConfigMap = new HashMap<>(moduleConfigMap);
+ @Override
+ public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+ Preconditions.checkNotNull(prefix, "Prefix cannot be null");
+ // FIXME using prefix tables like in mdsal will be better
+ Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
+ new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+
+ for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+ if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
+ > bestMatchEntry.getKey().getPathArguments().size()) {
+ bestMatchEntry = entry;
+ }
+ }
+
+ if (bestMatchEntry.getValue() == null) {
+ return null;
+ }
+ return new PrefixShardStrategy(
+ ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()), this);
+ }
+
+ private void updateModuleConfigMap(final ModuleConfig moduleConfig) {
+ final Map<String, ModuleConfig> newModuleConfigMap = new HashMap<>(moduleConfigMap);
newModuleConfigMap.put(moduleConfig.getName(), moduleConfig);
moduleConfigMap = ImmutableMap.copyOf(newModuleConfigMap);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Configuration for prefix based shards.
+ */
+public class PrefixShardConfiguration implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final DOMDataTreeIdentifier prefix;
+ private final String shardStrategyName;
+ private final Collection<MemberName> shardMemberNames;
+
+ public PrefixShardConfiguration(final DOMDataTreeIdentifier prefix,
+ final String shardStrategyName,
+ final Collection<MemberName> shardMemberNames) {
+ this.prefix = prefix;
+ this.shardStrategyName = shardStrategyName;
+ this.shardMemberNames = shardMemberNames;
+ }
+
+ public DOMDataTreeIdentifier getPrefix() {
+ return prefix;
+ }
+
+ public String getShardStrategyName() {
+ return shardStrategyName;
+ }
+
+ public Collection<MemberName> getShardMemberNames() {
+ return shardMemberNames;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * A message sent to the ShardManager to dynamically add a new local shard
+ * that is a replica for an existing shard that is already available in the
+ * cluster.
+ */
+
+public class AddPrefixShardReplica {
+
+ private final YangInstanceIdentifier prefix;
+
+ public AddPrefixShardReplica(final YangInstanceIdentifier prefix) {
+ this.prefix = Preconditions.checkNotNull(prefix);
+ }
+
+ public YangInstanceIdentifier getPrefix() {
+ return prefix;
+ }
+
+ @Override
+ public String toString() {
+ return "AddPrefixShardReplica[ShardName=" + prefix + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+
+/**
+ * Message sent to the ShardManager to create a shard located at a certain logical position in the dataTree.
+ */
+public class CreatePrefixedShard {
+
+ private final PrefixShardConfiguration config;
+ private final DatastoreContext context;
+ private final Shard.Builder builder;
+
+ public CreatePrefixedShard(final PrefixShardConfiguration config,
+ final DatastoreContext context,
+ final Shard.Builder builder) {
+ this.config = config;
+ this.context = context;
+ this.builder = builder;
+ }
+
+ public PrefixShardConfiguration getConfig() {
+ return config;
+ }
+
+ public DatastoreContext getContext() {
+ return context;
+ }
+
+ public Shard.Builder getShardBuilder() {
+ return builder;
+ }
+}
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
} else if (message instanceof SwitchShardBehavior) {
onSwitchShardBehavior((SwitchShardBehavior) message);
} else if (message instanceof CreateShard) {
- onCreateShard((CreateShard) message);
+ onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof CreatePrefixedShard) {
+ onCreatePrefixedShard((CreatePrefixedShard) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
} else if (message instanceof ForwardedAddServerReply) {
- ForwardedAddServerReply msg = (ForwardedAddServerReply) message;
- onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
+ ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+ msg.removeShardOnFailure);
} else if (message instanceof ForwardedAddServerFailure) {
- ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message;
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
int nfailed = 0;
- for (Boolean result: results) {
+ for (Boolean result : results) {
if (!result) {
nfailed++;
}
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+ LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
+
+ Object reply;
+ try {
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+ createPrefixedShard.getConfig().getPrefix());
+ if (localShards.containsKey(shardId.getShardName())) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
+ } else {
+ doCreatePrefixedShard(createPrefixedShard);
+ reply = new Status.Success(null);
+ }
+ } catch (final Exception e) {
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
+ reply = new Status.Failure(e);
+ }
+
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
}
}
- private void doCreateShard(CreateShard createShard) {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- String shardName = moduleShardConfig.getShardName();
+ private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+ final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+ createPrefixedShard.getConfig().getPrefix());
+ final String shardName = shardId.getShardName();
+
+ configuration.addPrefixShardConfiguration(config);
+
+ DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
+
+ if (shardDatastoreContext == null) {
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+ .storeRoot(config.getPrefix().getRootIdentifier());
+ shardDatastoreContext = builder.build();
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
+
+ final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+ && currentSnapshot.getShardList().contains(shardName);
+
+ final Map<String, String> peerAddresses = Collections.emptyMap();
+ final boolean isActiveMember = true;
+ LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, peerAddresses, isActiveMember);
+
+ final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
+
+ if (schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
+ }
+ }
+
+ private void doCreateShard(final CreateShard createShard) {
+ final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ final String shardName = moduleShardConfig.getShardName();
configuration.addModuleShardConfiguration(moduleShardConfig);
return false;
}
+
+ // With this message the shard does NOT have to be preconfigured
+ // do a dynamic lookup if the shard exists somewhere and replicate
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
+ final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
+
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+ if (schemaContext == null) {
+ final String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName,
+ new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ addShard(getShardName(), response, getSender());
+ }
+ }, getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ }
+ );
+ }
+
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Shard Strategy that resolves a path to a prefix shard name.
+ */
+public class PrefixShardStrategy implements ShardStrategy {
+
+ public static final String NAME = "prefix";
+
+ private final String shardName;
+ private final Configuration configuration;
+
+ public PrefixShardStrategy(final String shardName, final Configuration configuration) {
+ this.shardName = shardName;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String findShard(final YangInstanceIdentifier path) {
+ final String shardNameForPrefix = configuration.getShardNameForPrefix(path);
+ return shardNameForPrefix != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
+ }
+}
public ShardStrategy getStrategy(final YangInstanceIdentifier path) {
Preconditions.checkNotNull(path, "path should not be null");
- String moduleName = getModuleName(path);
- ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName);
+ // try with the legacy module based shard mapping
+ final String moduleName = getModuleName(path);
+ final ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName);
if (shardStrategy == null) {
- return DefaultShardStrategy.getInstance();
+ // retry with prefix based sharding
+ final ShardStrategy strategyForPrefix = configuration.getStrategyForPrefix(path);
+ if (strategyForPrefix == null) {
+ return DefaultShardStrategy.getInstance();
+ }
+ return strategyForPrefix;
}
return shardStrategy;
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Utils for encoding prefix shard name.
+ */
+public class ClusterUtils {
+
+ public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
+ return ShardIdentifier
+ .create(getCleanShardName(prefix.getRootIdentifier()), memberName, prefix.getDatastoreType().name());
+ }
+
+ /**
+ * Returns an encoded shard name based on the provided path that should doesn't contain characters that cannot be
+ * present in akka actor paths.
+ *
+ * @param path Path on which to base the shard name
+ * @return encoded name that doesn't contain characters that cannot be in actor path.
+ */
+ public static String getCleanShardName(final YangInstanceIdentifier path) {
+ final StringBuilder builder = new StringBuilder();
+ // TODO need a better mapping that includes namespace, but we'll need to cleanup the string beforehand
+ path.getPathArguments().forEach(p -> {
+ builder.append(p.getNodeType().getLocalName());
+ builder.append("!");
+ });
+ return builder.toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.Mock;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+public class AbstractShardManagerTest extends AbstractActorTest {
+
+ protected static final MemberName MEMBER_1 = MemberName.forName("member-1");
+
+ protected static int ID_COUNTER = 1;
+ protected static TestActorRef<MessageCollectorActor> mockShardActor;
+ protected static ShardIdentifier mockShardName;
+
+ protected final String shardMrgIDSuffix = "config" + ID_COUNTER++;
+ protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+ protected final Collection<ActorSystem> actorSystems = new ArrayList<>();
+ protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
+ .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
+ .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
+
+ @Mock
+ protected static CountDownLatch ready;
+
+ protected TestShardManager.Builder newTestShardMgrBuilder() {
+ return TestShardManager.builder(datastoreContextBuilder);
+ }
+
+ protected TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
+ return TestShardManager.builder(datastoreContextBuilder).configuration(config);
+ }
+
+ protected Props newShardMgrProps(final Configuration config) {
+ return newTestShardMgrBuilder(config).waitTillReadyCountDownLatch(ready).props();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ initMocks(this);
+
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+
+ if (mockShardActor == null) {
+ mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
+ mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
+ mockShardName.toString());
+ }
+
+ mockShardActor.underlyingActor().clear();
+ }
+
+ @After
+ public void tearDown() {
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+
+ for (final ActorSystem system : actorSystems) {
+ JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
+ }
+
+ mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ mockShardActor = null;
+
+ actorFactory.close();
+ }
+}
public abstract class AbstractTest {
protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
+ protected static final MemberName MEMBER_2_NAME = MemberName.forName("member-2");
+
private static final FrontendType FRONTEND_TYPE = FrontendType.forName(ShardTransactionTest.class.getSimpleName());
protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
SchemaContextHelper.full(), shardNames);
}
- public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
- boolean waitUntilLeader, String... shardNames) {
+ public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ final boolean waitUntilLeader, final String... shardNames) {
return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
SchemaContextHelper.full(), shardNames);
}
- public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
- boolean waitUntilLeader, SchemaContext schemaContext, String... shardNames) {
+ public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
Class<? extends Exception> expType) throws Exception {
- assertExceptionOnCall(() -> {
- txChain.newWriteOnlyTransaction();
- return null;
+ assertExceptionOnCall(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ txChain.newWriteOnlyTransaction();
+ return null;
+ }
}, expType);
assertExceptionOnCall(() -> {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
+import akka.actor.Status.Success;
+import akka.cluster.Cluster;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.Shard.Builder;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests prefix shard creation in ShardManager.
+ */
+public class PrefixShardCreationTest extends AbstractShardManagerTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class);
+
+ private static final DOMDataTreeIdentifier TEST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+
+ @Test
+ public void testPrefixShardCreation() throws Exception {
+
+ LOG.info("testPrefixShardCreation starting");
+ new JavaTestKit(getSystem()) {
+ {
+ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+ final ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ final SchemaContext schemaContext = TestModel.createTestContext();
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+
+ shardManager.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ final Builder builder = Shard.builder();
+
+ final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
+ new PrefixShardConfiguration(TEST_ID,
+ PrefixShardStrategy.NAME,
+ Collections.singletonList(MEMBER_1)),
+ datastoreContextBuilder.build(), builder);
+
+ shardManager.tell(createPrefixedShard, getRef());
+ expectMsgClass(duration("5 seconds"), Success.class);
+
+ shardManager.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }
+ };
+ }
+
+ @Test
+ public void testPrefixShardReplicas() throws Exception {
+ LOG.info("testPrefixShardReplicas starting");
+ final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+ // Create ACtorSystem for member-1
+ final ActorSystem system1 = newActorSystem("Member1");
+ Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
+ newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .waitTillReadyCountDownLatch(ready)
+ .cluster(new ClusterWrapperImpl(system1))
+ .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ shardManagerID);
+
+ // Create an ActorSystem ShardManager actor for member-2.
+
+ final ActorSystem system2 = newActorSystem("Member2");
+
+ Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
+ newTestShardMgrBuilder()
+ .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+ .waitTillReadyCountDownLatch(ready)
+ .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
+ Dispatchers.DefaultDispatcherId()),
+ shardManagerID);
+
+ final JavaTestKit kit2 = new JavaTestKit(system2);
+
+ new JavaTestKit(system1) {
+ {
+ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // check shard does not exist
+ shardManager1.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ shardManager2.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+ kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ // create shard on node1
+ final Builder builder = Shard.builder();
+
+ final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
+ new PrefixShardConfiguration(TEST_ID,
+ PrefixShardStrategy.NAME,
+ Lists.newArrayList(MEMBER_1, MEMBER_2)),
+ datastoreContextBuilder.build(), builder);
+
+ shardManager1.tell(createPrefixedShard, getRef());
+ expectMsgClass(duration("5 seconds"), Success.class);
+
+ shardManager1.underlyingActor().waitForMemberUp();
+
+ LOG.info("changed leader state");
+
+ // check node2 cannot find it locally
+ shardManager1.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ shardManager2.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+ kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ // but can remotely
+ shardManager2.tell(new FindPrimary(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+ kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+
+ // add replica and verify if succesful
+ shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
+ kit2.expectMsgClass(duration("5 seconds"), Success.class);
+
+ // verify we have a replica on manager2 now
+ shardManager2.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+ kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }
+ };
+ }
+
+ private ActorSystem newActorSystem(final String config) {
+ final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
+ actorSystems.add(system);
+ return system;
+ }
+}
\ No newline at end of file
import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.util.AbstractMap;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-public class ShardManagerTest extends AbstractActorTest {
+public class ShardManagerTest extends AbstractShardManagerTest {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
- private static final MemberName MEMBER_1 = MemberName.forName("member-1");
private static final MemberName MEMBER_2 = MemberName.forName("member-2");
private static final MemberName MEMBER_3 = MemberName.forName("member-3");
- private static int ID_COUNTER = 1;
-
- private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
- @Mock
- private static CountDownLatch ready;
-
- private static ShardIdentifier mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
-
- private static TestActorRef<MessageCollectorActor> mockShardActor = TestActorRef.create(getSystem(),
- Props.create(MessageCollectorActor.class), mockShardName.toString());
-
- private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
- .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
- .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
-
- private final Collection<ActorSystem> actorSystems = new ArrayList<>();
-
- private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- InMemoryJournal.clear();
- InMemorySnapshotStore.clear();
-
- mockShardActor.underlyingActor().clear();
- }
-
- @After
- public void tearDown() {
- InMemoryJournal.clear();
- InMemorySnapshotStore.clear();
-
- for (ActorSystem system: actorSystems) {
- JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
- }
-
- actorFactory.close();
- }
-
private ActorSystem newActorSystem(String config) {
ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
actorSystems.add(system);
return newShardMgrProps(new MockConfiguration());
}
- private Props newShardMgrProps(Configuration config) {
- return newTestShardMgrBuilder(config).props();
- }
-
private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
return mockFactory;
}
- private TestShardManager.Builder newTestShardMgrBuilder() {
- return TestShardManager.builder(datastoreContextBuilder);
- }
-
- private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
- return TestShardManager.builder(datastoreContextBuilder).configuration(config);
- }
-
private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
}
};
}
- private static class TestShardManager extends ShardManager {
+ public static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final CountDownLatch snapshotPersist = new CountDownLatch(1);
private ShardManagerSnapshot snapshot;
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
}
- void waitForMemberUp() {
+ public void waitForMemberUp() {
assertEquals("MemberUp received", true,
Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
memberUpReceived = new CountDownLatch(1);
return new Builder(datastoreContextBuilder);
}
- private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
+ public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
private ActorRef shardActor;
private final Map<String, ActorRef> shardActors = new HashMap<>();
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class MockConfiguration extends ConfigurationImpl {
public MockConfiguration() {
return retMap;
});
}
+
+ @Override
+ public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+ return null;
+ }
}