<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-distributed-data-experimental_${scala.version}</artifactId>
+ <version>2.4.7</version>
+ </dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala.version}</artifactId>
*
* @author Robert Varga
*/
-abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
+public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
enum State {
IDLE,
TX_OPEN,
* @throws TransactionChainClosedException if this history is closed
* @throws IllegalStateException if a previous dependent transaction has not been closed
*/
- public final ClientTransaction createTransaction() {
+ public ClientTransaction createTransaction() {
checkNotClosed();
synchronized (this) {
* @author Robert Varga
*/
@Beta
-public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
+public class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
super(client, historyId);
}
* @author Robert Varga
*/
@Beta
-public final class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
+public class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
private ClientTransactionCursor cursor;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
}
private Object readResolve() throws ObjectStreamException {
- return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList);
+ return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList,
+ Collections.emptyMap());
}
}
package org.opendaylight.controller.cluster.datastore.config;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
public interface Configuration {
/**
* Return the shard name corresponding to the prefix, or null if none is configured.
*/
- @Nullable String getShardNameForPrefix(@Nonnull YangInstanceIdentifier prefix);
+ @Nullable String getShardNameForPrefix(@Nonnull DOMDataTreeIdentifier prefix);
/**
* Returns the member replicas for the given shard name.
*/
void addPrefixShardConfiguration(@Nonnull PrefixShardConfiguration config);
+ /**
+ * Removes a shard configuration for the specified prefix.
+ */
+ void removePrefixShardConfiguration(@Nonnull DOMDataTreeIdentifier prefix);
+
+ /**
+ * Returns the configuration for all configured prefix shards.
+ *
+ * @return An immutable copy of the currently configured prefix shards.
+ */
+ Map<DOMDataTreeIdentifier, PrefixShardConfiguration> getAllPrefixShardConfigurations();
+
/**
* Returns a unique set of all member names configured for all shards.
*/
/**
* Returns the ShardStrategy for the given prefix or null if the prefix is not found.
*/
- @Nullable ShardStrategy getStrategyForPrefix(@Nonnull YangInstanceIdentifier prefix);
+ @Nullable ShardStrategy getStrategyForPrefix(@Nonnull DOMDataTreeIdentifier prefix);
}
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
// TODO clean this up once we get rid of module based configuration, prefix one should be alot simpler
private volatile Map<String, ModuleConfig> moduleConfigMap;
// TODO should this be initialized with something? on restart we should restore the shards from configuration?
- private volatile Map<YangInstanceIdentifier, PrefixShardConfiguration> prefixConfigMap = Collections.emptyMap();
+ private volatile Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixConfigMap = Collections.emptyMap();
// Look up maps to speed things up
@Nullable
@Override
- public String getShardNameForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+ public String getShardNameForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) {
Preconditions.checkNotNull(prefix, "prefix should not be null");
- Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
- new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+ Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> bestMatchEntry =
+ new SimpleEntry<>(
+ new DOMDataTreeIdentifier(prefix.getDatastoreType(), YangInstanceIdentifier.EMPTY), null);
- for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
- if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
- > bestMatchEntry.getKey().getPathArguments().size()) {
+ for (Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+ if (entry.getKey().contains(prefix) && entry.getKey().getRootIdentifier().getPathArguments().size()
+ > bestMatchEntry.getKey().getRootIdentifier().getPathArguments().size()) {
bestMatchEntry = entry;
}
}
//TODO we really should have mapping based on prefix instead of Strings
- return ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier());
+ return ClusterUtils.getCleanShardName(bestMatchEntry.getKey().getRootIdentifier());
}
@Override
@Override
public void addPrefixShardConfiguration(@Nonnull final PrefixShardConfiguration config) {
Preconditions.checkNotNull(config, "PrefixShardConfiguration cannot be null");
- updatePrefixConfigMap(config);
+ addPrefixConfig(config);
allShardNames = ImmutableSet.<String>builder().addAll(allShardNames)
.add(ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())).build();
}
- private void updatePrefixConfigMap(final PrefixShardConfiguration config) {
- final Map<YangInstanceIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
- newPrefixConfigMap.put(config.getPrefix().getRootIdentifier(), config);
+ @Override
+ public void removePrefixShardConfiguration(@Nonnull final DOMDataTreeIdentifier prefix) {
+ Preconditions.checkNotNull(prefix, "Prefix cannot be null");
+
+ // Drop the prefix -> config mapping first, then rebuild the immutable
+ // shard-name set without the shard derived from this prefix.
+ removePrefixConfig(prefix);
+
+ final HashSet<String> temp = new HashSet<>(allShardNames);
+ temp.remove(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+ allShardNames = ImmutableSet.copyOf(temp);
+ }
+
+ @Override
+ public Map<DOMDataTreeIdentifier, PrefixShardConfiguration> getAllPrefixShardConfigurations() {
+ // Defensive immutable copy so callers cannot mutate internal state.
+ return ImmutableMap.copyOf(prefixConfigMap);
+ }
+
+ private void addPrefixConfig(final PrefixShardConfiguration config) {
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
+ newPrefixConfigMap.put(config.getPrefix(), config);
+ prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap);
+ }
+
+ private void removePrefixConfig(final DOMDataTreeIdentifier prefix) {
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
+ newPrefixConfigMap.remove(prefix);
prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap);
}
}
@Override
- public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+ public ShardStrategy getStrategyForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) {
Preconditions.checkNotNull(prefix, "Prefix cannot be null");
// FIXME using prefix tables like in mdsal will be better
- Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
- new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+ Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> bestMatchEntry =
+ new SimpleEntry<>(
+ new DOMDataTreeIdentifier(prefix.getDatastoreType(), YangInstanceIdentifier.EMPTY), null);
- for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
- if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
- > bestMatchEntry.getKey().getPathArguments().size()) {
+ for (Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+ if (entry.getKey().contains(prefix) && entry.getKey().getRootIdentifier().getPathArguments().size()
+ > bestMatchEntry.getKey().getRootIdentifier().getPathArguments().size()) {
bestMatchEntry = entry;
}
}
if (bestMatchEntry.getValue() == null) {
return null;
}
- return new PrefixShardStrategy(
- ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()), this);
+ return new PrefixShardStrategy(ClusterUtils
+ .getCleanShardName(bestMatchEntry.getKey().getRootIdentifier()),
+ bestMatchEntry.getKey().getRootIdentifier());
}
private void updateModuleConfigMap(final ModuleConfig moduleConfig) {
package org.opendaylight.controller.cluster.datastore.config;
+import akka.cluster.ddata.ReplicatedData;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashSet;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
/**
* Configuration for prefix based shards.
*/
-public class PrefixShardConfiguration implements Serializable {
+public class PrefixShardConfiguration implements ReplicatedData, Serializable {
private static final long serialVersionUID = 1L;
private final DOMDataTreeIdentifier prefix;
public PrefixShardConfiguration(final DOMDataTreeIdentifier prefix,
final String shardStrategyName,
final Collection<MemberName> shardMemberNames) {
- this.prefix = prefix;
- this.shardStrategyName = shardStrategyName;
- this.shardMemberNames = shardMemberNames;
+ this.prefix = Preconditions.checkNotNull(prefix);
+ this.shardStrategyName = Preconditions.checkNotNull(shardStrategyName);
+ this.shardMemberNames = ImmutableSet.copyOf(shardMemberNames);
}
public DOMDataTreeIdentifier getPrefix() {
public Collection<MemberName> getShardMemberNames() {
return shardMemberNames;
}
+
+ @Override
+ public String toString() {
+ return "PrefixShardConfiguration{"
+ + "prefix=" + prefix
+ + ", shardStrategyName='"
+ + shardStrategyName + '\''
+ + ", shardMemberNames=" + shardMemberNames
+ + '}';
+ }
+
+ // Key under which this configuration is stored in the replicated ORMap;
+ // derived from the prefix only, so one entry exists per prefix.
+ public String toDataMapKey() {
+ return "prefix=" + prefix;
+ }
+
+ /**
+ * CRDT merge for replicated data: unions the member-name sets of the two
+ * configurations for the same prefix. The strategy name of {@code this}
+ * instance wins — NOTE(review): confirm that strategy names can never
+ * diverge between replicas, otherwise one side's value is silently kept.
+ */
+ @Override
+ public ReplicatedData merge(final ReplicatedData replicatedData) {
+ if (!(replicatedData instanceof PrefixShardConfiguration)) {
+ throw new IllegalStateException("replicatedData expected to be instance of PrefixShardConfiguration");
+ }
+ final PrefixShardConfiguration entry = (PrefixShardConfiguration) replicatedData;
+ if (!entry.getPrefix().equals(prefix)) {
+ // this should never happen since the key is the prefix
+ // if it does just return current?
+ return this;
+ }
+ final HashSet<MemberName> members = new HashSet<>(shardMemberNames);
+ members.addAll(entry.getShardMemberNames());
+ return new PrefixShardConfiguration(prefix, shardStrategyName, members);
+ }
}
import com.google.common.base.Preconditions;
import java.util.Optional;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
private final DataTree localShardDataTree;
- public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId,
+ public ShardLeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId,
@Nonnull DataTree localShardDataTree, short leaderPayloadVersion) {
super(memberId, leaderId, leaderPayloadVersion);
this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
}
- public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId,
+ public ShardLeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId,
short leaderPayloadVersion) {
super(memberId, leaderId, leaderPayloadVersion);
this.localShardDataTree = null;
package org.opendaylight.controller.cluster.datastore.persisted;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
/**
* Represents the persisted snapshot state for the ShardManager.
for (String shard: snapshot.shardList) {
out.writeObject(shard);
}
+
+ out.writeInt(snapshot.prefixShardConfiguration.size());
+ for (Map.Entry prefixShardConfigEntry : snapshot.prefixShardConfiguration.entrySet()) {
+ out.writeObject(prefixShardConfigEntry.getKey());
+ out.writeObject(prefixShardConfigEntry.getValue());
+ }
}
@Override
shardList.add((String) in.readObject());
}
- snapshot = new ShardManagerSnapshot(shardList);
+ size = in.readInt();
+ Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixShardConfiguration = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ prefixShardConfiguration.put((DOMDataTreeIdentifier) in.readObject(),
+ (PrefixShardConfiguration) in.readObject());
+ }
+
+ snapshot = new ShardManagerSnapshot(shardList, prefixShardConfiguration);
}
private Object readResolve() {
}
private final List<String> shardList;
+ private final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixShardConfiguration;
- public ShardManagerSnapshot(@Nonnull final List<String> shardList) {
+ public ShardManagerSnapshot(@Nonnull final List<String> shardList,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixShardConfiguration) {
this.shardList = ImmutableList.copyOf(shardList);
+ this.prefixShardConfiguration = ImmutableMap.copyOf(prefixShardConfiguration);
}
public List<String> getShardList() {
package org.opendaylight.controller.cluster.datastore.shardmanager;
+import static akka.actor.ActorRef.noSender;
import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.Member;
+import akka.cluster.ddata.DistributedData;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.Replicator.Changed;
+import akka.cluster.ddata.Replicator.Subscribe;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final String persistenceId;
+ private final ActorRef replicator;
+
ShardManager(AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
this.configuration = builder.getConfiguration();
"shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
shardManagerMBean.registerMBean();
+
+ replicator = DistributedData.get(context().system()).replicator();
+
+ }
+
+ /**
+ * Actor lifecycle hook: subscribes this ShardManager to the replicated
+ * prefix-shard configuration map so that {@code Replicator.Changed}
+ * messages are delivered to it on every configuration update.
+ */
+ @Override
+ public void preStart() {
+ LOG.info("Starting Shardmanager {}", persistenceId);
+
+ final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
+ new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
+ replicator.tell(subscribe, noSender());
}
@Override
onGetLocalShardIds();
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof Changed) {
+ onConfigChanged((Changed) message);
} else {
unknownMessage(message);
}
}
}
+ // Handles a Replicator.Changed notification for the replicated
+ // prefix-shard configuration ORMap: re-keys the entries by their
+ // DOMDataTreeIdentifier prefix and hands the full new state to
+ // resolveConfig() for diffing against the local configuration.
+ private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
+ LOG.debug("{}, ShardManager {} received config changed {}",
+ cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
+
+ final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
+
+ // Replicated map is keyed by string; rebuild keyed by prefix for local use.
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
+ changedConfig.values().stream().collect(
+ Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
+
+ resolveConfig(newConfig);
+ }
+
+ // Diffs the freshly replicated configuration against the locally known
+ // one: prefixes present locally but absent remotely are removed, prefixes
+ // present remotely but absent locally are created. Updates to existing
+ // shards are not computed yet (see the empty set passed below).
+ private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
+ LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
+ cluster.getCurrentMemberName(), persistenceId, newConfig);
+
+ newConfig.forEach((prefix, config) ->
+ LOG.debug("{} ShardManager : {}, received shard config "
+ + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
+
+ final SetView<DOMDataTreeIdentifier> removedConfigs =
+ Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
+
+ // resolve removals
+
+ resolveRemovals(removedConfigs);
+
+ final SetView<DOMDataTreeIdentifier> addedConfigs =
+ Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
+ // resolve additions
+
+ resolveAdditions(addedConfigs, newConfig);
+ // iter through to update existing shards, either start/stop replicas or update the shard
+ // to check for more peers
+ // NOTE(review): updates are stubbed out — an empty set is passed, so
+ // changed-but-not-added/removed configs are currently ignored. TODO confirm.
+ resolveUpdates(Collections.emptySet());
+ }
+
+ /**
+ * Removes the local shard replica and configuration for every prefix
+ * that disappeared from the replicated configuration.
+ */
+ private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
+ LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
+ cluster.getCurrentMemberName(), persistenceId, removedConfigs);
+
+ removedConfigs.forEach(this::doRemovePrefixedShard);
+ }
+
+ /**
+ * Creates a local shard for every prefix newly present in the replicated
+ * configuration, using the configuration looked up from {@code configs}.
+ */
+ private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
+ // The format string has three placeholders; all three arguments must be
+ // supplied or SLF4J logs literal "{}" for the missing ones.
+ LOG.debug("{} ShardManager : {}, resolving added configs : {}",
+ cluster.getCurrentMemberName(), persistenceId, addedConfigs);
+
+ addedConfigs.forEach(id -> doCreatePrefixedShard(configs.get(id)));
+ }
+
+ /**
+ * Placeholder for reconciling configs that changed without being added
+ * or removed (e.g. member set changes). Currently only logs its input.
+ */
+ private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
+ // Supply all three arguments for the three "{}" placeholders; the
+ // original call passed only one, logging literal "{}" tokens.
+ LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}",
+ cluster.getCurrentMemberName(), persistenceId, maybeUpdatedConfigs);
+ }
+
+ // Tears down the local replica for the given prefix: forgets it from
+ // localShards, drops its entry from the configuration, and asks the
+ // shard actor (if any) to shut down.
+ private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
+ LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
+ cluster.getCurrentMemberName(), persistenceId, prefix);
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+ final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+ // Remove the config even if no local replica exists, so this node's
+ // view of configured prefixes stays in sync with the replicated state.
+ configuration.removePrefixShardConfiguration(prefix);
+
+ if (shard == null) {
+ // NOTE(review): early return skips persistShardList(), so the
+ // persisted snapshot is not refreshed on this path — TODO confirm
+ // that is intentional when no local replica was present.
+ LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
+ return;
+ }
+
+ if (shard.getActor() != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+ shard.getActor().tell(Shutdown.INSTANCE, self());
+ }
+ LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
+ persistenceId(), shardId.getShardName());
+ persistShardList();
+ }
+
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId.getShardName());
}
private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+ doCreatePrefixedShard(createPrefixedShard.getConfig());
+ // do not replicate on this level
+ }
+
+ private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
+ LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- createPrefixedShard.getConfig().getPrefix());
+ config.getPrefix());
final String shardName = shardId.getShardName();
configuration.addPrefixShardConfiguration(config);
- DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
-
- if (shardDatastoreContext == null) {
- final Builder builder = newShardDatastoreContextBuilder(shardName);
- builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
- .storeRoot(config.getPrefix().getRootIdentifier());
- shardDatastoreContext = builder.build();
- } else {
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
- }
-
- final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
- && currentSnapshot.getShardList().contains(shardName);
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+ .storeRoot(config.getPrefix().getRootIdentifier());
+ DatastoreContext shardDatastoreContext = builder.build();
final Map<String, String> peerAddresses = Collections.emptyMap();
final boolean isActiveMember = true;
- LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
- persistenceId(), shardId, peerAddresses, isActiveMember);
+ LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
+ + "{}, peerAddresses: {}, isActiveMember: {}",
+ shardId, persistenceId(), config.getShardMemberNames(),
+ peerAddresses, isActiveMember);
final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
- shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+ shardDatastoreContext, Shard.builder(), peerAddressResolver);
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
if (schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
+
+ persistShardList();
}
private void doCreateShard(final CreateShard createShard) {
* @param shardName the shard name
*/
private Map<String, String> getPeerAddresses(String shardName) {
- Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
- Map<String, String> peerAddresses = new HashMap<>();
+ final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+ return getPeerAddresses(shardName, members);
+ }
+ private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+ Map<String, String> peerAddresses = new HashMap<>();
MemberName currentMemberName = this.cluster.getCurrentMemberName();
for (MemberName memberName : members) {
}
}
LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
- saveSnapshot(updateShardManagerSnapshot(shardList));
+ saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
}
- private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
- currentSnapshot = new ShardManagerSnapshot(shardList);
+ private ShardManagerSnapshot updateShardManagerSnapshot(
+ final List<String> shardList,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+ currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
return currentSnapshot;
}
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
}
private Object readResolve() {
- return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList);
+ return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList,
+ Collections.emptyMap());
}
@Override
public String findShard(YangInstanceIdentifier path) {
return DEFAULT_SHARD;
}
+
+ @Override
+ public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+ return YangInstanceIdentifier.EMPTY;
+ }
}
String shardName = configuration.getShardNameForModule(moduleName);
return shardName != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
}
+
+ @Override
+ public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+ return YangInstanceIdentifier.EMPTY;
+ }
+
+
}
package org.opendaylight.controller.cluster.datastore.shardstrategy;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
public static final String NAME = "prefix";
private final String shardName;
- private final Configuration configuration;
+ private final YangInstanceIdentifier prefix;
- public PrefixShardStrategy(final String shardName, final Configuration configuration) {
- this.shardName = shardName;
- this.configuration = configuration;
+ public PrefixShardStrategy(final String shardName,
+ final YangInstanceIdentifier prefix) {
+ this.shardName = shardName != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
+ this.prefix = prefix;
}
@Override
public String findShard(final YangInstanceIdentifier path) {
- final String shardNameForPrefix = configuration.getShardNameForPrefix(path);
- return shardNameForPrefix != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
+ return shardName;
+ }
+
+ @Override
+ public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+ return prefix;
}
}
* @return the corresponding shard name.
*/
String findShard(YangInstanceIdentifier path);
+
+ /**
+ * Get the prefix of the shard that contains the data pointed to by the specified path.
+ * @param path the location of the data in the logical tree.
+ * @return the corresponding shards prefix.
+ */
+ YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path);
}
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class ShardStrategyFactory {
private static final String UNKNOWN_MODULE_NAME = "unknown";
private final Configuration configuration;
+ private final LogicalDatastoreType logicalStoreType;
- public ShardStrategyFactory(final Configuration configuration) {
+ public ShardStrategyFactory(final Configuration configuration, LogicalDatastoreType logicalStoreType) {
Preconditions.checkState(configuration != null, "configuration should not be missing");
this.configuration = configuration;
+ this.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
}
public ShardStrategy getStrategy(final YangInstanceIdentifier path) {
final ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName);
if (shardStrategy == null) {
// retry with prefix based sharding
- final ShardStrategy strategyForPrefix = configuration.getStrategyForPrefix(path);
+ final ShardStrategy strategyForPrefix =
+ configuration.getStrategyForPrefix(new DOMDataTreeIdentifier(logicalStoreType, path));
if (strategyForPrefix == null) {
return DefaultShardStrategy.getInstance();
}
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
this.datastoreContext = datastoreContext;
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
this.primaryShardInfoCache = primaryShardInfoCache;
- this.shardStrategyFactory = new ShardStrategyFactory(configuration);
+
+ final LogicalDatastoreType convertedType =
+ LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
+ this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
setCachedProperties();
package org.opendaylight.controller.cluster.datastore.utils;
+import akka.cluster.ddata.Key;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORMapKey;
+import java.util.Map;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
/**
* Utils for encoding prefix shard name.
*/
public class ClusterUtils {
+ // key for replicated configuration key
+ public static final Key<ORMap<PrefixShardConfiguration>> CONFIGURATION_KEY =
+ ORMapKey.create("prefix-shard-configuration");
+
public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
- return ShardIdentifier
- .create(getCleanShardName(prefix.getRootIdentifier()), memberName, prefix.getDatastoreType().name());
+ final String type;
+ switch (prefix.getDatastoreType()) {
+ case OPERATIONAL:
+ type = "operational";
+ break;
+ case CONFIGURATION:
+ type = "config";
+ break;
+ default:
+ type = prefix.getDatastoreType().name();
+ }
+
+ return ShardIdentifier.create(getCleanShardName(prefix.getRootIdentifier()), memberName, type);
}
/**
* @return encoded name that doesn't contain characters that cannot be in actor path.
*/
public static String getCleanShardName(final YangInstanceIdentifier path) {
+ // The root identifier maps to the pre-existing default shard.
+ if (path.isEmpty()) {
+ return "default";
+ }
+
final StringBuilder builder = new StringBuilder();
// TODO need a better mapping that includes namespace, but we'll need to cleanup the string beforehand
+ // we have to satisfy both javax and akka URL-path character restrictions
path.getPathArguments().forEach(p -> {
builder.append(p.getNodeType().getLocalName());
+ if (p instanceof NodeIdentifierWithPredicates) {
+ // Encode list-entry key values so distinct list entries produce distinct shard names.
+ builder.append("-key_");
+ final Map<QName, Object> key = ((NodeIdentifierWithPredicates) p).getKeyValues();
+ key.entrySet().forEach(e -> {
+ builder.append(e.getKey().getLocalName());
+ builder.append(e.getValue());
+ builder.append("-");
+ });
+ builder.append("_");
+ }
builder.append("!");
});
return builder.toString();
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy implementation of a shard that creates forwarding producers to the backend shard.
+ */
+class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedShardFrontend.class);
+
+ private final DataStoreClient client;
+ private final DOMDataTreeIdentifier shardRoot;
+ @GuardedBy("this")
+ private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
+ @GuardedBy("this")
+ private final List<ShardProxyProducer> producers = new ArrayList<>();
+ private final DistributedDataStore distributedDataStore;
+
+ // All collaborators are mandatory; fail fast on construction.
+ DistributedShardFrontend(final DistributedDataStore distributedDataStore,
+ final DataStoreClient client,
+ final DOMDataTreeIdentifier shardRoot) {
+ this.distributedDataStore = Preconditions.checkNotNull(distributedDataStore);
+ this.client = Preconditions.checkNotNull(client);
+ this.shardRoot = Preconditions.checkNotNull(shardRoot);
+ }
+
+ /**
+ * Creates a producer bound to the given subtrees. Each requested prefix must lie
+ * under this shard's root, otherwise the producer cannot be served by this shard.
+ */
+ @Override
+ public synchronized DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
+ for (final DOMDataTreeIdentifier prodPrefix : paths) {
+ // FIX: validate against shardRoot; the previous paths.contains(prodPrefix) check
+ // was a tautology (every element is contained in its own collection) and
+ // accepted prefixes outside this shard.
+ Preconditions.checkArgument(shardRoot.contains(prodPrefix),
+ "Prefix %s is not contained under shard root", prodPrefix, shardRoot);
+ }
+
+ final ShardProxyProducer ret =
+ new ShardProxyProducer(shardRoot, paths, client, createModificationFactory(paths));
+ producers.add(ret);
+ return ret;
+ }
+
+ @Override
+ public synchronized void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ LOG.debug("{} : Child shard attached at {}", shardRoot, prefix);
+ Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this);
+ addChildShard(prefix, child);
+ // Existing producers must learn about the new subshard so writes get routed to it.
+ updateProducers();
+ }
+
+ @Override
+ public synchronized void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ LOG.debug("{} : Child shard detached at {}", shardRoot, prefix);
+ childShards.remove(prefix);
+ updateProducers();
+ // TODO we should grab the dataTreeSnapshot that's in the shard and apply it to this shard
+ }
+
+ // Registers a child shard context; only writeable shards can be attached as children.
+ private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard);
+ childShards.put(prefix, new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child));
+ }
+
+ // Builds a modification factory for the given producer prefixes, wiring in every
+ // child shard whose subtree overlaps one of the prefixes.
+ DistributedShardModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
+ // TODO this could be abstract
+ final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
+
+ for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
+ for (final ChildShardContext maybeAffected : childShards.values()) {
+ final DOMDataTreeIdentifier bindPath;
+ if (producerPrefix.contains(maybeAffected.getPrefix())) {
+ // Producer covers the whole subshard; bind at the subshard's root.
+ bindPath = maybeAffected.getPrefix();
+ } else if (maybeAffected.getPrefix().contains(producerPrefix)) {
+ // Bound path is inside subshard
+ bindPath = producerPrefix;
+ } else {
+ continue;
+ }
+
+ // One specification per subshard, accumulating every bind path that touches it.
+ SubshardProducerSpecification spec = affectedSubshards.get(maybeAffected.getPrefix());
+ if (spec == null) {
+ spec = new SubshardProducerSpecification(maybeAffected);
+ affectedSubshards.put(maybeAffected.getPrefix(), spec);
+ }
+ spec.addPrefix(bindPath);
+ }
+ }
+
+ final DistributedShardModificationFactoryBuilder builder =
+ new DistributedShardModificationFactoryBuilder(shardRoot);
+ for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
+ final ForeignShardModificationContext foreignContext =
+ new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
+ // NOTE(review): both addSubshard overloads are invoked for the same context;
+ // confirm the builder actually requires both registrations.
+ builder.addSubshard(foreignContext);
+ builder.addSubshard(spec.getPrefix(), foreignContext);
+ }
+
+ return builder.build();
+ }
+
+ // Re-derives each producer's modification factory after the child-shard layout changed.
+ private void updateProducers() {
+ for (final ShardProxyProducer producer : producers) {
+ producer.setModificationFactory(createModificationFactory(producer.getPrefixes()));
+ }
+ }
+
+ @Nonnull
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ final YangInstanceIdentifier treeId, final L listener) {
+ // Change notifications are not implemented for the frontend proxy yet.
+ throw new UnsupportedOperationException("Listener registration not supported");
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.WritableNodeOperation;
+import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableNodeWithSubshard;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+
+/**
+ * Shard modification that consists of the whole shard context, provides cursors which correctly delegate to subshards
+ * if any are present.
+ */
+public class DistributedShardModification extends WriteableNodeWithSubshard {
+
+ // Context holding the backing ClientTransaction for this shard.
+ private final DistributedShardModificationContext context;
+ // Foreign contexts for subshards nested under this shard's subtree.
+ private final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards;
+
+ public DistributedShardModification(final DistributedShardModificationContext context,
+ final Map<PathArgument, WriteableModificationNode> subshards,
+ final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards) {
+ super(subshards);
+ this.context = Preconditions.checkNotNull(context);
+ this.childShards = Preconditions.checkNotNull(childShards);
+ }
+
+ @Override
+ public PathArgument getIdentifier() {
+ return context.getIdentifier().getRootIdentifier().getLastPathArgument();
+ }
+
+ @Override
+ public WriteCursorStrategy createOperation(final DOMDataTreeWriteCursor parentCursor) {
+ return new WritableNodeOperation(this, context.cursor()) {
+ @Override
+ public void exit() {
+ // This node is the data tree root: there is no parent to exit into.
+ throw new IllegalStateException("Can not exit data tree root");
+ }
+ };
+ }
+
+ void cursorClosed() {
+ context.closeCursor();
+ }
+
+ // Readies all foreign subshard modifications first, then the local transaction.
+ DOMStoreThreePhaseCommitCohort seal() {
+ childShards.values().forEach(ForeignShardModificationContext::ready);
+ return context.ready();
+ }
+
+ DOMDataTreeIdentifier getPrefix() {
+ return context.getIdentifier();
+ }
+
+ Map<DOMDataTreeIdentifier, ForeignShardModificationContext> getChildShards() {
+ return childShards;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * The context for a single shards modification, keeps a ClientTransaction so it can route requests correctly.
+ */
+public class DistributedShardModificationContext {
+
+ // Backing transaction for this shard; all cursor operations route through it.
+ // Never reassigned, so declared final (was mutable for no reason).
+ private final ClientTransaction transaction;
+ private final DOMDataTreeIdentifier identifier;
+ // Lazily-opened write cursor; null whenever no cursor is currently open.
+ private DOMDataTreeWriteCursor cursor;
+
+ public DistributedShardModificationContext(final ClientTransaction transaction,
+ final DOMDataTreeIdentifier identifier) {
+ this.transaction = transaction;
+ this.identifier = identifier;
+ }
+
+ public DOMDataTreeIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ DOMDataTreeWriteCursor cursor() {
+ if (cursor == null) {
+ cursor = transaction.openCursor();
+ }
+
+ return cursor;
+ }
+
+ DOMStoreThreePhaseCommitCohort ready() {
+ // Any open cursor must be closed before the transaction can be readied;
+ // reuse closeCursor() instead of duplicating its null-check/close/reset logic.
+ closeCursor();
+ return transaction.ready();
+ }
+
+ void closeCursor() {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import org.opendaylight.mdsal.dom.spi.shard.AbstractDataModificationCursor;
+import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy;
+
+/**
+ * Internal cursor implementation consisting of WriteCursorStrategies which forwards writes to foreign modifications
+ * if any.
+ */
+public class DistributedShardModificationCursor extends AbstractDataModificationCursor<DistributedShardModification> {
+
+ // Owning transaction proxy; notified when this cursor is closed.
+ // Never reassigned, so declared final.
+ private final ShardProxyTransaction parent;
+
+ public DistributedShardModificationCursor(final DistributedShardModification root,
+ final ShardProxyTransaction parent) {
+ super(root);
+ this.parent = parent;
+ }
+
+ @Override
+ protected WriteCursorStrategy getRootOperation(final DistributedShardModification root) {
+ return root.createOperation(null);
+ }
+
+ @Override
+ public void close() {
+ parent.cursorClosed();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+
+/**
+ * Factory for {@link DistributedShardModification}.
+ */
+public final class DistributedShardModificationFactory {
+ // Foreign contexts for subshards nested under this shard; immutable snapshot.
+ private final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards;
+ // Writeable-node layout beneath the shard root; immutable snapshot.
+ private final Map<PathArgument, WriteableModificationNode> children;
+ private final DOMDataTreeIdentifier root;
+
+ DistributedShardModificationFactory(final DOMDataTreeIdentifier root,
+ final Map<PathArgument, WriteableModificationNode> children,
+ final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards) {
+ this.root = Preconditions.checkNotNull(root);
+ // Defensive copies make the factory safe to share between modifications.
+ this.children = ImmutableMap.copyOf(children);
+ this.childShards = ImmutableMap.copyOf(childShards);
+ }
+
+ @VisibleForTesting
+ Map<PathArgument, WriteableModificationNode> getChildren() {
+ return children;
+ }
+
+ @VisibleForTesting
+ Map<DOMDataTreeIdentifier, ForeignShardModificationContext> getChildShards() {
+ return childShards;
+ }
+
+ // Creates a fresh modification backed by the supplied client transaction.
+ DistributedShardModification createModification(final ClientTransaction transaction) {
+ return new DistributedShardModification(
+ new DistributedShardModificationContext(transaction, root), children, childShards);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.store.inmemory.ShardDataModificationFactoryBuilder;
+
+/**
+ * Builder for {@link DistributedShardModificationFactory}.
+ */
+public class DistributedShardModificationFactoryBuilder
+ extends ShardDataModificationFactoryBuilder<DistributedShardModificationFactory> {
+
+
+ public DistributedShardModificationFactoryBuilder(final DOMDataTreeIdentifier root) {
+ super(root);
+ }
+
+ @Override
+ public DistributedShardModificationFactory build() {
+ // root and childShards are provided by the superclass builder state.
+ return new DistributedShardModificationFactory(root, buildChildren(), childShards);
+ }
+}
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
import com.google.common.collect.ForwardingObject;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.CompletionException;
+import java.util.EnumMap;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.collection.JavaConverters;
/**
* A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
private final ActorRef shardedDataTreeActor;
private final MemberName memberName;
+ private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
+ DOMDataTreePrefixTable.create();
+
+ private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
+ new EnumMap<>(LogicalDatastoreType.class);
+
public DistributedShardedDOMDataTree(final ActorSystem actorSystem,
final DistributedDataStore distributedOperDatastore,
final DistributedDataStore distributedConfigDatastore) {
shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
new ShardedDataTreeActorCreator()
- .setDataTreeService(shardedDOMDataTree)
- .setShardingService(shardedDOMDataTree)
+ .setShardingService(this)
.setActorSystem(actorSystem)
.setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
.setDistributedConfigDatastore(distributedConfigDatastore)
ACTOR_ID);
this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
+
+ //create shard registration for DEFAULT_SHARD
+ try {
+ defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
+ initDefaultShard(LogicalDatastoreType.CONFIGURATION));
+ } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+ LOG.error("Unable to create default shard frontend for config shard", e);
+ }
+
+ try {
+ defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
+ initDefaultShard(LogicalDatastoreType.OPERATIONAL));
+ } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+ LOG.error("Unable to create default shard frontend for operational shard", e);
+ }
}
@Nonnull
@Nonnull
@Override
public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
- LOG.debug("Creating producer for {}", subtrees);
+ LOG.debug("{} - Creating producer for {}",
+ distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
final Object response = distributedConfigDatastore.getActorContext()
.executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
if (response == null) {
- LOG.debug("Received success from remote nodes, creating producer:{}", subtrees);
+ LOG.debug("{} - Received success from remote nodes, creating producer:{}",
+ distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
distributedConfigDatastore.getActorContext());
} else if (response instanceof Exception) {
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
+ //TODO: it would be better to block here until the message is processed by the actor
public DistributedShardRegistration createDistributedShard(
final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
- throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException,
- DOMDataTreeShardCreationFailedException {
+ throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException {
+ // Refuse to create a shard over a prefix that already has an exact-match registration.
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+ shards.lookup(prefix);
+ if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
+ throw new DOMDataTreeShardingConflictException(
+ "Prefix " + prefix + " is already occupied by another shard.");
+ }
+
+ // final added for consistency with the rest of the file's local-variable style.
+ final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+ shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender());
+
+ return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this);
+ }
+
+ // Spawns local frontends for newly-configured shard prefixes.
+ void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
+ LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
+ final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
+ // we need to register the shards from top to bottom, so we need to at least make sure the ordering
+ // reflects that; Integer.compare replaces the verbose hand-rolled three-way comparison.
+ Collections.sort(list, (o1, o2) -> Integer.compare(o1.getRootIdentifier().getPathArguments().size(),
+ o2.getRootIdentifier().getPathArguments().size()));
+ list.forEach(this::createShardFrontend);
+ }
+
+ // Tears down local frontends for shard prefixes removed from the configuration.
+ void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
+ LOG.debug("Member {}: Resolving removals : {}", memberName, removals);
+
+ // do we need to go from bottom to top?
+ removals.forEach(this::despawnShardFrontend);
+ }
+
+ // Spawns the local frontend (client actor + shard proxy) for a backend shard at the
+ // given prefix and registers it with the local sharding service.
+ private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
+ LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
final DistributedDataStore distributedDataStore =
prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
? distributedConfigDatastore : distributedOperDatastore;
- final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
- if (replicaMembers.contains(memberName)) {
- // spawn the backend shard and have the shard Manager create all replicas
- final ActorRef shardManager = distributedDataStore.getActorContext().getShardManager();
-
- shardManager.tell(new CreatePrefixedShard(config, null, Shard.builder()), noSender());
- }
-
- LOG.debug("Creating distributed datastore client for shard {}", shardName);
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor
- .props(memberName, "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
+ // The producer claims the prefix locally while we hand registration over to the new shard.
+ try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
+ final Entry<DataStoreClient, ActorRef> entry =
+ createDatastoreClient(shardName, distributedDataStore.getActorContext());
- final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
- final DataStoreClient client;
- try {
- client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (final Exception e) {
- LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
- clientActor.tell(PoisonPill.getInstance(), noSender());
- throw new DOMDataTreeProducerException("Unable to create producer", e);
- }
+ final DistributedShardFrontend shard =
+ new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
- // register the frontend into the sharding service and let the actor distribute this onto the other nodes
- final ListenerRegistration<ShardFrontend> shardFrontendRegistration;
- try (DOMDataTreeProducer producer = createProducer(Collections.singletonList(prefix))) {
- shardFrontendRegistration = shardedDOMDataTree
- .registerDataTreeShard(prefix,
- new ShardFrontend(client, prefix),
- ((ProxyProducer) producer).delegate());
+ // NOTE(review): the raw cast assumes registerDataTreeShard returns a
+ // DOMDataTreeShardRegistration - confirm against the sharding service implementation.
+ @SuppressWarnings("unchecked")
+ final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
+ (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
+ shards.store(prefix, reg);
+ } catch (final DOMDataTreeShardingConflictException e) {
+ LOG.error("Prefix {} is already occupied by another shard", prefix, e);
+ } catch (DOMDataTreeProducerException e) {
+ LOG.error("Unable to close producer", e);
+ } catch (DOMDataTreeShardCreationFailedException e) {
+ LOG.error("Unable to create datastore client for shard {}", prefix, e);
}
+ }
- final Future<Object> future = distributedDataStore.getActorContext()
- .executeOperationAsync(shardedDataTreeActor, new PrefixShardCreated(config), DEFAULT_ASK_TIMEOUT);
- try {
- final Object result = Await.result(future, DEFAULT_ASK_TIMEOUT.duration());
- if (result != null) {
- throw new DOMDataTreeShardCreationFailedException("Received unexpected response to PrefixShardCreated"
- + result);
- }
-
- return new DistributedShardRegistrationImpl(shardFrontendRegistration, prefix, shardedDataTreeActor);
- } catch (final CompletionException e) {
- shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
- clientActor.tell(PoisonPill.getInstance(), noSender());
+ // Closes and unregisters the local frontend for the given prefix, if one exists.
+ private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
+ LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+ shards.lookup(prefix);
- final Throwable cause = e.getCause();
- if (cause instanceof DOMDataTreeShardingConflictException) {
- throw (DOMDataTreeShardingConflictException) cause;
- }
+ if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
+ LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
+ memberName, prefix);
+ return;
+ }
- throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e.getCause());
- } catch (final Exception e) {
- shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
- clientActor.tell(PoisonPill.getInstance(), noSender());
+ lookup.getValue().close();
+ // Need to remove it from our local tracking table as well.
+ shards.remove(prefix);
+ }
- throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e);
- }
+ // Creates a producer on the local sharding service only, without remote distribution.
+ DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
+ return shardedDOMDataTree.createProducer(prefix);
}
@Nonnull
return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
}
+ // Spawns a SimpleDataStoreClientActor for the named shard and resolves its DataStoreClient.
+ // Returns both so the caller can later poison the actor when tearing the shard down.
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private Entry<DataStoreClient, ActorRef> createDatastoreClient(
+ final String shardName, final ActorContext actorContext)
+ throws DOMDataTreeShardCreationFailedException {
+
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+ final Props distributedDataStoreClientProps =
+ SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);
+
+ final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+ try {
+ return new SimpleEntry<>(SimpleDataStoreClientActor
+ .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
+ } catch (final Exception e) {
+ // Broad catch is deliberate (boundary): any failure is converted to the checked
+ // creation exception, and the orphaned actor is stopped to avoid a leak.
+ LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ throw new DOMDataTreeShardCreationFailedException(
+ "Unable to create datastore client for shard{" + shardName + "}", e);
+ }
+ }
+
+
+ // Creates the root-prefix (default) shard for the given datastore type, replicated
+ // across all currently-known cluster members.
+ private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
+ throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException {
+ final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
+ Cluster.get(actorSystem).state().members()).asJavaCollection();
+ // NOTE(review): assumes every member has at least one role and that the first role
+ // is the member name - iterator().next() throws on a role-less member; also the
+ // member set may be incomplete this early in cluster formation. TODO confirm.
+ final Collection<MemberName> names = Collections2.transform(members,
+ m -> MemberName.forName(m.roles().iterator().next()));
+
+ return createDistributedShard(
+ new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names);
+ }
+
+
private static void closeProducer(final DOMDataTreeProducer producer) {
try {
producer.close();
}
private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {
- private final ListenerRegistration<ShardFrontend> registration;
+
private final DOMDataTreeIdentifier prefix;
private final ActorRef shardedDataTreeActor;
+ // Back-reference used to despawn the local frontend on close().
+ private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;
- DistributedShardRegistrationImpl(final ListenerRegistration<ShardFrontend> registration,
- final DOMDataTreeIdentifier prefix,
- final ActorRef shardedDataTreeActor) {
- this.registration = registration;
+ DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
+ final ActorRef shardedDataTreeActor,
+ final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
this.prefix = prefix;
this.shardedDataTreeActor = shardedDataTreeActor;
+ this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
}
@Override
public void close() {
- // TODO send the correct messages to ShardManager to destroy the shard
- // maybe we could provide replica removal mechanisms also?
- shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
- registration.close();
+ // first despawn on the local node
+ distributedShardedDOMDataTree.despawnShardFrontend(prefix);
+ // update the config so the remote nodes are updated
+ shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender());
}
}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding;
-
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
-import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
-import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * Proxy implementation of a shard that creates forwarding producers to the backend shard.
- */
-class ShardFrontend implements ReadableWriteableDOMDataTreeShard {
-
- private final DataStoreClient client;
- private final DOMDataTreeIdentifier shardRoot;
-
- ShardFrontend(final DataStoreClient client,
- final DOMDataTreeIdentifier shardRoot) {
- this.client = Preconditions.checkNotNull(client);
- this.shardRoot = Preconditions.checkNotNull(shardRoot);
- }
-
- @Override
- public DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
- return new ShardProxyProducer(shardRoot, paths, client);
- }
-
- @Override
- public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
- // TODO message directly into the shard
- }
-
- @Override
- public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
- // TODO message directly into the shard
- }
-
- @Nonnull
- @Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
- final YangInstanceIdentifier treeId, final L listener) {
- throw new UnsupportedOperationException("Registering data tree change listener is not supported");
- }
-}
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}.
*/
class ShardProxyProducer implements DOMDataTreeShardProducer {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardProxyProducer.class);
+ private static final AtomicLong COUNTER = new AtomicLong();
+
private final DOMDataTreeIdentifier shardRoot;
private final Collection<DOMDataTreeIdentifier> prefixes;
- private final DataStoreClient client;
+ private final ClientLocalHistory history;
+ private DistributedShardModificationFactory modificationFactory;
- ShardProxyProducer(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
- final DataStoreClient client) {
+ ShardProxyProducer(final DOMDataTreeIdentifier shardRoot,
+ final Collection<DOMDataTreeIdentifier> prefixes,
+ final DataStoreClient client,
+ final DistributedShardModificationFactory modificationFactory) {
this.shardRoot = Preconditions.checkNotNull(shardRoot);
this.prefixes = ImmutableList.copyOf(Preconditions.checkNotNull(prefixes));
- this.client = Preconditions.checkNotNull(client);
+ this.modificationFactory = Preconditions.checkNotNull(modificationFactory);
+ history = Preconditions.checkNotNull(client).createLocalHistory();
}
@Nonnull
@Override
public DOMDataTreeShardWriteTransaction createTransaction() {
- return new ShardProxyTransaction(shardRoot, prefixes, client);
+ return new ShardProxyTransaction(shardRoot, prefixes,
+ modificationFactory.createModification(history.createTransaction()));
+ }
+
+ DistributedShardModificationFactory getModificationFactory() {
+ return modificationFactory;
+ }
+
+ void setModificationFactory(final DistributedShardModificationFactory modificationFactory) {
+ this.modificationFactory = Preconditions.checkNotNull(modificationFactory);
}
}
package org.opendaylight.controller.cluster.sharding;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
- private static final ListenableFuture<Void> NULL_FUTURE = Futures.immediateFuture(null);
- private static final ListenableFuture<Boolean> VALIDATE_FUTURE = Futures.immediateFuture(true);
private final DOMDataTreeIdentifier shardRoot;
private final Collection<DOMDataTreeIdentifier> prefixes;
- private final DataStoreClient client;
- private final ClientLocalHistory history;
+ private final DistributedShardModification modification;
private ClientTransaction currentTx;
- private DOMStoreThreePhaseCommitCohort cohort;
+ private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
+ private DOMDataTreeWriteCursor cursor = null;
- ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
- final DataStoreClient client) {
+ ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
+ final Collection<DOMDataTreeIdentifier> prefixes,
+ final DistributedShardModification modification) {
this.shardRoot = Preconditions.checkNotNull(shardRoot);
this.prefixes = Preconditions.checkNotNull(prefixes);
- this.client = Preconditions.checkNotNull(client);
- history = client.createLocalHistory();
- currentTx = history.createTransaction();
+ this.modification = Preconditions.checkNotNull(modification);
+ }
+
+ private DOMDataTreeWriteCursor getCursor() {
+ if (cursor == null) {
+ cursor = new DistributedShardModificationCursor(modification, this);
+ }
+ return cursor;
}
@Nonnull
@Override
public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
checkAvailable(prefix);
+ final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+ final DOMDataTreeWriteCursor ret = getCursor();
+ ret.enter(relativePath.getPathArguments());
+ return ret;
+ }
- return currentTx.openCursor();
+ void cursorClosed() {
+ cursor = null;
+ modification.cursorClosed();
}
private void checkAvailable(final DOMDataTreeIdentifier prefix) {
+ "Available prefixes: " + prefixes);
}
+ private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
+ final Optional<YangInstanceIdentifier> relative =
+ path.relativeTo(modification.getPrefix().getRootIdentifier());
+ Preconditions.checkArgument(relative.isPresent());
+ return relative.get();
+ }
+
@Override
public void ready() {
LOG.debug("Readying transaction for shard {}", shardRoot);
- Preconditions.checkState(cohort == null, "Transaction was readied already");
- cohort = currentTx.ready();
- currentTx = null;
+ Preconditions.checkState(cohorts.isEmpty(), "Transaction was readied already");
+
+ cohorts.add(modification.seal());
+ for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
+ : modification.getChildShards().entrySet()) {
+ cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+ }
}
@Override
public void close() {
- if (cohort != null) {
- cohort.abort();
- cohort = null;
- }
+ cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
+ cohorts.clear();
+
if (currentTx != null) {
currentTx.abort();
currentTx = null;
public ListenableFuture<Void> submit() {
LOG.debug("Submitting transaction for shard {}", shardRoot);
- Preconditions.checkNotNull(cohort, "Transaction not readied yet");
- return NULL_FUTURE;
+ Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+
+ final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
+ final AsyncFunction<Void, Void> prepareFunction = input -> commit();
+
+ // transform validate into prepare
+ final ListenableFuture<Void> prepareFuture = Futures.transform(validate(), validateFunction);
+ // transform prepare into commit and return as submit result
+ return Futures.transform(prepareFuture, prepareFunction);
}
@Override
public ListenableFuture<Boolean> validate() {
LOG.debug("Validating transaction for shard {}", shardRoot);
- Preconditions.checkNotNull(cohort, "Transaction not readied yet");
- return VALIDATE_FUTURE;
+ Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ final List<ListenableFuture<Boolean>> futures =
+ cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
+ final SettableFuture<Boolean> ret = SettableFuture.create();
+
+ Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
+ @Override
+ public void onSuccess(final List<Boolean> result) {
+ ret.set(true);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ ret.setException(throwable);
+ }
+ });
+
+ return ret;
}
@Override
public ListenableFuture<Void> prepare() {
LOG.debug("Preparing transaction for shard {}", shardRoot);
- Preconditions.checkNotNull(cohort, "Transaction not readied yet");
- return NULL_FUTURE;
+ Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ final List<ListenableFuture<Void>> futures =
+ cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
+ final SettableFuture<Void> ret = SettableFuture.create();
+
+ Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(final List<Void> result) {
+ ret.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ ret.setException(throwable);
+ }
+ });
+
+ return ret;
}
@Override
public ListenableFuture<Void> commit() {
LOG.debug("Committing transaction for shard {}", shardRoot);
- Preconditions.checkNotNull(cohort, "Transaction not readied yet");
- return NULL_FUTURE;
+ Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ final List<ListenableFuture<Void>> futures =
+ cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
+ final SettableFuture<Void> ret = SettableFuture.create();
+
+ Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(final List<Void> result) {
+ ret.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ ret.setException(throwable);
+ }
+ });
+
+ return ret;
}
}
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
+import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
+import akka.cluster.ddata.DistributedData;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.Replicator;
+import akka.cluster.ddata.Replicator.Changed;
+import akka.cluster.ddata.Replicator.Subscribe;
+import akka.cluster.ddata.Replicator.Update;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import scala.compat.java8.FutureConverters;
private static final String PERSISTENCE_ID = "sharding-service-actor";
private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
- private final DOMDataTreeService dataTreeService;
- private final DOMDataTreeShardingService shardingService;
+ private final DistributedShardedDOMDataTree shardingService;
private final ActorSystem actorSystem;
- private final ClusterWrapper cluster;
+ private final ClusterWrapper clusterWrapper;
// helper actorContext used only for static calls to executeAsync etc
// for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
private final ActorContext actorContext;
private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
+ private final Cluster cluster;
+ private final ActorRef replicator;
+
+ private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
+ private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
+
ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
- dataTreeService = builder.getDataTreeService();
shardingService = builder.getShardingService();
actorSystem = builder.getActorSystem();
- cluster = builder.getClusterWrapper();
+ clusterWrapper = builder.getClusterWrapper();
distributedConfigDatastore = builder.getDistributedConfigDatastore();
distributedOperDatastore = builder.getDistributedOperDatastore();
actorContext = distributedConfigDatastore.getActorContext();
resolver = new ShardingServiceAddressResolver(
- DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName());
+ DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
+
+ clusterWrapper.subscribeToMemberEvents(self());
+ cluster = Cluster.get(actorSystem);
+
+ replicator = DistributedData.get(context().system()).replicator();
+ }
- cluster.subscribeToMemberEvents(self());
+ @Override
+ public void preStart() {
+ final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
+ new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
+ replicator.tell(subscribe, noSender());
}
@Override
@Override
protected void handleCommand(final Object message) throws Exception {
+ LOG.debug("Received {}", message);
if (message instanceof ClusterEvent.MemberUp) {
memberUp((ClusterEvent.MemberUp) message);
} else if (message instanceof ClusterEvent.MemberWeaklyUp) {
memberUnreachable((ClusterEvent.UnreachableMember) message);
} else if (message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
+ } else if (message instanceof Changed) {
+ onConfigChanged((Changed) message);
} else if (message instanceof ProducerCreated) {
onProducerCreated((ProducerCreated) message);
} else if (message instanceof NotifyProducerCreated) {
}
}
+ private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
+ LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
+
+ currentData = change.dataValue();
+ final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
+
+ LOG.debug("Changed set {}", changedConfig);
+
+ try {
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
+ changedConfig.values().stream().collect(
+ Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
+ resolveConfig(newConfig);
+ } catch (final IllegalStateException e) {
+ LOG.error("Failed, ", e);
+ }
+
+ }
+
+ private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
+
+ // get the removed configurations
+ final SetView<DOMDataTreeIdentifier> deleted =
+ Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
+ shardingService.resolveShardRemovals(deleted);
+
+ // get the added configurations
+ final SetView<DOMDataTreeIdentifier> additions =
+ Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
+ shardingService.resolveShardAdditions(additions);
+ // we can ignore those that existed previously since the potential changes in replicas will be handled by
+ // shard manager.
+
+ currentConfiguration = new HashMap<>(newConfig);
+ }
+
@Override
public String persistenceId() {
return PERSISTENCE_ID;
private void onProducerCreated(final ProducerCreated message) {
LOG.debug("Received ProducerCreated: {}", message);
+
+ // fastpath if no replication is needed, since there is only one node
+ if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
+ getSender().tell(new Status.Success(null), noSender());
+ return;
+ }
+
final ActorRef sender = getSender();
final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
futures.toArray(new CompletableFuture[futures.size()]));
combinedFuture.thenRun(() -> {
- for (final CompletableFuture<Object> future : futures) {
- try {
- final Object result = future.get();
- if (result instanceof Status.Failure) {
- sender.tell(result, self());
- return;
- }
- } catch (InterruptedException | ExecutionException e) {
- sender.tell(new Status.Failure(e), self());
- return;
- }
- }
sender.tell(new Status.Success(null), noSender());
}).exceptionally(throwable -> {
sender.tell(new Status.Failure(throwable), self());
try {
final ActorProducerRegistration registration =
- new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees);
+ new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
subtrees.forEach(id -> idToProducer.put(id, registration));
sender().tell(new Status.Success(null), self());
} catch (final IllegalArgumentException e) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreatePrefixShard(final CreatePrefixShard message) {
- LOG.debug("Received CreatePrefixShard: {}", message);
+ LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
final PrefixShardConfiguration configuration = message.getConfiguration();
- final DOMDataTreeProducer producer =
- dataTreeService.createProducer(Collections.singleton(configuration.getPrefix()));
-
- final DistributedDataStore distributedDataStore =
- configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
- ? distributedConfigDatastore : distributedOperDatastore;
- final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier());
- LOG.debug("Creating distributed datastore client for shard {}", shardName);
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(),
- "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
-
- final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
- final DataStoreClient client;
- try {
- client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (final Exception e) {
- LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
- clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- throw Throwables.propagate(e);
- }
-
- try {
- final ListenerRegistration<ShardFrontend> shardFrontendRegistration =
- shardingService.registerDataTreeShard(configuration.getPrefix(),
- new ShardFrontend(
- client,
- configuration.getPrefix()
- ),
- producer);
- idToShardRegistration.put(configuration.getPrefix(),
- new ShardFrontendRegistration(clientActor, shardFrontendRegistration));
+ final Update<ORMap<PrefixShardConfiguration>> update =
+ new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
+ map -> map.put(cluster, configuration.toDataMapKey(), configuration));
- sender().tell(new Status.Success(null), self());
- } catch (final DOMDataTreeShardingConflictException e) {
- LOG.error("Unable to create shard", e);
- clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- sender().tell(new Status.Failure(e), self());
- } finally {
- try {
- producer.close();
- } catch (final DOMDataTreeProducerException e) {
- LOG.error("Unable to close producer that was used for shard registration {}", producer, e);
- }
- }
+ replicator.tell(update, self());
}
private void onPrefixShardCreated(final PrefixShardCreated message) {
- LOG.debug("Received PrefixShardCreated: {}", message);
+ LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
final ActorRef sender = getSender();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
combinedFuture.thenRun(() -> {
- for (final CompletableFuture<Object> future : futures) {
- try {
- final Object result = future.get();
- if (result instanceof Status.Failure) {
- sender.tell(result, self());
- return;
- }
- } catch (InterruptedException | ExecutionException e) {
- sender.tell(new Status.Failure(e), self());
- return;
- }
- }
sender.tell(new Status.Success(null), self());
}).exceptionally(throwable -> {
sender.tell(new Status.Failure(throwable), self());
}
private void onRemovePrefixShard(final RemovePrefixShard message) {
- LOG.debug("Received RemovePrefixShard: {}", message);
+ LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
- for (final String address : resolver.getShardingServicePeerActorAddresses()) {
- final ActorSelection selection = actorContext.actorSelection(address);
- selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf());
- }
+ //TODO the removal message should have the configuration or some other way to get to the key
+ final Update<ORMap<PrefixShardConfiguration>> removal =
+ new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
+ map -> map.remove(cluster, "prefix=" + message.getPrefix()));
+ replicator.tell(removal, self());
}
private void onPrefixShardRemoved(final PrefixShardRemoved message) {
}
private static class ShardFrontendRegistration extends
- AbstractObjectRegistration<ListenerRegistration<ShardFrontend>> {
+ AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
private final ActorRef clientActor;
- private final ListenerRegistration<ShardFrontend> shardRegistration;
+ private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
ShardFrontendRegistration(final ActorRef clientActor,
- final ListenerRegistration<ShardFrontend> shardRegistration) {
+ final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
super(shardRegistration);
this.clientActor = clientActor;
this.shardRegistration = shardRegistration;
public static class ShardedDataTreeActorCreator {
- private DOMDataTreeService dataTreeService;
- private DOMDataTreeShardingService shardingService;
+ private DistributedShardedDOMDataTree shardingService;
private DistributedDataStore distributedConfigDatastore;
private DistributedDataStore distributedOperDatastore;
private ActorSystem actorSystem;
private ClusterWrapper cluster;
- public DOMDataTreeService getDataTreeService() {
- return dataTreeService;
- }
-
- public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) {
- this.dataTreeService = dataTreeService;
- return this;
- }
-
- public DOMDataTreeShardingService getShardingService() {
+ public DistributedShardedDOMDataTree getShardingService() {
return shardingService;
}
- public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) {
+ public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
this.shardingService = shardingService;
return this;
}
}
private void verify() {
- Preconditions.checkNotNull(dataTreeService);
Preconditions.checkNotNull(shardingService);
Preconditions.checkNotNull(actorSystem);
Preconditions.checkNotNull(cluster);
public PrefixShardConfiguration getConfiguration() {
return configuration;
}
+
+
+ @Override
+ public String toString() {
+ return "CreatePrefixShard{"
+ + "configuration="
+ + configuration
+ + '}';
+ }
}
public PrefixShardConfiguration getConfiguration() {
return configuration;
}
+
+ @Override
+ public String toString() {
+ return "PrefixShardCreated{"
+ + "configuration=" + configuration
+ + '}';
+ }
}
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
/**
- * Message sent to remote {@link ShardedDataTreeActor}'s when there is an
- * attempt to remove a shard from the sharding service.
+ * Message sent to remote {@link ShardedDataTreeActor}'s when there is an attempt to remove the shard,
+ * the ShardedDataTreeActor should remove the shard from the current configuration so that the change is picked up
+ * in the backend ShardManager.
*/
@Beta
public class PrefixShardRemoved implements Serializable {
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
/**
- * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node. The local actor
- * should then notify the remote nodes of the Removal with {@link PrefixShardRemoved} message.
+ * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node.
+ * The local actor should update the configuration so that the change is picked up by other CDS Node Agents and
+ * backend ShardManagers.
*/
public class RemovePrefixShard {
public DOMDataTreeIdentifier getPrefix() {
return prefix;
}
+
+ @Override
+ public String toString() {
+ return "RemovePrefixShard{"
+ + "prefix=" + prefix
+ + '}';
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class AbstractClusterRefActorTest extends AbstractTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ System.setProperty("shard.persistent", "false");
+ system = ActorSystem.create("test", ConfigFactory.load().getConfig("test"));
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ protected static ActorSystem getSystem() {
+ return system;
+ }
+}
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-public class AbstractShardManagerTest extends AbstractActorTest {
+public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
protected static final MemberName MEMBER_1 = MemberName.forName("member-1");
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final Configuration configuration = new MockConfiguration() {
Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
- "junk", path -> "junk").put(
- "cars", path -> "cars").build();
+ "junk", new ShardStrategy() {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return "junk";
+ }
+
+ @Override
+ public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+ return YangInstanceIdentifier.EMPTY;
+ }
+ }).put(
+ "cars", new ShardStrategy() {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return "cars";
+ }
+
+ @Override
+ public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+ return YangInstanceIdentifier.EMPTY;
+ }
+ }).build();
@Override
public ShardStrategy getStrategyForModule(String moduleName) {
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
- doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory();
+ doReturn(new ShardStrategyFactory(configuration,
+ LogicalDatastoreType.CONFIGURATION)).when(mockActorContext).getShardStrategyFactory();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
}
private static ShardManagerSnapshot newShardManagerSnapshot(String... shards) {
- return new ShardManagerSnapshot(Arrays.asList(shards));
+ return new ShardManagerSnapshot(Arrays.asList(shards), Collections.emptyMap());
}
private static Snapshot newSnapshot(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
public class IntegrationTestKit extends ShardTestKit {
+ private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
+
protected DatastoreContext.Builder datastoreContextBuilder;
protected DatastoreSnapshot restoreFromSnapshot;
return shard;
}
+ /**
+ * Polls the local shard registry until the named shard is no longer present,
+ * sleeping 50 ms between polls for up to 100 attempts (a 5 second budget).
+ *
+ * @throws IllegalStateException if the shard is still registered after the budget expires
+ */
+ public static void waitUntilShardIsDown(ActorContext actorContext, String shardName) {
+ for (int i = 0; i < 20 * 5 ; i++) {
+ LOG.debug("Waiting for shard down {}", shardName);
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
+ if (!shardReply.isPresent()) {
+ return;
+ }
+ }
+
+ throw new IllegalStateException("Shard [" + shardName + "] did not shut down in time");
+ }
+
public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
final ShardStatsVerifier verifier) throws Exception {
ActorContext actorContext = datastore.getActorContext();
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.AddressFromURIString;
import akka.actor.Status.Success;
-import akka.cluster.Cluster;
-import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.collect.Lists;
-import com.typesafe.config.ConfigFactory;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.Shard.Builder;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
private static final DOMDataTreeIdentifier TEST_ID =
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
- private static final MemberName MEMBER_2 = MemberName.forName("member-2");
-
@Test
public void testPrefixShardCreation() throws Exception {
}
};
}
-
- @Test
- public void testPrefixShardReplicas() throws Exception {
- LOG.info("testPrefixShardReplicas starting");
- final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
-
- // Create ACtorSystem for member-1
- final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
-
- final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
- newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
- .waitTillReadyCountDownLatch(ready)
- .cluster(new ClusterWrapperImpl(system1))
- .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- shardManagerID);
-
- // Create an ActorSystem ShardManager actor for member-2.
-
- final ActorSystem system2 = newActorSystem("Member2");
-
- Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
-
- final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
- newTestShardMgrBuilder()
- .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
- .waitTillReadyCountDownLatch(ready)
- .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
- Dispatchers.DefaultDispatcherId()),
- shardManagerID);
-
- final JavaTestKit kit2 = new JavaTestKit(system2);
-
- new JavaTestKit(system1) {
- {
- shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-
- // check shard does not exist
- shardManager1.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
- shardManager2.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
- // create shard on node1
- final Builder builder = Shard.builder();
-
- final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
- new PrefixShardConfiguration(TEST_ID,
- PrefixShardStrategy.NAME,
- Lists.newArrayList(MEMBER_1, MEMBER_2)),
- datastoreContextBuilder.build(), builder);
-
- shardManager1.tell(createPrefixedShard, getRef());
- expectMsgClass(duration("5 seconds"), Success.class);
-
- shardManager1.underlyingActor().waitForMemberUp();
-
- LOG.info("changed leader state");
-
- // check node2 cannot find it locally
- shardManager1.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
- shardManager2.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
- // but can remotely
- shardManager2.tell(new FindPrimary(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
-
- // add replica and verify if succesful
- shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), Success.class);
-
- // verify we have a replica on manager2 now
- shardManager2.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
- }
- };
- }
-
- private ActorSystem newActorSystem(final String config) {
- final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
- actorSystems.add(system);
- return system;
- }
-}
+}
\ No newline at end of file
final ReadyTransactionReply readyReply = ReadyTransactionReply
.fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
// Send the CanCommitTransaction message for the first Tx.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Entity-ownership test base class that provisions one shared ActorSystem
+ * (built from the "test" configuration section) per test class and tears it
+ * down after all tests have run.
+ */
+public class AbstractClusterRefEntityOwnershipTest extends AbstractEntityOwnershipTest {
+
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ system = ActorSystem.create("test", ConfigFactory.load().getConfig("test"));
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException {
+ // Null the reference after shutdown so it cannot be reused accidentally.
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ /** Returns the per-class shared test ActorSystem. */
+ protected static ActorSystem getSystem() {
+ return system;
+ }
+}
*
* @author Thomas Pantelis
*/
-public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
+public class DistributedEntityOwnershipServiceTest extends AbstractClusterRefEntityOwnershipTest {
static final String ENTITY_TYPE = "test";
static final String ENTITY_TYPE2 = "test2";
static final QName QNAME = QName.create("test", "2015-08-11", "foo");
assertEquals("DatastoreSnapshotList size", 2, cloned.size());
assertDatastoreSnapshotEquals(legacyConfigSnapshot, cloned.get(0),
new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(
- legacyShardManagerSnapshot.getShardList()),
+ legacyShardManagerSnapshot.getShardList(), Collections.emptyMap()),
Optional.of(legacyConfigRoot1), Optional.of(legacyConfigRoot2));
assertDatastoreSnapshotEquals(legacyOperSnapshot, cloned.get(1),
(org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot)null,
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
+import java.util.Collections;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
@Test
public void testSerialization() {
- ShardManagerSnapshot expected = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2"));
+ ShardManagerSnapshot expected =
+ new ShardManagerSnapshot(Arrays.asList("shard1", "shard2"), Collections.emptyMap());
ShardManagerSnapshot cloned = (ShardManagerSnapshot) SerializationUtils.clone(expected);
assertEquals("getShardList", expected.getShardList(), cloned.getShardList());
JavaTestKit kit = new JavaTestKit(getSystem());
List<String> shardList = Arrays.asList("shard1", "shard2", "shard3");
- ShardManagerSnapshot shardManagerSnapshot = new ShardManagerSnapshot(shardList);
+ ShardManagerSnapshot shardManagerSnapshot = new ShardManagerSnapshot(shardList, Collections.emptyMap());
ActorRef replyActor = getSystem().actorOf(ShardManagerGetSnapshotReplyActor.props(
shardList, "config", shardManagerSnapshot, kit.getRef(),
"shard-manager", Duration.create(100, TimeUnit.SECONDS)), "testSuccess");
.put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
.put("astronauts", Collections.<String>emptyList()).build());
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
Collections.<ShardSnapshot>emptyList());
TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
// Have a dummy snapshot to be overwritten by the new data
// persisted.
String[] restoredShards = { "default", "people" };
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
LOG.info("testShardPersistenceWithRestoredData starting");
new JavaTestKit(getSystem()) {
{
- MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
- .put("default", Arrays.asList("member-1", "member-2"))
- .put("astronauts", Arrays.asList("member-2"))
- .put("people", Arrays.asList("member-1", "member-2")).build());
- String[] restoredShards = { "default", "astronauts" };
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder()
+ .put("default", Arrays.asList("member-1", "member-2"))
+ .put("astronauts", Arrays.asList("member-2"))
+ .put("people", Arrays.asList("member-1", "member-2")).build());
+ String[] restoredShards = {"default", "astronauts"};
+ ShardManagerSnapshot snapshot =
+ new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
// create shardManager to come up with restored data
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class ShardStrategyFactoryTest {
@Before
public void setUp() {
- factory = new ShardStrategyFactory(new ConfigurationImpl("module-shards.conf", "modules.conf"));
+ factory = new ShardStrategyFactory(
+ new ConfigurationImpl("module-shards.conf", "modules.conf"), LogicalDatastoreType.CONFIGURATION);
}
@Test
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
public class MockConfiguration extends ConfigurationImpl {
public MockConfiguration() {
}
@Override
- public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+ public ShardStrategy getStrategyForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) {
return null;
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit test for {@link DistributedShardFrontend} registration against a
+ * ShardedDOMDataTree: a root shard and a nested outer-list shard are both
+ * backed by mocked DataStoreClients, and a write spanning both shards is
+ * verified to be split and committed to each shard's cohort.
+ */
+public class DistributedShardFrontendTest {
+
+ private static final DOMDataTreeIdentifier ROOT =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+ private static final ListenableFuture<Object> SUCCESS_FUTURE = Futures.immediateFuture(null);
+
+ private ShardedDOMDataTree shardedDOMDataTree;
+
+ // Mocks backing the root shard; fresh instances are created in setUp().
+ private DataStoreClient client;
+ private ClientLocalHistory clientHistory;
+ private ClientTransaction clientTransaction;
+ private DOMDataTreeWriteCursor cursor;
+
+ private static final YangInstanceIdentifier OUTER_LIST_YID = TestModel.OUTER_LIST_PATH.node(
+ new NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+ private static final DOMDataTreeIdentifier OUTER_LIST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, OUTER_LIST_YID);
+
+ @Captor
+ private ArgumentCaptor<YangInstanceIdentifier.PathArgument> pathArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<NormalizedNode<?, ?>> nodeCaptor;
+
+ // Shared by both shards so commit interactions can be verified in one place.
+ private DOMStoreThreePhaseCommitCohort commitCohort;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ shardedDOMDataTree = new ShardedDOMDataTree();
+ client = mock(DataStoreClient.class);
+ cursor = mock(DOMDataTreeWriteCursor.class);
+ clientTransaction = mock(ClientTransaction.class);
+ clientHistory = mock(ClientLocalHistory.class);
+ commitCohort = mock(DOMStoreThreePhaseCommitCohort.class);
+
+ // All three-phase-commit stages succeed immediately.
+ doReturn(SUCCESS_FUTURE).when(commitCohort).canCommit();
+ doReturn(SUCCESS_FUTURE).when(commitCohort).preCommit();
+ doReturn(SUCCESS_FUTURE).when(commitCohort).commit();
+ doReturn(SUCCESS_FUTURE).when(commitCohort).abort();
+
+ doReturn(clientTransaction).when(client).createTransaction();
+ doReturn(clientTransaction).when(clientHistory).createTransaction();
+ doNothing().when(clientHistory).close();
+
+ doNothing().when(client).close();
+ doReturn(clientHistory).when(client).createLocalHistory();
+
+ doReturn(cursor).when(clientTransaction).openCursor();
+ doNothing().when(cursor).close();
+ doNothing().when(cursor).write(any(), any());
+ doNothing().when(cursor).merge(any(), any());
+ doNothing().when(cursor).delete(any());
+
+ doReturn(commitCohort).when(clientTransaction).ready();
+ }
+
+ @Test
+ public void testClientTransaction() throws Exception {
+
+ final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class);
+ final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT);
+
+ try (final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) {
+ shardedDOMDataTree.registerDataTreeShard(ROOT, rootShard, producer);
+ }
+
+ // Separate mock set for the nested outer-list shard so its writes can be
+ // verified independently of the root shard's cursor.
+ final DataStoreClient outerListClient = mock(DataStoreClient.class);
+ final ClientTransaction outerListClientTransaction = mock(ClientTransaction.class);
+ final ClientLocalHistory outerListClientHistory = mock(ClientLocalHistory.class);
+ final DOMDataTreeWriteCursor outerListCursor = mock(DOMDataTreeWriteCursor.class);
+
+ doNothing().when(outerListCursor).close();
+ doNothing().when(outerListCursor).write(any(), any());
+ doNothing().when(outerListCursor).merge(any(), any());
+ doNothing().when(outerListCursor).delete(any());
+
+ doReturn(outerListCursor).when(outerListClientTransaction).openCursor();
+ doReturn(outerListClientTransaction).when(outerListClient).createTransaction();
+ doReturn(outerListClientHistory).when(outerListClient).createLocalHistory();
+ doReturn(outerListClientTransaction).when(outerListClientHistory).createTransaction();
+
+ doReturn(commitCohort).when(outerListClientTransaction).ready();
+
+ doNothing().when(outerListClientHistory).close();
+ doNothing().when(outerListClient).close();
+
+ final DistributedShardFrontend outerListShard = new DistributedShardFrontend(
+ distributedDataStore, outerListClient, OUTER_LIST_ID);
+ try (final DOMDataTreeProducer producer =
+ shardedDOMDataTree.createProducer(Collections.singletonList(OUTER_LIST_ID))) {
+ shardedDOMDataTree.registerDataTreeShard(OUTER_LIST_ID, outerListShard, producer);
+ }
+
+ final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT));
+ final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
+ // Renamed from "cursor" to avoid shadowing the mocked cursor field above.
+ final DOMDataTreeWriteCursor rootCursor = tx.createCursor(ROOT);
+
+ assertNotNull(rootCursor);
+ cursor.write(TestModel.TEST_PATH.getLastPathArgument(), createCrossShardContainer());
+
+ //check the lower shard got the correct modification
+ verify(outerListCursor, times(2)).write(pathArgumentCaptor.capture(), nodeCaptor.capture());
+
+ final YangInstanceIdentifier.PathArgument expectedYid = new NodeIdentifier(TestModel.ID_QNAME);
+ final YangInstanceIdentifier.PathArgument actualIdYid = pathArgumentCaptor.getAllValues().get(0);
+ assertEquals(expectedYid, actualIdYid);
+
+ final YangInstanceIdentifier.PathArgument expectedInnerYid = new NodeIdentifier(TestModel.INNER_LIST_QNAME);
+ final YangInstanceIdentifier.PathArgument actualInnerListYid = pathArgumentCaptor.getAllValues().get(1);
+ assertEquals(expectedInnerYid, actualInnerListYid);
+
+ final LeafNode<Integer> actualIdNode = (LeafNode<Integer>) nodeCaptor.getAllValues().get(0);
+ assertEquals(ImmutableNodes.leafNode(TestModel.ID_QNAME, 1), actualIdNode);
+
+ final MapNode actualInnerListNode = (MapNode) nodeCaptor.getAllValues().get(1);
+ assertEquals(createInnerMapNode(1), actualInnerListNode);
+
+ rootCursor.close();
+ tx.submit().checkedGet();
+
+ // Both shards' cohorts must go through the full commit sequence.
+ verify(commitCohort, times(2)).canCommit();
+ verify(commitCohort, times(2)).preCommit();
+ verify(commitCohort, times(2)).commit();
+
+ }
+
+ /** Builds an inner-list MapNode with one entry keyed and valued by {@code id}. */
+ private static MapNode createInnerMapNode(final int id) {
+ final MapEntryNode listEntry = ImmutableNodes
+ .mapEntryBuilder(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME, "name-" + id)
+ .withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "name-" + id))
+ .withChild(ImmutableNodes.leafNode(TestModel.VALUE_QNAME, "value-" + id))
+ .build();
+
+ return ImmutableNodes.mapNodeBuilder(TestModel.INNER_LIST_QNAME).withChild(listEntry).build();
+ }
+
+ /**
+ * Builds a test container whose outer list has entries 1 and 2; entry 1 falls
+ * under the nested outer-list shard, making the write span both shards.
+ */
+ private static ContainerNode createCrossShardContainer() {
+
+ final MapEntryNode outerListEntry1 =
+ ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)
+ .withChild(createInnerMapNode(1))
+ .build();
+ final MapEntryNode outerListEntry2 =
+ ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)
+ .withChild(createInnerMapNode(2))
+ .build();
+
+ final MapNode outerList = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+ .withChild(outerListEntry1)
+ .withChild(outerListEntry2)
+ .build();
+
+ final ContainerNode testContainer = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(outerList)
+ .build();
+
+ return testContainer;
+ }
+
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.cluster.ddata.DistributedData;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore("Needs to have the configuration backend switched from distributed-data")
+public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
+
+ private static final Address MEMBER_1_ADDRESS =
+ AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
+
+ private static final DOMDataTreeIdentifier TEST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private ActorSystem leaderSystem;
+ private ActorSystem followerSystem;
+
+
+ private final Builder leaderDatastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+ private final DatastoreContext.Builder followerDatastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ private DistributedDataStore followerDistributedDataStore;
+ private DistributedDataStore leaderDistributedDataStore;
+ private IntegrationTestKit followerTestKit;
+ private IntegrationTestKit leaderTestKit;
+
+ private DistributedShardedDOMDataTree leaderShardFactory;
+ private DistributedShardedDOMDataTree followerShardFactory;
+
+ @Before
+ public void setUp() {
+
+ // Both actor systems join the leader's address, forming a two-node cluster.
+ leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
+
+ followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+ Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
+
+ }
+
+ @After
+ public void tearDown() {
+ if (followerDistributedDataStore != null) {
+ followerDistributedDataStore.close();
+ }
+ if (leaderDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+
+ // Kill the distributed-data replicators before shutting the systems down
+ // so replicated state does not linger between tests.
+ DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ JavaTestKit.shutdownActorSystem(leaderSystem);
+ JavaTestKit.shutdownActorSystem(followerSystem);
+ }
+
+ // Sets up a leader and a follower data store of the given type (without
+ // module-shard configuration), wraps each in a DistributedShardedDOMDataTree
+ // factory, and blocks until the leader's root shard has elected a leader.
+ private void initEmptyDatastores(final String type) {
+ leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+
+ leaderDistributedDataStore =
+ leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+ followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+ followerDistributedDataStore =
+ followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+ leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
+ leaderDistributedDataStore,
+ leaderDistributedDataStore);
+
+ followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
+ followerDistributedDataStore,
+ followerDistributedDataStore);
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+ }
+
+ @Test
+ @Ignore("Needs different shard creation handling due to replicas")
+ public void testProducerRegistrations() throws Exception {
+ initEmptyDatastores("config");
+
+ leaderTestKit.waitForMembersUp("member-2");
+
+ // NOTE(review): the registration handle is never closed in this test —
+ // presumably torn down with the actor systems; confirm.
+ final DistributedShardRegistration shardRegistration =
+ leaderShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+ // The shard replica must exist locally on both members before producers
+ // are exercised.
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
+
+ // While one node holds a producer for the prefix, the other node must be
+ // refused a producer for the same prefix.
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+ try {
+ followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+ fail("Producer should be already registered on the other node");
+ } catch (final IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("is attached to producer"));
+ }
+
+ producer.close();
+
+ // After closing, the follower can take the producer and the leader is the
+ // one that gets refused.
+ final DOMDataTreeProducer followerProducer =
+ followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+ try {
+ leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+ fail("Producer should be already registered on the other node");
+ } catch (final IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("is attached to producer"));
+ }
+
+ followerProducer.close();
+ // try to create a shard on an already registered prefix on follower
+ try {
+ followerShardFactory.createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ fail("This prefix already should have a shard registration that was forwarded from the other node");
+ } catch (final DOMDataTreeShardingConflictException e) {
+ assertTrue(e.getMessage().contains("is already occupied by shard"));
+ }
+ }
+
+ @Test
+ @Ignore("Needs different shard creation handling due to replicas")
+ public void testWriteIntoMultipleShards() throws Exception {
+ initEmptyDatastores("config");
+
+ leaderTestKit.waitForMembersUp("member-2");
+
+ LOG.warn("registering first shard");
+ final DistributedShardRegistration shardRegistration =
+ leaderShardFactory.createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+ findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ LOG.warn("Got after waiting for nonleader");
+ final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+ new JavaTestKit(leaderSystem) {
+ {
+ leaderShardManager.tell(
+ new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
+
+ followerShardManager.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
+ followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ LOG.warn("Found follower shard");
+
+ leaderDistributedDataStore.getActorContext().getShardManager().tell(
+ new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+ }
+ };
+
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+
+ final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
+ final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+ Assert.assertNotNull(cursor);
+ final YangInstanceIdentifier nameId =
+ YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
+ cursor.write(nameId.getLastPathArgument(),
+ ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
+ new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
+
+ cursor.close();
+ LOG.warn("Got to pre submit");
+
+ tx.submit();
+ }
+
+ @Test
+ public void testMultipleShardRegistrations() throws Exception {
+ initEmptyDatastores("config");
+
+ final DistributedShardRegistration reg1 = leaderShardFactory
+ .createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ final DistributedShardRegistration reg2 = leaderShardFactory
+ .createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ final DistributedShardRegistration reg3 = leaderShardFactory
+ .createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ final DistributedShardRegistration reg4 = leaderShardFactory
+ .createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+ // check leader has local shards
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
+
+ // check follower has local shards
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
+
+
+ LOG.debug("Closing registrations");
+
+ reg1.close();
+ reg2.close();
+ reg3.close();
+ reg4.close();
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+ LOG.debug("All leader shards gone");
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+ LOG.debug("All follower shards gone");
+ }
+
+ @Test
+ public void testMultipleRegistrationsAtOnePrefix() throws Exception {
+ initEmptyDatastores("config");
+
+ for (int i = 0; i < 10; i++) {
+ LOG.debug("Round {}", i);
+ final DistributedShardRegistration reg1 = leaderShardFactory
+ .createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ reg1.close();
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+ }
+ }
+}
package org.opendaylight.controller.cluster.sharding;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Ignore("distributed-data is broken needs to be removed")
public class DistributedShardedDOMDataTreeTest extends AbstractTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeTest.class);
+
private static final Address MEMBER_1_ADDRESS =
AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
private static final DOMDataTreeIdentifier TEST_ID =
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
- private ShardedDOMDataTree shardedDOMDataTree = new ShardedDOMDataTree();
-
private ActorSystem leaderSystem;
- private ActorSystem followerSystem;
-
private final Builder leaderDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+ DatastoreContext.newBuilder()
+ .shardHeartbeatIntervalInMillis(100)
+ .shardElectionTimeoutFactor(2)
+ .logicalStoreType(
+ org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
- private final DatastoreContext.Builder followerDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
- .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
-
- private DistributedDataStore followerDistributedDataStore;
private DistributedDataStore leaderDistributedDataStore;
- private IntegrationTestKit followerTestKit;
private IntegrationTestKit leaderTestKit;
private DistributedShardedDOMDataTree leaderShardFactory;
- private DistributedShardedDOMDataTree followerShardFactory;
@Before
public void setUp() {
- shardedDOMDataTree = new ShardedDOMDataTree();
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
- followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
- Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
}
@After
public void tearDown() {
- if (followerDistributedDataStore != null) {
- leaderDistributedDataStore.close();
- }
if (leaderDistributedDataStore != null) {
leaderDistributedDataStore.close();
}
JavaTestKit.shutdownActorSystem(leaderSystem);
- JavaTestKit.shutdownActorSystem(followerSystem);
}
private void initEmptyDatastore(final String type) {
leaderDistributedDataStore =
leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
- followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
- followerDistributedDataStore =
- followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
-
leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
- Mockito.mock(DistributedDataStore.class),
+ leaderDistributedDataStore,
leaderDistributedDataStore);
-
- followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
- Mockito.mock(DistributedDataStore.class),
- followerDistributedDataStore);
}
@Test
- public void testProducerRegistrations() throws Exception {
+ public void testWritesIntoDefaultShard() throws Exception {
initEmptyDatastore("config");
- final DistributedShardRegistration shardRegistration =
- leaderShardFactory.createDistributedShard(TEST_ID,
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ leaderShardFactory.createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+ ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
- final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+ final DOMDataTreeIdentifier configRoot =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
- leaderShardManager.tell(
- new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
- leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalShardFound.class);
-
- IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
-
- leaderShardManager.tell(
- new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
- leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalPrimaryShardFound.class);
-
- final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
- followerShardManager.tell(
- new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
- followerTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), RemotePrimaryShardFound.class);
-
- final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
- try {
- followerShardFactory.createProducer(Collections.singleton(TEST_ID));
- fail("Producer should be already registered on the other node");
- } catch (final IllegalArgumentException e) {
- assertTrue(e.getMessage().contains("is attached to producer"));
- }
-
- producer.close();
-
- final DOMDataTreeProducer followerProducer =
- followerShardFactory.createProducer(Collections.singleton(TEST_ID));
- try {
- leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
- fail("Producer should be already registered on the other node");
- } catch (final IllegalArgumentException e) {
- assertTrue(e.getMessage().contains("is attached to producer"));
- }
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot));
- followerProducer.close();
- // try to create a shard on an already registered prefix on follower
- try {
- followerShardFactory.createDistributedShard(TEST_ID,
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
- fail("This prefix already should have a shard registration that was forwarded from the other node");
- } catch (final DOMDataTreeShardingConflictException e) {
- assertTrue(e.getMessage().contains("is already occupied by shard"));
- }
+ final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
+ final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+ Assert.assertNotNull(cursor);
}
@Test
- @Ignore("Needs some other stuff related to 5280")
- public void testWriteIntoMultipleShards() throws Exception {
+ public void testSingleNodeWrites() throws Exception {
initEmptyDatastore("config");
- final DistributedShardRegistration shardRegistration =
- leaderShardFactory.createDistributedShard(
- TEST_ID,Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        final DistributedShardRegistration shardRegistration =
+                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+ LOG.warn("Got after waiting for nonleader");
final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
new JavaTestKit(leaderSystem) {
new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
expectMsgClass(duration("5 seconds"), LocalShardFound.class);
- final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
-
- followerShardManager.tell(
- new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
leaderDistributedDataStore.getActorContext().getShardManager().tell(
new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
final YangInstanceIdentifier nameId =
YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
cursor.write(nameId.getLastPathArgument(),
- ImmutableLeafNodeBuilder.<String>create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
+ ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
+ new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
cursor.close();
- tx.submit();
+ LOG.warn("Got to pre submit");
+ tx.submit().checkedGet();
+ }
+
+ @Test
+ public void testMultipleWritesIntoSingleMapEntry() throws Exception {
+ initEmptyDatastore("config");
+
+ final DistributedShardRegistration shardRegistration =
+ leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ LOG.warn("Got after waiting for nonleader");
+ final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ final YangInstanceIdentifier oid1 = TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates(
+ TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), 0));
+ final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1);
+
+ final DistributedShardRegistration outerListShardReg = leaderShardFactory.createDistributedShard(outerListPath,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(outerListPath.getRootIdentifier()));
+
+ final DOMDataTreeProducer shardProducer = leaderShardFactory.createProducer(
+ Collections.singletonList(outerListPath));
+
+ final DOMDataTreeCursorAwareTransaction tx = shardProducer.createTransaction(false);
+ final DOMDataTreeWriteCursor cursor =
+ tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1));
+ assertNotNull(cursor);
+
+ MapNode innerList = ImmutableMapNodeBuilder
+ .create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
+ .build();
+
+ cursor.write(new NodeIdentifier(TestModel.INNER_LIST_QNAME), innerList);
+ cursor.close();
+ tx.submit().checkedGet();
+
+ final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ final Collection<MapEntryNode> innerListMapEntries = createInnerListMapEntries(1000, "run-" + i);
+ for (final MapEntryNode innerListMapEntry : innerListMapEntries) {
+ final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false);
+ final DOMDataTreeWriteCursor cursor1 = tx1.createCursor(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+ oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME))));
+ cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry);
+ cursor1.close();
+ futures.add(tx1.submit());
+ }
+ }
+
+ futures.get(futures.size() - 1).checkedGet();
+
+ }
+
+ private static Collection<MapEntryNode> createInnerListMapEntries(final int amount, final String valuePrefix) {
+ final Collection<MapEntryNode> ret = new ArrayList<>();
+ for (int i = 0; i < amount; i++) {
+ ret.add(ImmutableNodes.mapEntryBuilder()
+ .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME,
+ QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
+ .withChild(ImmutableNodes
+ .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
+ .build());
+ }
+
+ return ret;
+ }
+
+ @Test
+ public void testDistributedData() throws Exception {
+ initEmptyDatastore("config");
+
+ leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
+ leaderShardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME));
+ leaderShardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME));
+ leaderShardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+ }
+
+ @Test
+ public void testMultipleRegistrationsAtOnePrefix() throws Exception {
+ initEmptyDatastore("config");
+
+ for (int i = 0; i < 10; i++) {
+ LOG.debug("Round {}", i);
+ final DistributedShardRegistration reg1 = leaderShardFactory
+ .createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ reg1.close();
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ }
}
-}
+}
\ No newline at end of file
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+ public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
mailbox-push-timeout-time = 100ms
}
+test {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loglevel = "INFO"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2565
+ }
+
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2565
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+ retry-unsuccessful-join-after = 100ms
+
+ roles = [
+ "member-1"
+ ]
+ }
+ }
+}
+
Member1 {
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug