From: Tomas Cere Date: Wed, 7 Sep 2016 11:48:27 +0000 (+0200) Subject: BUG-2138: Create DistributedShardFrontend X-Git-Tag: release/carbon~189 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c1336f9b497bc6867536a24f629c3f0b002ccb2f BUG-2138: Create DistributedShardFrontend Use the abstract shard implementations from md-sal to create a frontend implementation of a cds shard that forwards requests to backend shards via DistributedDatastoreClient. Change-Id: I7a3485f414368728e71ab2746c84d7a0f83f1436 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 02ad5c2598..d42b3469d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -56,6 +56,11 @@ com.typesafe.akka akka-remote_${scala.version} + + com.typesafe.akka + akka-distributed-data-experimental_${scala.version} + 2.4.7 + com.typesafe.akka akka-slf4j_${scala.version} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 00f7572f98..a3afa22c26 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -abstract class AbstractClientHistory extends LocalAbortable implements Identifiable { +public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable { enum State { IDLE, TX_OPEN, @@ -194,7 +194,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * @throws TransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ - public final ClientTransaction createTransaction() { + public ClientTransaction createTransaction() { checkNotClosed(); synchronized (this) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index 4598d95ebb..493eb4089e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -25,7 +25,7 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier * @author Robert Varga */ @Beta -public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable { +public class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable { ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) { super(client, historyId); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index f0ce2faf08..efa503a90c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -53,7 +53,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * @author Robert Varga */ @Beta -public final class ClientTransaction extends AbstractClientHandle { +public class ClientTransaction extends AbstractClientHandle { private ClientTransactionCursor cursor; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java index 994f0761b0..a24f3141b0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java @@ -12,6 +12,7 @@ import com.google.common.base.Preconditions; import java.io.ObjectStreamException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import javax.annotation.Nonnull; @@ -40,6 +41,7 @@ public class ShardManagerSnapshot implements Serializable { } private Object readResolve() throws ObjectStreamException { - return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList); + return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList, + Collections.emptyMap()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java index 3bf37e0f38..d5878c6431 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java @@ -9,12 +9,13 @@ package org.opendaylight.controller.cluster.datastore.config; import java.util.Collection; +import java.util.Map; import java.util.Set; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; public interface Configuration { @@ -36,7 +37,7 @@ public interface Configuration { /** * Return the shard name corresponding to the prefix, or null if none is configured. */ - @Nullable String getShardNameForPrefix(@Nonnull YangInstanceIdentifier prefix); + @Nullable String getShardNameForPrefix(@Nonnull DOMDataTreeIdentifier prefix); /** * Returns the member replicas for the given shard name. @@ -63,6 +64,18 @@ public interface Configuration { */ void addPrefixShardConfiguration(@Nonnull PrefixShardConfiguration config); + /** + * Removes a shard configuration for the specified prefix. + */ + void removePrefixShardConfiguration(@Nonnull DOMDataTreeIdentifier prefix); + + /** + * Returns the configuration for all configured prefix shards. + * + * @return An immutable copy of the currently configured prefix shards. + */ + Map getAllPrefixShardConfigurations(); + /** * Returns a unique set of all member names configured for all shards. */ @@ -86,5 +99,5 @@ public interface Configuration { /** * Returns the ShardStrategy for the given prefix or null if the prefix is not found. */ - @Nullable ShardStrategy getStrategyForPrefix(@Nonnull YangInstanceIdentifier prefix); + @Nullable ShardStrategy getStrategyForPrefix(@Nonnull DOMDataTreeIdentifier prefix); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java index 59acdbdb01..c1f687bd55 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java @@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardSt import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; // TODO clean this up once we get rid of module based configuration, prefix one should be alot simpler @@ -35,7 +36,7 @@ public class ConfigurationImpl implements Configuration { private volatile Map moduleConfigMap; // TODO should this be initialized with something? on restart we should restore the shards from configuration? - private volatile Map prefixConfigMap = Collections.emptyMap(); + private volatile Map prefixConfigMap = Collections.emptyMap(); // Look up maps to speed things up @@ -121,21 +122,22 @@ public class ConfigurationImpl implements Configuration { @Nullable @Override - public String getShardNameForPrefix(@Nonnull final YangInstanceIdentifier prefix) { + public String getShardNameForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) { Preconditions.checkNotNull(prefix, "prefix should not be null"); - Entry bestMatchEntry = - new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null); + Entry bestMatchEntry = + new SimpleEntry<>( + new DOMDataTreeIdentifier(prefix.getDatastoreType(), YangInstanceIdentifier.EMPTY), null); - for (Entry entry : prefixConfigMap.entrySet()) { - if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size() - > bestMatchEntry.getKey().getPathArguments().size()) { + for (Entry entry : prefixConfigMap.entrySet()) { + if (entry.getKey().contains(prefix) && entry.getKey().getRootIdentifier().getPathArguments().size() + > bestMatchEntry.getKey().getRootIdentifier().getPathArguments().size()) { bestMatchEntry = entry; } } //TODO we really should have mapping based on prefix instead of Strings - return ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()); + return ClusterUtils.getCleanShardName(bestMatchEntry.getKey().getRootIdentifier()); } @Override @@ -192,14 +194,37 @@ public class ConfigurationImpl implements Configuration { @Override public void addPrefixShardConfiguration(@Nonnull final PrefixShardConfiguration config) { Preconditions.checkNotNull(config, "PrefixShardConfiguration cannot be null"); - updatePrefixConfigMap(config); + addPrefixConfig(config); allShardNames = ImmutableSet.builder().addAll(allShardNames) .add(ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())).build(); } - private void updatePrefixConfigMap(final PrefixShardConfiguration config) { - final Map newPrefixConfigMap = new HashMap<>(prefixConfigMap); - newPrefixConfigMap.put(config.getPrefix().getRootIdentifier(), config); + @Override + public void removePrefixShardConfiguration(@Nonnull final DOMDataTreeIdentifier prefix) { + Preconditions.checkNotNull(prefix, "Prefix cannot be null"); + + removePrefixConfig(prefix); + + final HashSet temp = new HashSet<>(allShardNames); + temp.remove(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); + + allShardNames = ImmutableSet.copyOf(temp); + } + + @Override + public Map getAllPrefixShardConfigurations() { + return ImmutableMap.copyOf(prefixConfigMap); + } + + private void addPrefixConfig(final PrefixShardConfiguration config) { + final Map newPrefixConfigMap = new HashMap<>(prefixConfigMap); + newPrefixConfigMap.put(config.getPrefix(), config); + prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap); + } + + private void removePrefixConfig(final DOMDataTreeIdentifier prefix) { + final Map newPrefixConfigMap = new HashMap<>(prefixConfigMap); + newPrefixConfigMap.remove(prefix); prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap); } @@ -246,15 +271,16 @@ public class ConfigurationImpl implements Configuration { } @Override - public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) { + public ShardStrategy getStrategyForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) { Preconditions.checkNotNull(prefix, "Prefix cannot be null"); // FIXME using prefix tables like in mdsal will be better - Entry bestMatchEntry = - new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null); + Entry bestMatchEntry = + new SimpleEntry<>( + new DOMDataTreeIdentifier(prefix.getDatastoreType(), YangInstanceIdentifier.EMPTY), null); - for (Entry entry : prefixConfigMap.entrySet()) { - if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size() - > bestMatchEntry.getKey().getPathArguments().size()) { + for (Entry entry : prefixConfigMap.entrySet()) { + if (entry.getKey().contains(prefix) && entry.getKey().getRootIdentifier().getPathArguments().size() + > bestMatchEntry.getKey().getRootIdentifier().getPathArguments().size()) { bestMatchEntry = entry; } } @@ -262,8 +288,9 @@ public class ConfigurationImpl implements Configuration { if (bestMatchEntry.getValue() == null) { return null; } - return new PrefixShardStrategy( - ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()), this); + return new PrefixShardStrategy(ClusterUtils + .getCleanShardName(bestMatchEntry.getKey().getRootIdentifier()), + bestMatchEntry.getKey().getRootIdentifier()); } private void updateModuleConfigMap(final ModuleConfig moduleConfig) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java index d18e05b619..6b79d548f9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java @@ -8,15 +8,19 @@ package org.opendaylight.controller.cluster.datastore.config; +import akka.cluster.ddata.ReplicatedData; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import java.io.Serializable; import java.util.Collection; +import java.util.HashSet; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; /** * Configuration for prefix based shards. */ -public class PrefixShardConfiguration implements Serializable { +public class PrefixShardConfiguration implements ReplicatedData, Serializable { private static final long serialVersionUID = 1L; private final DOMDataTreeIdentifier prefix; @@ -26,9 +30,9 @@ public class PrefixShardConfiguration implements Serializable { public PrefixShardConfiguration(final DOMDataTreeIdentifier prefix, final String shardStrategyName, final Collection shardMemberNames) { - this.prefix = prefix; - this.shardStrategyName = shardStrategyName; - this.shardMemberNames = shardMemberNames; + this.prefix = Preconditions.checkNotNull(prefix); + this.shardStrategyName = Preconditions.checkNotNull(shardStrategyName); + this.shardMemberNames = ImmutableSet.copyOf(shardMemberNames); } public DOMDataTreeIdentifier getPrefix() { @@ -42,4 +46,34 @@ public class PrefixShardConfiguration implements Serializable { public Collection getShardMemberNames() { return shardMemberNames; } + + @Override + public String toString() { + return "PrefixShardConfiguration{" + + "prefix=" + prefix + + ", shardStrategyName='" + + shardStrategyName + '\'' + + ", shardMemberNames=" + shardMemberNames + + '}'; + } + + public String toDataMapKey() { + return "prefix=" + prefix; + } + + @Override + public ReplicatedData merge(final ReplicatedData replicatedData) { + if (!(replicatedData instanceof PrefixShardConfiguration)) { + throw new IllegalStateException("replicatedData expected to be instance of PrefixShardConfiguration"); + } + final PrefixShardConfiguration entry = (PrefixShardConfiguration) replicatedData; + if (!entry.getPrefix().equals(prefix)) { + // this should never happen since the key is the prefix + // if it does just return current? + return this; + } + final HashSet members = new HashSet<>(shardMemberNames); + members.addAll(entry.getShardMemberNames()); + return new PrefixShardConfiguration(prefix, shardStrategyName, members); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java index 84585f4395..dbd031076c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages; import com.google.common.base.Preconditions; import java.util.Optional; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -24,13 +25,13 @@ public class ShardLeaderStateChanged extends LeaderStateChanged { private final DataTree localShardDataTree; - public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId, + public ShardLeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId, @Nonnull DataTree localShardDataTree, short leaderPayloadVersion) { super(memberId, leaderId, leaderPayloadVersion); this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); } - public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId, + public ShardLeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId, short leaderPayloadVersion) { super(memberId, leaderId, leaderPayloadVersion); this.localShardDataTree = null; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshot.java index f6be1b808f..93846758ec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshot.java @@ -8,14 +8,19 @@ package org.opendaylight.controller.cluster.datastore.persisted; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; /** * Represents the persisted snapshot state for the ShardManager. @@ -47,6 +52,12 @@ public class ShardManagerSnapshot implements Serializable { for (String shard: snapshot.shardList) { out.writeObject(shard); } + + out.writeInt(snapshot.prefixShardConfiguration.size()); + for (Map.Entry prefixShardConfigEntry : snapshot.prefixShardConfiguration.entrySet()) { + out.writeObject(prefixShardConfigEntry.getKey()); + out.writeObject(prefixShardConfigEntry.getValue()); + } } @Override @@ -57,7 +68,14 @@ public class ShardManagerSnapshot implements Serializable { shardList.add((String) in.readObject()); } - snapshot = new ShardManagerSnapshot(shardList); + size = in.readInt(); + Map prefixShardConfiguration = new HashMap<>(size); + for (int i = 0; i < size; i++) { + prefixShardConfiguration.put((DOMDataTreeIdentifier) in.readObject(), + (PrefixShardConfiguration) in.readObject()); + } + + snapshot = new ShardManagerSnapshot(shardList, prefixShardConfiguration); } private Object readResolve() { @@ -66,9 +84,12 @@ public class ShardManagerSnapshot implements Serializable { } private final List shardList; + private final Map prefixShardConfiguration; - public ShardManagerSnapshot(@Nonnull final List shardList) { + public ShardManagerSnapshot(@Nonnull final List shardList, + final Map prefixShardConfiguration) { this.shardList = ImmutableList.copyOf(shardList); + this.prefixShardConfiguration = ImmutableMap.copyOf(prefixShardConfiguration); } public List getShardList() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index b2c9d30184..9085027c50 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; +import static akka.actor.ActorRef.noSender; import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; @@ -21,6 +22,10 @@ import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.Member; +import akka.cluster.ddata.DistributedData; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.Replicator.Changed; +import akka.cluster.ddata.Replicator.Subscribe; import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; @@ -33,6 +38,8 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -47,6 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -102,6 +110,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,6 +166,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String persistenceId; + private final ActorRef replicator; + ShardManager(AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); this.configuration = builder.getConfiguration(); @@ -180,6 +191,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { "shard-manager-" + this.type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); + + replicator = DistributedData.get(context().system()).replicator(); + + } + + public void preStart() { + LOG.info("Starting Shardmanager {}", persistenceId); + + final Subscribe> subscribe = + new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self()); + replicator.tell(subscribe, noSender()); } @Override @@ -261,6 +283,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetLocalShardIds(); } else if (message instanceof RunnableMessage) { ((RunnableMessage)message).run(); + } else if (message instanceof Changed) { + onConfigChanged((Changed) message); } else { unknownMessage(message); } @@ -316,6 +340,84 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onConfigChanged(final Changed> change) { + LOG.debug("{}, ShardManager {} received config changed {}", + cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries()); + + final Map changedConfig = change.dataValue().getEntries(); + + final Map newConfig = + changedConfig.values().stream().collect( + Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity())); + + resolveConfig(newConfig); + } + + private void resolveConfig(final Map newConfig) { + LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}", + cluster.getCurrentMemberName(), persistenceId, newConfig); + + newConfig.forEach((prefix, config) -> + LOG.debug("{} ShardManager : {}, received shard config " + + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config)); + + final SetView removedConfigs = + Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet()); + + // resolve removals + + resolveRemovals(removedConfigs); + + final SetView addedConfigs = + Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet()); + // resolve additions + + resolveAdditions(addedConfigs, newConfig); + // iter through to update existing shards, either start/stop replicas or update the shard + // to check for more peers + resolveUpdates(Collections.emptySet()); + } + + private void resolveRemovals(final Set removedConfigs) { + LOG.debug("{} ShardManager : {}, resolving removed configs : {}", + cluster.getCurrentMemberName(), persistenceId, removedConfigs); + + removedConfigs.forEach(id -> doRemovePrefixedShard(id)); + } + + private void resolveAdditions(final Set addedConfigs, + final Map configs) { + LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs); + + addedConfigs.forEach(id -> doCreatePrefixedShard(configs.get(id))); + } + + private void resolveUpdates(Set maybeUpdatedConfigs) { + LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs); + } + + private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) { + LOG.debug("{} ShardManager : {}, removing prefix shard: {}", + cluster.getCurrentMemberName(), persistenceId, prefix); + final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix); + final ShardInformation shard = localShards.remove(shardId.getShardName()); + + configuration.removePrefixShardConfiguration(prefix); + + if (shard == null) { + LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix); + return; + } + + if (shard.getActor() != null) { + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor()); + shard.getActor().tell(Shutdown.INSTANCE, self()); + } + LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(), + persistenceId(), shardId.getShardName()); + persistShardList(); + } + private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { shardReplicaOperationsInProgress.remove(shardId.getShardName()); @@ -468,42 +570,41 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { - final PrefixShardConfiguration config = createPrefixedShard.getConfig(); + doCreatePrefixedShard(createPrefixedShard.getConfig()); + // do not replicate on this level + } + + private void doCreatePrefixedShard(final PrefixShardConfiguration config) { + LOG.debug("doCreatePrefixShard : {}", config.getPrefix()); final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), - createPrefixedShard.getConfig().getPrefix()); + config.getPrefix()); final String shardName = shardId.getShardName(); configuration.addPrefixShardConfiguration(config); - DatastoreContext shardDatastoreContext = createPrefixedShard.getContext(); - - if (shardDatastoreContext == null) { - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name())) - .storeRoot(config.getPrefix().getRootIdentifier()); - shardDatastoreContext = builder.build(); - } else { - shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( - peerAddressResolver).build(); - } - - final boolean shardWasInRecoveredSnapshot = currentSnapshot != null - && currentSnapshot.getShardList().contains(shardName); + final Builder builder = newShardDatastoreContextBuilder(shardName); + builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name())) + .storeRoot(config.getPrefix().getRootIdentifier()); + DatastoreContext shardDatastoreContext = builder.build(); final Map peerAddresses = Collections.emptyMap(); final boolean isActiveMember = true; - LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", - persistenceId(), shardId, peerAddresses, isActiveMember); + LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: " + + "{}, peerAddresses: {}, isActiveMember: {}", + shardId, persistenceId(), config.getShardMemberNames(), + peerAddresses, isActiveMember); final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, - shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver); + shardDatastoreContext, Shard.builder(), peerAddressResolver); info.setActiveMember(isActiveMember); localShards.put(info.getShardName(), info); if (schemaContext != null) { info.setActor(newShardActor(schemaContext, info)); } + + persistShardList(); } private void doCreateShard(final CreateShard createShard) { @@ -1094,9 +1195,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName the shard name */ private Map getPeerAddresses(String shardName) { - Collection members = configuration.getMembersFromShardName(shardName); - Map peerAddresses = new HashMap<>(); + final Collection members = configuration.getMembersFromShardName(shardName); + return getPeerAddresses(shardName, members); + } + private Map getPeerAddresses(final String shardName, final Collection members) { + Map peerAddresses = new HashMap<>(); MemberName currentMemberName = this.cluster.getCurrentMemberName(); for (MemberName memberName : members) { @@ -1371,11 +1475,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(updateShardManagerSnapshot(shardList)); + saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations())); } - private ShardManagerSnapshot updateShardManagerSnapshot(List shardList) { - currentSnapshot = new ShardManagerSnapshot(shardList); + private ShardManagerSnapshot updateShardManagerSnapshot( + final List shardList, + final Map allPrefixShardConfigurations) { + currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations); return currentSnapshot; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerSnapshot.java index 46fccc745d..9899aeb1fc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerSnapshot.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; import com.google.common.collect.ImmutableList; import java.io.Serializable; +import java.util.Collections; import java.util.List; import javax.annotation.Nonnull; @@ -43,7 +44,8 @@ public final class ShardManagerSnapshot implements Serializable { } private Object readResolve() { - return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList); + return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList, + Collections.emptyMap()); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java index a0712b6714..8628b56461 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java @@ -31,4 +31,9 @@ public final class DefaultShardStrategy implements ShardStrategy { public String findShard(YangInstanceIdentifier path) { return DEFAULT_SHARD; } + + @Override + public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) { + return YangInstanceIdentifier.EMPTY; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java index c90baf2760..64e24a9cb9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java @@ -28,4 +28,11 @@ public class ModuleShardStrategy implements ShardStrategy { String shardName = configuration.getShardNameForModule(moduleName); return shardName != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD; } + + @Override + public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) { + return YangInstanceIdentifier.EMPTY; + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java index 1e08a989d5..25e1160c45 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore.shardstrategy; -import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; /** @@ -19,16 +18,21 @@ public class PrefixShardStrategy implements ShardStrategy { public static final String NAME = "prefix"; private final String shardName; - private final Configuration configuration; + private final YangInstanceIdentifier prefix; - public PrefixShardStrategy(final String shardName, final Configuration configuration) { - this.shardName = shardName; - this.configuration = configuration; + public PrefixShardStrategy(final String shardName, + final YangInstanceIdentifier prefix) { + this.shardName = shardName != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD; + this.prefix = prefix; } @Override public String findShard(final YangInstanceIdentifier path) { - final String shardNameForPrefix = configuration.getShardNameForPrefix(path); - return shardNameForPrefix != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD; + return shardName; + } + + @Override + public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) { + return prefix; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java index 496069d0a1..0a4c54b656 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java @@ -25,4 +25,11 @@ public interface ShardStrategy { * @return the corresponding shard name. */ String findShard(YangInstanceIdentifier path); + + /** + * Get the prefix of the shard that contains the data pointed to by the specified path. + * @param path the location of the data in the logical tree. + * @return the corresponding shards prefix. + */ + YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java index a97adc2730..724499477e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java @@ -10,16 +10,20 @@ package org.opendaylight.controller.cluster.datastore.shardstrategy; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public class ShardStrategyFactory { private static final String UNKNOWN_MODULE_NAME = "unknown"; private final Configuration configuration; + private final LogicalDatastoreType logicalStoreType; - public ShardStrategyFactory(final Configuration configuration) { + public ShardStrategyFactory(final Configuration configuration, LogicalDatastoreType logicalStoreType) { Preconditions.checkState(configuration != null, "configuration should not be missing"); this.configuration = configuration; + this.logicalStoreType = Preconditions.checkNotNull(logicalStoreType); } public ShardStrategy getStrategy(final YangInstanceIdentifier path) { @@ -30,7 +34,8 @@ public class ShardStrategyFactory { final ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName); if (shardStrategy == null) { // retry with prefix based sharding - final ShardStrategy strategyForPrefix = configuration.getStrategyForPrefix(path); + final ShardStrategy strategyForPrefix = + configuration.getStrategyForPrefix(new DOMDataTreeIdentifier(logicalStoreType, path)); if (strategyForPrefix == null) { return DefaultShardStrategy.getInstance(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index d3eb6f3926..e35dab8e72 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -50,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.reporting.MetricsReporter; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -129,7 +130,10 @@ public class ActorContext { this.datastoreContext = datastoreContext; this.dispatchers = new Dispatchers(actorSystem.dispatchers()); this.primaryShardInfoCache = primaryShardInfoCache; - this.shardStrategyFactory = new ShardStrategyFactory(configuration); + + final LogicalDatastoreType convertedType = + LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name()); + this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType); setCachedProperties(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java index dba1761130..4b60f617a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java @@ -8,19 +8,41 @@ package org.opendaylight.controller.cluster.datastore.utils; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORMapKey; +import java.util.Map; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; /** * Utils for encoding prefix shard name. */ public class ClusterUtils { + // key for replicated configuration key + public static final Key> CONFIGURATION_KEY = + ORMapKey.create("prefix-shard-configuration"); + public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) { - return ShardIdentifier - .create(getCleanShardName(prefix.getRootIdentifier()), memberName, prefix.getDatastoreType().name()); + final String type; + switch (prefix.getDatastoreType()) { + case OPERATIONAL: + type = "operational"; + break; + case CONFIGURATION: + type = "config"; + break; + default: + type = prefix.getDatastoreType().name(); + } + + return ShardIdentifier.create(getCleanShardName(prefix.getRootIdentifier()), memberName, type); } /** @@ -31,10 +53,25 @@ public class ClusterUtils { * @return encoded name that doesn't contain characters that cannot be in actor path. */ public static String getCleanShardName(final YangInstanceIdentifier path) { + if (path.isEmpty()) { + return "default"; + } + final StringBuilder builder = new StringBuilder(); // TODO need a better mapping that includes namespace, but we'll need to cleanup the string beforehand + // we have to fight both javax and akka url path restrictions.. path.getPathArguments().forEach(p -> { builder.append(p.getNodeType().getLocalName()); + if (p instanceof NodeIdentifierWithPredicates) { + builder.append("-key_"); + final Map key = ((NodeIdentifierWithPredicates) p).getKeyValues(); + key.entrySet().forEach(e -> { + builder.append(e.getKey().getLocalName()); + builder.append(e.getValue()); + builder.append("-"); + }); + builder.append("_"); + } builder.append("!"); }); return builder.toString(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java new file mode 100644 index 0000000000..8edfddedf4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; +import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext; +import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer; +import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; +import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard; +import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification; +import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Proxy implementation of a shard that creates forwarding producers to the backend shard. + */ +class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard { + + private static final Logger LOG = LoggerFactory.getLogger(DistributedShardFrontend.class); + + private final DataStoreClient client; + private final DOMDataTreeIdentifier shardRoot; + @GuardedBy("this") + private final Map childShards = new HashMap<>(); + @GuardedBy("this") + private final List producers = new ArrayList<>(); + private final DistributedDataStore distributedDataStore; + + DistributedShardFrontend(final DistributedDataStore distributedDataStore, + final DataStoreClient client, + final DOMDataTreeIdentifier shardRoot) { + this.distributedDataStore = Preconditions.checkNotNull(distributedDataStore); + this.client = Preconditions.checkNotNull(client); + this.shardRoot = Preconditions.checkNotNull(shardRoot); + } + + @Override + public synchronized DOMDataTreeShardProducer createProducer(final Collection paths) { + for (final DOMDataTreeIdentifier prodPrefix : paths) { + Preconditions.checkArgument(paths.contains(prodPrefix), "Prefix %s is not contained under shard root", + prodPrefix, paths); + } + + final ShardProxyProducer ret = + new ShardProxyProducer(shardRoot, paths, client, createModificationFactory(paths)); + producers.add(ret); + return ret; + } + + @Override + public synchronized void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { + LOG.debug("{} : Child shard attached at {}", shardRoot, prefix); + Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this); + addChildShard(prefix, child); + updateProducers(); + } + + @Override + public synchronized void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { + LOG.debug("{} : Child shard detached at {}", shardRoot, prefix); + childShards.remove(prefix); + updateProducers(); + // TODO we should grab the dataTreeSnapshot that's in the shard and apply it to this shard + } + + private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { + Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard); + childShards.put(prefix, new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child)); + } + + DistributedShardModificationFactory createModificationFactory(final Collection prefixes) { + // TODO this could be abstract + final Map affectedSubshards = new HashMap<>(); + + for (final DOMDataTreeIdentifier producerPrefix : prefixes) { + for (final ChildShardContext maybeAffected : childShards.values()) { + final DOMDataTreeIdentifier bindPath; + if (producerPrefix.contains(maybeAffected.getPrefix())) { + bindPath = maybeAffected.getPrefix(); + } else if (maybeAffected.getPrefix().contains(producerPrefix)) { + // Bound path is inside subshard + bindPath = producerPrefix; + } else { + continue; + } + + SubshardProducerSpecification spec = affectedSubshards.get(maybeAffected.getPrefix()); + if (spec == null) { + spec = new SubshardProducerSpecification(maybeAffected); + affectedSubshards.put(maybeAffected.getPrefix(), spec); + } + spec.addPrefix(bindPath); + } + } + + final DistributedShardModificationFactoryBuilder builder = + new DistributedShardModificationFactoryBuilder(shardRoot); + for (final SubshardProducerSpecification spec : affectedSubshards.values()) { + final ForeignShardModificationContext foreignContext = + new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer()); + builder.addSubshard(foreignContext); + builder.addSubshard(spec.getPrefix(), foreignContext); + } + + return builder.build(); + } + + private void updateProducers() { + for (final ShardProxyProducer producer : producers) { + producer.setModificationFactory(createModificationFactory(producer.getPrefixes())); + } + } + + @Nonnull + @Override + public ListenerRegistration registerTreeChangeListener( + final YangInstanceIdentifier treeId, final L listener) { + throw new UnsupportedOperationException("Listener registration not supported"); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java new file mode 100644 index 0000000000..2bd0dae76c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import com.google.common.base.Preconditions; +import java.util.Map; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; +import org.opendaylight.mdsal.dom.spi.shard.WritableNodeOperation; +import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy; +import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode; +import org.opendaylight.mdsal.dom.spi.shard.WriteableNodeWithSubshard; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; + +/** + * Shard modification that consists of the whole shard context, provides cursors which correctly delegate to subshards + * if any are present. + */ +public class DistributedShardModification extends WriteableNodeWithSubshard { + + private final DistributedShardModificationContext context; + private final Map childShards; + + public DistributedShardModification(final DistributedShardModificationContext context, + final Map subshards, + final Map childShards) { + super(subshards); + this.context = Preconditions.checkNotNull(context); + this.childShards = Preconditions.checkNotNull(childShards); + } + + @Override + public PathArgument getIdentifier() { + return context.getIdentifier().getRootIdentifier().getLastPathArgument(); + } + + @Override + public WriteCursorStrategy createOperation(final DOMDataTreeWriteCursor parentCursor) { + return new WritableNodeOperation(this, context.cursor()) { + @Override + public void exit() { + throw new IllegalStateException("Can not exit data tree root"); + } + }; + } + + void cursorClosed() { + context.closeCursor(); + } + + DOMStoreThreePhaseCommitCohort seal() { + childShards.values().forEach(ForeignShardModificationContext::ready); + return context.ready(); + } + + DOMDataTreeIdentifier getPrefix() { + return context.getIdentifier(); + } + + Map getChildShards() { + return childShards; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java new file mode 100644 index 0000000000..9d5168420f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; + +/** + * The context for a single shards modification, keeps a ClientTransaction so it can route requests correctly. + */ +public class DistributedShardModificationContext { + + private ClientTransaction transaction; + private DOMDataTreeIdentifier identifier; + private DOMDataTreeWriteCursor cursor; + + public DistributedShardModificationContext(final ClientTransaction transaction, + final DOMDataTreeIdentifier identifier) { + this.transaction = transaction; + this.identifier = identifier; + } + + public DOMDataTreeIdentifier getIdentifier() { + return identifier; + } + + DOMDataTreeWriteCursor cursor() { + if (cursor == null) { + cursor = transaction.openCursor(); + } + + return cursor; + } + + DOMStoreThreePhaseCommitCohort ready() { + if (cursor != null) { + cursor.close(); + cursor = null; + } + + return transaction.ready(); + } + + void closeCursor() { + if (cursor != null) { + cursor.close(); + cursor = null; + } + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java new file mode 100644 index 0000000000..37ccf60dbd --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import org.opendaylight.mdsal.dom.spi.shard.AbstractDataModificationCursor; +import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy; + +/** + * Internal cursor implementation consisting of WriteCursorStrategies which forwards writes to foreign modifications + * if any. + */ +public class DistributedShardModificationCursor extends AbstractDataModificationCursor { + + private ShardProxyTransaction parent; + + public DistributedShardModificationCursor(final DistributedShardModification root, + final ShardProxyTransaction parent) { + super(root); + this.parent = parent; + } + + @Override + protected WriteCursorStrategy getRootOperation(final DistributedShardModification root) { + return root.createOperation(null); + } + + @Override + public void close() { + parent.cursorClosed(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java new file mode 100644 index 0000000000..8fc1f4863d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; +import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; + +/** + * Factory for {@link DistributedShardModification}. + */ +public final class DistributedShardModificationFactory { + private final Map childShards; + private final Map children; + private final DOMDataTreeIdentifier root; + + DistributedShardModificationFactory(final DOMDataTreeIdentifier root, + final Map children, + final Map childShards) { + this.root = Preconditions.checkNotNull(root); + this.children = ImmutableMap.copyOf(children); + this.childShards = ImmutableMap.copyOf(childShards); + } + + @VisibleForTesting + Map getChildren() { + return children; + } + + @VisibleForTesting + Map getChildShards() { + return childShards; + } + + DistributedShardModification createModification(final ClientTransaction transaction) { + return new DistributedShardModification( + new DistributedShardModificationContext(transaction, root), children, childShards); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java new file mode 100644 index 0000000000..15459ce09c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.store.inmemory.ShardDataModificationFactoryBuilder; + +/** + * Builder for {@link DistributedShardModificationFactory}. + */ +public class DistributedShardModificationFactoryBuilder + extends ShardDataModificationFactoryBuilder { + + + public DistributedShardModificationFactoryBuilder(final DOMDataTreeIdentifier root) { + super(root); + } + + @Override + public DistributedShardModificationFactory build() { + return new DistributedShardModificationFactory(root, buildChildren(), childShards); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java index 91b479dfbe..5575438037 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java @@ -14,30 +14,36 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.Member; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.Collections2; import com.google.common.collect.ForwardingObject; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.concurrent.CompletionException; +import java.util.EnumMap; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; +import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard; import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; +import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; @@ -48,12 +54,15 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService; +import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration; import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; +import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable; +import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.Future; +import scala.collection.JavaConverters; /** * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via @@ -79,6 +88,12 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private final ActorRef shardedDataTreeActor; private final MemberName memberName; + private final DOMDataTreePrefixTable> shards = + DOMDataTreePrefixTable.create(); + + private final EnumMap defaultShardRegistrations = + new EnumMap<>(LogicalDatastoreType.class); + public DistributedShardedDOMDataTree(final ActorSystem actorSystem, final DistributedDataStore distributedOperDatastore, final DistributedDataStore distributedConfigDatastore) { @@ -89,8 +104,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat shardedDataTreeActor = createShardedDataTreeActor(actorSystem, new ShardedDataTreeActorCreator() - .setDataTreeService(shardedDOMDataTree) - .setShardingService(shardedDOMDataTree) + .setShardingService(this) .setActorSystem(actorSystem) .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper()) .setDistributedConfigDatastore(distributedConfigDatastore) @@ -98,6 +112,21 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat ACTOR_ID); this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName(); + + //create shard registration for DEFAULT_SHARD + try { + defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION, + initDefaultShard(LogicalDatastoreType.CONFIGURATION)); + } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) { + LOG.error("Unable to create default shard frontend for config shard", e); + } + + try { + defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL, + initDefaultShard(LogicalDatastoreType.OPERATIONAL)); + } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) { + LOG.error("Unable to create default shard frontend for operational shard", e); + } } @Nonnull @@ -113,13 +142,15 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat @Nonnull @Override public DOMDataTreeProducer createProducer(@Nonnull final Collection subtrees) { - LOG.debug("Creating producer for {}", subtrees); + LOG.debug("{} - Creating producer for {}", + distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees); final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees); final Object response = distributedConfigDatastore.getActorContext() .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees)); if (response == null) { - LOG.debug("Received success from remote nodes, creating producer:{}", subtrees); + LOG.debug("{} - Received success from remote nodes, creating producer:{}", + distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees); return new ProxyProducer(producer, subtrees, shardedDataTreeActor, distributedConfigDatastore.getActorContext()); } else if (response instanceof Exception) { @@ -133,74 +164,92 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat @Override @SuppressWarnings("checkstyle:IllegalCatch") + //TODO: it would be better to block here until the message is processed by the actor public DistributedShardRegistration createDistributedShard( final DOMDataTreeIdentifier prefix, final Collection replicaMembers) - throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException, - DOMDataTreeShardCreationFailedException { + throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException { + final DOMDataTreePrefixTableEntry> lookup = + shards.lookup(prefix); + if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) { + throw new DOMDataTreeShardingConflictException( + "Prefix " + prefix + " is already occupied by another shard."); + } + + PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers); + shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender()); + + return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this); + } + + void resolveShardAdditions(final Set additions) { + LOG.debug("Member {}: Resolving additions : {}", memberName, additions); + final ArrayList list = new ArrayList<>(additions); + // we need to register the shards from top to bottom, so we need to atleast make sure the ordering reflects that + Collections.sort(list, (o1, o2) -> { + if (o1.getRootIdentifier().getPathArguments().size() < o2.getRootIdentifier().getPathArguments().size()) { + return -1; + } else if (o1.getRootIdentifier().getPathArguments().size() + == o2.getRootIdentifier().getPathArguments().size()) { + return 0; + } else { + return 1; + } + }); + list.forEach(this::createShardFrontend); + } + void resolveShardRemovals(final Set removals) { + LOG.debug("Member {}: Resolving removals : {}", memberName, removals); + + // do we need to go from bottom to top? + removals.forEach(this::despawnShardFrontend); + } + + private void createShardFrontend(final DOMDataTreeIdentifier prefix) { + LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix); final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier()); final DistributedDataStore distributedDataStore = prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION) ? distributedConfigDatastore : distributedOperDatastore; - final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers); - if (replicaMembers.contains(memberName)) { - // spawn the backend shard and have the shard Manager create all replicas - final ActorRef shardManager = distributedDataStore.getActorContext().getShardManager(); - - shardManager.tell(new CreatePrefixedShard(config, null, Shard.builder()), noSender()); - } - - LOG.debug("Creating distributed datastore client for shard {}", shardName); - final Props distributedDataStoreClientProps = - SimpleDataStoreClientActor - .props(memberName, "Shard-" + shardName, distributedDataStore.getActorContext(), shardName); + try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) { + final Entry entry = + createDatastoreClient(shardName, distributedDataStore.getActorContext()); - final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); - final DataStoreClient client; - try { - client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); - } catch (final Exception e) { - LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e); - clientActor.tell(PoisonPill.getInstance(), noSender()); - throw new DOMDataTreeProducerException("Unable to create producer", e); - } + final DistributedShardFrontend shard = + new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix); - // register the frontend into the sharding service and let the actor distribute this onto the other nodes - final ListenerRegistration shardFrontendRegistration; - try (DOMDataTreeProducer producer = createProducer(Collections.singletonList(prefix))) { - shardFrontendRegistration = shardedDOMDataTree - .registerDataTreeShard(prefix, - new ShardFrontend(client, prefix), - ((ProxyProducer) producer).delegate()); + @SuppressWarnings("unchecked") + final DOMDataTreeShardRegistration reg = + (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); + shards.store(prefix, reg); + } catch (final DOMDataTreeShardingConflictException e) { + LOG.error("Prefix {} is already occupied by another shard", prefix, e); + } catch (DOMDataTreeProducerException e) { + LOG.error("Unable to close producer", e); + } catch (DOMDataTreeShardCreationFailedException e) { + LOG.error("Unable to create datastore client for shard {}", prefix, e); } + } - final Future future = distributedDataStore.getActorContext() - .executeOperationAsync(shardedDataTreeActor, new PrefixShardCreated(config), DEFAULT_ASK_TIMEOUT); - try { - final Object result = Await.result(future, DEFAULT_ASK_TIMEOUT.duration()); - if (result != null) { - throw new DOMDataTreeShardCreationFailedException("Received unexpected response to PrefixShardCreated" - + result); - } - - return new DistributedShardRegistrationImpl(shardFrontendRegistration, prefix, shardedDataTreeActor); - } catch (final CompletionException e) { - shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender()); - clientActor.tell(PoisonPill.getInstance(), noSender()); + private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) { + LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix); + final DOMDataTreePrefixTableEntry> lookup = + shards.lookup(prefix); - final Throwable cause = e.getCause(); - if (cause instanceof DOMDataTreeShardingConflictException) { - throw (DOMDataTreeShardingConflictException) cause; - } + if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) { + LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..", + memberName, prefix); + return; + } - throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e.getCause()); - } catch (final Exception e) { - shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender()); - clientActor.tell(PoisonPill.getInstance(), noSender()); + lookup.getValue().close(); + // need to remove from our local table thats used for tracking + shards.remove(prefix); + } - throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e); - } + DOMDataTreeProducer localCreateProducer(final Collection prefix) { + return shardedDOMDataTree.createProducer(prefix); } @Nonnull @@ -216,6 +265,38 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); } + @SuppressWarnings("checkstyle:IllegalCatch") + private Entry createDatastoreClient( + final String shardName, final ActorContext actorContext) + throws DOMDataTreeShardCreationFailedException { + + LOG.debug("Creating distributed datastore client for shard {}", shardName); + final Props distributedDataStoreClientProps = + SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName); + + final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); + try { + return new SimpleEntry<>(SimpleDataStoreClientActor + .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor); + } catch (final Exception e) { + LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e); + clientActor.tell(PoisonPill.getInstance(), noSender()); + throw new DOMDataTreeShardCreationFailedException( + "Unable to create datastore client for shard{" + shardName + "}", e); + } + } + + private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType) + throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException { + final Collection members = JavaConverters.asJavaCollectionConverter( + Cluster.get(actorSystem).state().members()).asJavaCollection(); + final Collection names = Collections2.transform(members, + m -> MemberName.forName(m.roles().iterator().next())); + + return createDistributedShard( + new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names); + } + private static void closeProducer(final DOMDataTreeProducer producer) { try { producer.close(); @@ -246,24 +327,25 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } private static class DistributedShardRegistrationImpl implements DistributedShardRegistration { - private final ListenerRegistration registration; + private final DOMDataTreeIdentifier prefix; private final ActorRef shardedDataTreeActor; + private final DistributedShardedDOMDataTree distributedShardedDOMDataTree; - DistributedShardRegistrationImpl(final ListenerRegistration registration, - final DOMDataTreeIdentifier prefix, - final ActorRef shardedDataTreeActor) { - this.registration = registration; + DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix, + final ActorRef shardedDataTreeActor, + final DistributedShardedDOMDataTree distributedShardedDOMDataTree) { this.prefix = prefix; this.shardedDataTreeActor = shardedDataTreeActor; + this.distributedShardedDOMDataTree = distributedShardedDOMDataTree; } @Override public void close() { - // TODO send the correct messages to ShardManager to destroy the shard - // maybe we could provide replica removal mechanisms also? - shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender()); - registration.close(); + // first despawn on the local node + distributedShardedDOMDataTree.despawnShardFrontend(prefix); + // update the config so the remote nodes are updated + shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java deleted file mode 100644 index d39cceca47..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.sharding; - -import com.google.common.base.Preconditions; -import java.util.Collection; -import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; -import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer; -import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; - -/** - * Proxy implementation of a shard that creates forwarding producers to the backend shard. - */ -class ShardFrontend implements ReadableWriteableDOMDataTreeShard { - - private final DataStoreClient client; - private final DOMDataTreeIdentifier shardRoot; - - ShardFrontend(final DataStoreClient client, - final DOMDataTreeIdentifier shardRoot) { - this.client = Preconditions.checkNotNull(client); - this.shardRoot = Preconditions.checkNotNull(shardRoot); - } - - @Override - public DOMDataTreeShardProducer createProducer(final Collection paths) { - return new ShardProxyProducer(shardRoot, paths, client); - } - - @Override - public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { - // TODO message directly into the shard - } - - @Override - public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { - // TODO message directly into the shard - } - - @Nonnull - @Override - public ListenerRegistration registerTreeChangeListener( - final YangInstanceIdentifier treeId, final L listener) { - throw new UnsupportedOperationException("Registering data tree change listener is not supported"); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java index f97707635c..3c8db5f23c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java @@ -11,26 +11,38 @@ package org.opendaylight.controller.cluster.sharding; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer; import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction; +import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataTreeShard; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}. */ class ShardProxyProducer implements DOMDataTreeShardProducer { + private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class); + private static final AtomicLong COUNTER = new AtomicLong(); + private final DOMDataTreeIdentifier shardRoot; private final Collection prefixes; - private final DataStoreClient client; + private final ClientLocalHistory history; + private DistributedShardModificationFactory modificationFactory; - ShardProxyProducer(final DOMDataTreeIdentifier shardRoot, final Collection prefixes, - final DataStoreClient client) { + ShardProxyProducer(final DOMDataTreeIdentifier shardRoot, + final Collection prefixes, + final DataStoreClient client, + final DistributedShardModificationFactory modificationFactory) { this.shardRoot = Preconditions.checkNotNull(shardRoot); this.prefixes = ImmutableList.copyOf(Preconditions.checkNotNull(prefixes)); - this.client = Preconditions.checkNotNull(client); + this.modificationFactory = Preconditions.checkNotNull(modificationFactory); + history = Preconditions.checkNotNull(client).createLocalHistory(); } @Nonnull @@ -41,7 +53,16 @@ class ShardProxyProducer implements DOMDataTreeShardProducer { @Override public DOMDataTreeShardWriteTransaction createTransaction() { - return new ShardProxyTransaction(shardRoot, prefixes, client); + return new ShardProxyTransaction(shardRoot, prefixes, + modificationFactory.createModification(history.createTransaction())); + } + + DistributedShardModificationFactory getModificationFactory() { + return modificationFactory; + } + + void setModificationFactory(final DistributedShardModificationFactory modificationFactory) { + this.modificationFactory = Preconditions.checkNotNull(modificationFactory); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java index b3c8dfca33..d08b47ba46 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java @@ -8,18 +8,27 @@ package org.opendaylight.controller.cluster.sharding; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; +import java.util.stream.Collectors; import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction; +import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,32 +39,43 @@ import org.slf4j.LoggerFactory; class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction { private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class); - private static final ListenableFuture NULL_FUTURE = Futures.immediateFuture(null); - private static final ListenableFuture VALIDATE_FUTURE = Futures.immediateFuture(true); private final DOMDataTreeIdentifier shardRoot; private final Collection prefixes; - private final DataStoreClient client; - private final ClientLocalHistory history; + private final DistributedShardModification modification; private ClientTransaction currentTx; - private DOMStoreThreePhaseCommitCohort cohort; + private final List cohorts = new ArrayList<>(); + private DOMDataTreeWriteCursor cursor = null; - ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, final Collection prefixes, - final DataStoreClient client) { + ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, + final Collection prefixes, + final DistributedShardModification modification) { this.shardRoot = Preconditions.checkNotNull(shardRoot); this.prefixes = Preconditions.checkNotNull(prefixes); - this.client = Preconditions.checkNotNull(client); - history = client.createLocalHistory(); - currentTx = history.createTransaction(); + this.modification = Preconditions.checkNotNull(modification); + } + + private DOMDataTreeWriteCursor getCursor() { + if (cursor == null) { + cursor = new DistributedShardModificationCursor(modification, this); + } + return cursor; } @Nonnull @Override public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) { checkAvailable(prefix); + final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier()); + final DOMDataTreeWriteCursor ret = getCursor(); + ret.enter(relativePath.getPathArguments()); + return ret; + } - return currentTx.openCursor(); + void cursorClosed() { + cursor = null; + modification.cursorClosed(); } private void checkAvailable(final DOMDataTreeIdentifier prefix) { @@ -68,21 +88,31 @@ class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction { + "Available prefixes: " + prefixes); } + private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) { + final Optional relative = + path.relativeTo(modification.getPrefix().getRootIdentifier()); + Preconditions.checkArgument(relative.isPresent()); + return relative.get(); + } + @Override public void ready() { LOG.debug("Readying transaction for shard {}", shardRoot); - Preconditions.checkState(cohort == null, "Transaction was readied already"); - cohort = currentTx.ready(); - currentTx = null; + Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction."); + + cohorts.add(modification.seal()); + for (Entry entry + : modification.getChildShards().entrySet()) { + cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue())); + } } @Override public void close() { - if (cohort != null) { - cohort.abort(); - cohort = null; - } + cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort); + cohorts.clear(); + if (currentTx != null) { currentTx.abort(); currentTx = null; @@ -93,31 +123,86 @@ class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction { public ListenableFuture submit() { LOG.debug("Submitting transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return NULL_FUTURE; + Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet"); + + final AsyncFunction validateFunction = input -> prepare(); + final AsyncFunction prepareFunction = input -> commit(); + + // transform validate into prepare + final ListenableFuture prepareFuture = Futures.transform(validate(), validateFunction); + // transform prepare into commit and return as submit result + return Futures.transform(prepareFuture, prepareFunction); } @Override public ListenableFuture validate() { LOG.debug("Validating transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return VALIDATE_FUTURE; + Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet"); + final List> futures = + cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList()); + final SettableFuture ret = SettableFuture.create(); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(true); + } + + @Override + public void onFailure(final Throwable throwable) { + ret.setException(throwable); + } + }); + + return ret; } @Override public ListenableFuture prepare() { LOG.debug("Preparing transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return NULL_FUTURE; + Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet"); + final List> futures = + cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList()); + final SettableFuture ret = SettableFuture.create(); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(null); + } + + @Override + public void onFailure(final Throwable throwable) { + ret.setException(throwable); + } + }); + + return ret; } @Override public ListenableFuture commit() { LOG.debug("Committing transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return NULL_FUTURE; + Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet"); + final List> futures = + cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList()); + final SettableFuture ret = SettableFuture.create(); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(null); + } + + @Override + public void onFailure(final Throwable throwable) { + ret.setException(throwable); + } + }); + + return ret; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index c1a099b97c..3c1ae1069e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -16,6 +16,7 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; +import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberExited; import akka.cluster.ClusterEvent.MemberRemoved; @@ -24,22 +25,27 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.ClusterEvent.ReachableMember; import akka.cluster.ClusterEvent.UnreachableMember; import akka.cluster.Member; +import akka.cluster.ddata.DistributedData; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.Replicator; +import akka.cluster.ddata.Replicator.Changed; +import akka.cluster.ddata.Replicator.Subscribe; +import akka.cluster.ddata.Replicator.Update; import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; @@ -53,13 +59,9 @@ import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService; import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import scala.compat.java8.FutureConverters; @@ -73,10 +75,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private static final String PERSISTENCE_ID = "sharding-service-actor"; private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); - private final DOMDataTreeService dataTreeService; - private final DOMDataTreeShardingService shardingService; + private final DistributedShardedDOMDataTree shardingService; private final ActorSystem actorSystem; - private final ClusterWrapper cluster; + private final ClusterWrapper clusterWrapper; // helper actorContext used only for static calls to executeAsync etc // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore private final ActorContext actorContext; @@ -87,20 +88,35 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final Map idToProducer = new HashMap<>(); private final Map idToShardRegistration = new HashMap<>(); + private final Cluster cluster; + private final ActorRef replicator; + + private ORMap currentData = ORMap.create(); + private Map currentConfiguration = new HashMap<>(); + ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName()); - dataTreeService = builder.getDataTreeService(); shardingService = builder.getShardingService(); actorSystem = builder.getActorSystem(); - cluster = builder.getClusterWrapper(); + clusterWrapper = builder.getClusterWrapper(); distributedConfigDatastore = builder.getDistributedConfigDatastore(); distributedOperDatastore = builder.getDistributedOperDatastore(); actorContext = distributedConfigDatastore.getActorContext(); resolver = new ShardingServiceAddressResolver( - DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName()); + DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); + + clusterWrapper.subscribeToMemberEvents(self()); + cluster = Cluster.get(actorSystem); + + replicator = DistributedData.get(context().system()).replicator(); + } - cluster.subscribeToMemberEvents(self()); + @Override + public void preStart() { + final Subscribe> subscribe = + new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self()); + replicator.tell(subscribe, noSender()); } @Override @@ -110,6 +126,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { @Override protected void handleCommand(final Object message) throws Exception { + LOG.debug("Received {}", message); if (message instanceof ClusterEvent.MemberUp) { memberUp((ClusterEvent.MemberUp) message); } else if (message instanceof ClusterEvent.MemberWeaklyUp) { @@ -122,6 +139,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { memberUnreachable((ClusterEvent.UnreachableMember) message); } else if (message instanceof ClusterEvent.ReachableMember) { memberReachable((ClusterEvent.ReachableMember) message); + } else if (message instanceof Changed) { + onConfigChanged((Changed) message); } else if (message instanceof ProducerCreated) { onProducerCreated((ProducerCreated) message); } else if (message instanceof NotifyProducerCreated) { @@ -141,6 +160,42 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } } + private void onConfigChanged(final Changed> change) { + LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change); + + currentData = change.dataValue(); + final Map changedConfig = change.dataValue().getEntries(); + + LOG.debug("Changed set {}", changedConfig); + + try { + final Map newConfig = + changedConfig.values().stream().collect( + Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity())); + resolveConfig(newConfig); + } catch (final IllegalStateException e) { + LOG.error("Failed, ", e); + } + + } + + private void resolveConfig(final Map newConfig) { + + // get the removed configurations + final SetView deleted = + Sets.difference(currentConfiguration.keySet(), newConfig.keySet()); + shardingService.resolveShardRemovals(deleted); + + // get the added configurations + final SetView additions = + Sets.difference(newConfig.keySet(), currentConfiguration.keySet()); + shardingService.resolveShardAdditions(additions); + // we can ignore those that existed previously since the potential changes in replicas will be handled by + // shard manager. + + currentConfiguration = new HashMap<>(newConfig); + } + @Override public String persistenceId() { return PERSISTENCE_ID; @@ -198,6 +253,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private void onProducerCreated(final ProducerCreated message) { LOG.debug("Received ProducerCreated: {}", message); + + // fastpath if no replication is needed, since there is only one node + if (resolver.getShardingServicePeerActorAddresses().size() == 1) { + getSender().tell(new Status.Success(null), noSender()); + } + final ActorRef sender = getSender(); final Collection subtrees = message.getSubtrees(); @@ -216,18 +277,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { futures.toArray(new CompletableFuture[futures.size()])); combinedFuture.thenRun(() -> { - for (final CompletableFuture future : futures) { - try { - final Object result = future.get(); - if (result instanceof Status.Failure) { - sender.tell(result, self()); - return; - } - } catch (InterruptedException | ExecutionException e) { - sender.tell(new Status.Failure(e), self()); - return; - } - } sender.tell(new Status.Success(null), noSender()); }).exceptionally(throwable -> { sender.tell(new Status.Failure(throwable), self()); @@ -242,7 +291,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { try { final ActorProducerRegistration registration = - new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees); + new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees); subtrees.forEach(id -> idToProducer.put(id, registration)); sender().tell(new Status.Success(null), self()); } catch (final IllegalArgumentException e) { @@ -298,59 +347,19 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { @SuppressWarnings("checkstyle:IllegalCatch") private void onCreatePrefixShard(final CreatePrefixShard message) { - LOG.debug("Received CreatePrefixShard: {}", message); + LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); final PrefixShardConfiguration configuration = message.getConfiguration(); - final DOMDataTreeProducer producer = - dataTreeService.createProducer(Collections.singleton(configuration.getPrefix())); - - final DistributedDataStore distributedDataStore = - configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION - ? distributedConfigDatastore : distributedOperDatastore; - final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier()); - LOG.debug("Creating distributed datastore client for shard {}", shardName); - final Props distributedDataStoreClientProps = - SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(), - "Shard-" + shardName, distributedDataStore.getActorContext(), shardName); - - final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); - final DataStoreClient client; - try { - client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); - } catch (final Exception e) { - LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e); - clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - throw Throwables.propagate(e); - } - - try { - final ListenerRegistration shardFrontendRegistration = - shardingService.registerDataTreeShard(configuration.getPrefix(), - new ShardFrontend( - client, - configuration.getPrefix() - ), - producer); - idToShardRegistration.put(configuration.getPrefix(), - new ShardFrontendRegistration(clientActor, shardFrontendRegistration)); + final Update> update = + new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(), + map -> map.put(cluster, configuration.toDataMapKey(), configuration)); - sender().tell(new Status.Success(null), self()); - } catch (final DOMDataTreeShardingConflictException e) { - LOG.error("Unable to create shard", e); - clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - sender().tell(new Status.Failure(e), self()); - } finally { - try { - producer.close(); - } catch (final DOMDataTreeProducerException e) { - LOG.error("Unable to close producer that was used for shard registration {}", producer, e); - } - } + replicator.tell(update, self()); } private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("Received PrefixShardCreated: {}", message); + LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message); final Collection addresses = resolver.getShardingServicePeerActorAddresses(); final ActorRef sender = getSender(); @@ -367,18 +376,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); combinedFuture.thenRun(() -> { - for (final CompletableFuture future : futures) { - try { - final Object result = future.get(); - if (result instanceof Status.Failure) { - sender.tell(result, self()); - return; - } - } catch (InterruptedException | ExecutionException e) { - sender.tell(new Status.Failure(e), self()); - return; - } - } sender.tell(new Status.Success(null), self()); }).exceptionally(throwable -> { sender.tell(new Status.Failure(throwable), self()); @@ -387,12 +384,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } private void onRemovePrefixShard(final RemovePrefixShard message) { - LOG.debug("Received RemovePrefixShard: {}", message); + LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); - for (final String address : resolver.getShardingServicePeerActorAddresses()) { - final ActorSelection selection = actorContext.actorSelection(address); - selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf()); - } + //TODO the removal message should have the configuration or some other way to get to the key + final Update> removal = + new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(), + map -> map.remove(cluster, "prefix=" + message.getPrefix())); + replicator.tell(removal, self()); } private void onPrefixShardRemoved(final PrefixShardRemoved message) { @@ -431,13 +429,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } private static class ShardFrontendRegistration extends - AbstractObjectRegistration> { + AbstractObjectRegistration> { private final ActorRef clientActor; - private final ListenerRegistration shardRegistration; + private final ListenerRegistration shardRegistration; ShardFrontendRegistration(final ActorRef clientActor, - final ListenerRegistration shardRegistration) { + final ListenerRegistration shardRegistration) { super(shardRegistration); this.clientActor = clientActor; this.shardRegistration = shardRegistration; @@ -452,27 +450,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { public static class ShardedDataTreeActorCreator { - private DOMDataTreeService dataTreeService; - private DOMDataTreeShardingService shardingService; + private DistributedShardedDOMDataTree shardingService; private DistributedDataStore distributedConfigDatastore; private DistributedDataStore distributedOperDatastore; private ActorSystem actorSystem; private ClusterWrapper cluster; - public DOMDataTreeService getDataTreeService() { - return dataTreeService; - } - - public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) { - this.dataTreeService = dataTreeService; - return this; - } - - public DOMDataTreeShardingService getShardingService() { + public DistributedShardedDOMDataTree getShardingService() { return shardingService; } - public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) { + public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) { this.shardingService = shardingService; return this; } @@ -516,7 +504,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } private void verify() { - Preconditions.checkNotNull(dataTreeService); Preconditions.checkNotNull(shardingService); Preconditions.checkNotNull(actorSystem); Preconditions.checkNotNull(cluster); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java index 20c0ea928d..b9bf7915ec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java @@ -32,4 +32,13 @@ public class CreatePrefixShard implements Serializable { public PrefixShardConfiguration getConfiguration() { return configuration; } + + + @Override + public String toString() { + return "CreatePrefixShard{" + + "configuration=" + + configuration + + '}'; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java index f7113ab3eb..d468992aee 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java @@ -31,4 +31,11 @@ public class PrefixShardCreated { public PrefixShardConfiguration getConfiguration() { return configuration; } + + @Override + public String toString() { + return "PrefixShardCreated{" + + "configuration=" + configuration + + '}'; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java index 5f840711b3..1890b647db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java @@ -14,8 +14,9 @@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; /** - * Message sent to remote {@link ShardedDataTreeActor}'s when there is an - * attempt to remove a shard from the sharding service. + * Message sent to remote {@link ShardedDataTreeActor}'s when there is an attempt to remove the shard, + * the ShardedDataTreeActor should remove the shard from the current configuration so that the change is picked up + * in the backend ShardManager. */ @Beta public class PrefixShardRemoved implements Serializable { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java index fa74af62a2..6de1bb0eba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java @@ -13,8 +13,9 @@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; /** - * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node. The local actor - * should then notify the remote nodes of the Removal with {@link PrefixShardRemoved} message. + * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node. + * The local actor should update the configuration so that the change is picked up by other CDS Node Agents and + * backend ShardManagers. */ public class RemovePrefixShard { @@ -28,4 +29,11 @@ public class RemovePrefixShard { public DOMDataTreeIdentifier getPrefix() { return prefix; } + + @Override + public String toString() { + return "RemovePrefixShard{" + + "prefix=" + prefix + + '}'; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java new file mode 100644 index 0000000000..f47ff18027 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public abstract class AbstractClusterRefActorTest extends AbstractTest { + private static ActorSystem system; + + @BeforeClass + public static void setUpClass() throws IOException { + System.setProperty("shard.persistent", "false"); + system = ActorSystem.create("test", ConfigFactory.load().getConfig("test")); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected static ActorSystem getSystem() { + return system; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java index 15e6b03645..80cc2e7c75 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java @@ -32,7 +32,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -public class AbstractShardManagerTest extends AbstractActorTest { +public class AbstractShardManagerTest extends AbstractClusterRefActorTest { protected static final MemberName MEMBER_1 = MemberName.forName("member-1"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index ade30223e9..1b9ea169b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -76,6 +76,7 @@ import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -97,8 +98,28 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest { private final Configuration configuration = new MockConfiguration() { Map strategyMap = ImmutableMap.builder().put( - "junk", path -> "junk").put( - "cars", path -> "cars").build(); + "junk", new ShardStrategy() { + @Override + public String findShard(YangInstanceIdentifier path) { + return "junk"; + } + + @Override + public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) { + return YangInstanceIdentifier.EMPTY; + } + }).put( + "cars", new ShardStrategy() { + @Override + public String findShard(YangInstanceIdentifier path) { + return "cars"; + } + + @Override + public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) { + return YangInstanceIdentifier.EMPTY; + } + }).build(); @Override public ShardStrategy getStrategyForModule(String moduleName) { @@ -157,7 +178,8 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest { doReturn(getSystem()).when(mockActorContext).getActorSystem(); doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName(); - doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory(); + doReturn(new ShardStrategyFactory(configuration, + LogicalDatastoreType.CONFIGURATION)).when(mockActorContext).getShardStrategyFactory(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java index 67af9bb348..96c45f3e14 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java @@ -126,7 +126,7 @@ public class DatastoreSnapshotRestoreTest { } private static ShardManagerSnapshot newShardManagerSnapshot(String... shards) { - return new ShardManagerSnapshot(Arrays.asList(shards)); + return new ShardManagerSnapshot(Arrays.asList(shards), Collections.emptyMap()); } private static Snapshot newSnapshot(YangInstanceIdentifier path, NormalizedNode node) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 356026019f..28c4fa38bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -41,12 +41,16 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; public class IntegrationTestKit extends ShardTestKit { + private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class); + protected DatastoreContext.Builder datastoreContextBuilder; protected DatastoreSnapshot restoreFromSnapshot; @@ -171,6 +175,19 @@ public class IntegrationTestKit extends ShardTestKit { return shard; } + public static void waitUntilShardIsDown(ActorContext actorContext, String shardName) { + for (int i = 0; i < 20 * 5 ; i++) { + LOG.debug("Waiting for shard down {}", shardName); + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + Optional shardReply = actorContext.findLocalShard(shardName); + if (!shardReply.isPresent()) { + return; + } + } + + throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time"); + } + public static void verifyShardStats(final AbstractDataStore datastore, final String shardName, final ShardStatsVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java index c87b2f4369..289c6b72fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java @@ -9,33 +9,20 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.AddressFromURIString; import akka.actor.Status.Success; -import akka.cluster.Cluster; -import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; -import com.google.common.collect.Lists; -import com.typesafe.config.ConfigFactory; import java.util.Collections; import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.Shard.Builder; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; -import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager; import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -55,8 +42,6 @@ public class PrefixShardCreationTest extends AbstractShardManagerTest { private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); - private static final MemberName MEMBER_2 = MemberName.forName("member-2"); - @Test public void testPrefixShardCreation() throws Exception { @@ -92,98 +77,4 @@ public class PrefixShardCreationTest extends AbstractShardManagerTest { } }; } - - @Test - public void testPrefixShardReplicas() throws Exception { - LOG.info("testPrefixShardReplicas starting"); - final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - - // Create ACtorSystem for member-1 - final ActorSystem system1 = newActorSystem("Member1"); - Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); - - final TestActorRef shardManager1 = TestActorRef.create(system1, - newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) - .waitTillReadyCountDownLatch(ready) - .cluster(new ClusterWrapperImpl(system1)) - .props().withDispatcher(Dispatchers.DefaultDispatcherId()), - shardManagerID); - - // Create an ActorSystem ShardManager actor for member-2. - - final ActorSystem system2 = newActorSystem("Member2"); - - Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); - - final TestActorRef shardManager2 = TestActorRef.create(system2, - newTestShardMgrBuilder() - .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) - .waitTillReadyCountDownLatch(ready) - .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher( - Dispatchers.DefaultDispatcherId()), - shardManagerID); - - final JavaTestKit kit2 = new JavaTestKit(system2); - - new JavaTestKit(system1) { - { - shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - - // check shard does not exist - shardManager1.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); - - shardManager2.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); - kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); - - // create shard on node1 - final Builder builder = Shard.builder(); - - final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard( - new PrefixShardConfiguration(TEST_ID, - PrefixShardStrategy.NAME, - Lists.newArrayList(MEMBER_1, MEMBER_2)), - datastoreContextBuilder.build(), builder); - - shardManager1.tell(createPrefixedShard, getRef()); - expectMsgClass(duration("5 seconds"), Success.class); - - shardManager1.underlyingActor().waitForMemberUp(); - - LOG.info("changed leader state"); - - // check node2 cannot find it locally - shardManager1.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - - shardManager2.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); - kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); - - // but can remotely - shardManager2.tell(new FindPrimary( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); - kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); - - // add replica and verify if succesful - shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef()); - kit2.expectMsgClass(duration("5 seconds"), Success.class); - - // verify we have a replica on manager2 now - shardManager2.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); - kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class); - } - }; - } - - private ActorSystem newActorSystem(final String config) { - final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config)); - actorSystems.add(system); - return system; - } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 6cfde54af7..d484f99696 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -622,7 +622,6 @@ public class ShardTest extends AbstractShardTest { final ReadyTransactionReply readyReply = ReadyTransactionReply .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); - // Send the CanCommitTransaction message for the first Tx. shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractClusterRefEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractClusterRefEntityOwnershipTest.java new file mode 100644 index 0000000000..13541358ad --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractClusterRefEntityOwnershipTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.entityownership; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class AbstractClusterRefEntityOwnershipTest extends AbstractEntityOwnershipTest { + + private static ActorSystem system; + + @BeforeClass + public static void setUpClass() throws IOException { + system = ActorSystem.create("test", ConfigFactory.load().getConfig("test")); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected static ActorSystem getSystem() { + return system; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index aac4b23d3a..83056bae3a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -74,7 +74,7 @@ import scala.concurrent.duration.Duration; * * @author Thomas Pantelis */ -public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest { +public class DistributedEntityOwnershipServiceTest extends AbstractClusterRefEntityOwnershipTest { static final String ENTITY_TYPE = "test"; static final String ENTITY_TYPE2 = "test2"; static final QName QNAME = QName.create("test", "2015-08-11", "foo"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotListTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotListTest.java index 8c8e329e14..f344815a87 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotListTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotListTest.java @@ -76,7 +76,7 @@ public class DatastoreSnapshotListTest { assertEquals("DatastoreSnapshotList size", 2, cloned.size()); assertDatastoreSnapshotEquals(legacyConfigSnapshot, cloned.get(0), new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot( - legacyShardManagerSnapshot.getShardList()), + legacyShardManagerSnapshot.getShardList(), Collections.emptyMap()), Optional.of(legacyConfigRoot1), Optional.of(legacyConfigRoot2)); assertDatastoreSnapshotEquals(legacyOperSnapshot, cloned.get(1), (org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot)null, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshotTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshotTest.java index 7e62963b64..26d08e6c5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshotTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshotTest.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.persisted; import static org.junit.Assert.assertEquals; import java.util.Arrays; +import java.util.Collections; import org.apache.commons.lang.SerializationUtils; import org.junit.Test; @@ -22,7 +23,8 @@ public class ShardManagerSnapshotTest { @Test public void testSerialization() { - ShardManagerSnapshot expected = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2")); + ShardManagerSnapshot expected = + new ShardManagerSnapshot(Arrays.asList("shard1", "shard2"), Collections.emptyMap()); ShardManagerSnapshot cloned = (ShardManagerSnapshot) SerializationUtils.clone(expected); assertEquals("getShardList", expected.getShardList(), cloned.getShardList()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActorTest.java index 76af089b4d..c50318418e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActorTest.java @@ -45,7 +45,7 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest { JavaTestKit kit = new JavaTestKit(getSystem()); List shardList = Arrays.asList("shard1", "shard2", "shard3"); - ShardManagerSnapshot shardManagerSnapshot = new ShardManagerSnapshot(shardList); + ShardManagerSnapshot shardManagerSnapshot = new ShardManagerSnapshot(shardList, Collections.emptyMap()); ActorRef replyActor = getSystem().actorOf(ShardManagerGetSnapshotReplyActor.props( shardList, "config", shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)), "testSuccess"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index ffa264141c..c6cc64119a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -1392,7 +1392,8 @@ public class ShardManagerTest extends AbstractShardManagerTest { .put("shard1", Collections.emptyList()).put("shard2", Collections.emptyList()) .put("astronauts", Collections.emptyList()).build()); - ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts")); + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap()); DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot, Collections.emptyList()); TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig) @@ -1492,7 +1493,8 @@ public class ShardManagerTest extends AbstractShardManagerTest { // Have a dummy snapshot to be overwritten by the new data // persisted. String[] restoredShards = { "default", "people" }; - ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot); Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); @@ -1947,12 +1949,14 @@ public class ShardManagerTest extends AbstractShardManagerTest { LOG.info("testShardPersistenceWithRestoredData starting"); new JavaTestKit(getSystem()) { { - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() - .put("default", Arrays.asList("member-1", "member-2")) - .put("astronauts", Arrays.asList("member-2")) - .put("people", Arrays.asList("member-1", "member-2")).build()); - String[] restoredShards = { "default", "astronauts" }; - ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder() + .put("default", Arrays.asList("member-1", "member-2")) + .put("astronauts", Arrays.asList("member-2")) + .put("people", Arrays.asList("member-1", "member-2")).build()); + String[] restoredShards = {"default", "astronauts"}; + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); // create shardManager to come up with restored data diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java index 76cb7334fa..e7b70e8c1e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java @@ -18,6 +18,7 @@ import org.junit.rules.ExpectedException; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public class ShardStrategyFactoryTest { @@ -29,7 +30,8 @@ public class ShardStrategyFactoryTest { @Before public void setUp() { - factory = new ShardStrategyFactory(new ConfigurationImpl("module-shards.conf", "modules.conf")); + factory = new ShardStrategyFactory( + new ConfigurationImpl("module-shards.conf", "modules.conf"), LogicalDatastoreType.CONFIGURATION); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java index 3a4c38346f..ec64075fa5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java @@ -19,7 +19,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.ModuleConfig; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; public class MockConfiguration extends ConfigurationImpl { public MockConfiguration() { @@ -41,7 +41,7 @@ public class MockConfiguration extends ConfigurationImpl { } @Override - public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) { + public ShardStrategy getStrategyForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) { return null; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java new file mode 100644 index 0000000000..febc929db5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collections; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; + +public class DistributedShardFrontendTest { + + private static final DOMDataTreeIdentifier ROOT = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); + private static final ListenableFuture SUCCESS_FUTURE = Futures.immediateFuture(null); + + private ShardedDOMDataTree shardedDOMDataTree; + + private DataStoreClient client; + private ClientLocalHistory clientHistory; + private ClientTransaction clientTransaction; + private DOMDataTreeWriteCursor cursor; + + private static final YangInstanceIdentifier OUTER_LIST_YID = TestModel.OUTER_LIST_PATH.node( + new NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + private static final DOMDataTreeIdentifier OUTER_LIST_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, OUTER_LIST_YID); + + @Captor + private ArgumentCaptor pathArgumentCaptor; + @Captor + private ArgumentCaptor> nodeCaptor; + + private DOMStoreThreePhaseCommitCohort commitCohort; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + shardedDOMDataTree = new ShardedDOMDataTree(); + client = mock(DataStoreClient.class); + cursor = mock(DOMDataTreeWriteCursor.class); + clientTransaction = mock(ClientTransaction.class); + clientHistory = mock(ClientLocalHistory.class); + commitCohort = mock(DOMStoreThreePhaseCommitCohort.class); + + doReturn(SUCCESS_FUTURE).when(commitCohort).canCommit(); + doReturn(SUCCESS_FUTURE).when(commitCohort).preCommit(); + doReturn(SUCCESS_FUTURE).when(commitCohort).commit(); + doReturn(SUCCESS_FUTURE).when(commitCohort).abort(); + + doReturn(clientTransaction).when(client).createTransaction(); + doReturn(clientTransaction).when(clientHistory).createTransaction(); + doNothing().when(clientHistory).close(); + + doNothing().when(client).close(); + doReturn(clientHistory).when(client).createLocalHistory(); + + doReturn(cursor).when(clientTransaction).openCursor(); + doNothing().when(cursor).close(); + doNothing().when(cursor).write(any(), any()); + doNothing().when(cursor).merge(any(), any()); + doNothing().when(cursor).delete(any()); + + doReturn(commitCohort).when(clientTransaction).ready(); + } + + @Test + public void testClientTransaction() throws Exception { + + final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class); + final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT); + + try (final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) { + shardedDOMDataTree.registerDataTreeShard(ROOT, rootShard, producer); + } + + final DataStoreClient outerListClient = mock(DataStoreClient.class); + final ClientTransaction outerListClientTransaction = mock(ClientTransaction.class); + final ClientLocalHistory outerListClientHistory = mock(ClientLocalHistory.class); + final DOMDataTreeWriteCursor outerListCursor = mock(DOMDataTreeWriteCursor.class); + + doNothing().when(outerListCursor).close(); + doNothing().when(outerListCursor).write(any(), any()); + doNothing().when(outerListCursor).merge(any(), any()); + doNothing().when(outerListCursor).delete(any()); + + doReturn(outerListCursor).when(outerListClientTransaction).openCursor(); + doReturn(outerListClientTransaction).when(outerListClient).createTransaction(); + doReturn(outerListClientHistory).when(outerListClient).createLocalHistory(); + doReturn(outerListClientTransaction).when(outerListClientHistory).createTransaction(); + + doReturn(commitCohort).when(outerListClientTransaction).ready(); + + doNothing().when(outerListClientHistory).close(); + doNothing().when(outerListClient).close(); + + final DistributedShardFrontend outerListShard = new DistributedShardFrontend( + distributedDataStore, outerListClient, OUTER_LIST_ID); + try (final DOMDataTreeProducer producer = + shardedDOMDataTree.createProducer(Collections.singletonList(OUTER_LIST_ID))) { + shardedDOMDataTree.registerDataTreeShard(OUTER_LIST_ID, outerListShard, producer); + } + + final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT)); + final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false); + final DOMDataTreeWriteCursor cursor = tx.createCursor(ROOT); + + assertNotNull(cursor); + cursor.write(TestModel.TEST_PATH.getLastPathArgument(), createCrossShardContainer()); + + //check the lower shard got the correct modification + verify(outerListCursor, times(2)).write(pathArgumentCaptor.capture(), nodeCaptor.capture()); + + final YangInstanceIdentifier.PathArgument expectedYid = new NodeIdentifier(TestModel.ID_QNAME); + final YangInstanceIdentifier.PathArgument actualIdYid = pathArgumentCaptor.getAllValues().get(0); + assertEquals(expectedYid, actualIdYid); + + final YangInstanceIdentifier.PathArgument expectedInnerYid = new NodeIdentifier(TestModel.INNER_LIST_QNAME); + final YangInstanceIdentifier.PathArgument actualInnerListYid = pathArgumentCaptor.getAllValues().get(1); + assertEquals(expectedInnerYid, actualInnerListYid); + + final LeafNode actualIdNode = (LeafNode) nodeCaptor.getAllValues().get(0); + assertEquals(ImmutableNodes.leafNode(TestModel.ID_QNAME, 1), actualIdNode); + + final MapNode actualInnerListNode = (MapNode) nodeCaptor.getAllValues().get(1); + assertEquals(createInnerMapNode(1), actualInnerListNode); + + cursor.close(); + tx.submit().checkedGet(); + + verify(commitCohort, times(2)).canCommit(); + verify(commitCohort, times(2)).preCommit(); + verify(commitCohort, times(2)).commit(); + + } + + private static MapNode createInnerMapNode(final int id) { + final MapEntryNode listEntry = ImmutableNodes + .mapEntryBuilder(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME, "name-" + id) + .withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "name-" + id)) + .withChild(ImmutableNodes.leafNode(TestModel.VALUE_QNAME, "value-" + id)) + .build(); + + return ImmutableNodes.mapNodeBuilder(TestModel.INNER_LIST_QNAME).withChild(listEntry).build(); + } + + private static ContainerNode createCrossShardContainer() { + + final MapEntryNode outerListEntry1 = + ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1) + .withChild(createInnerMapNode(1)) + .build(); + final MapEntryNode outerListEntry2 = + ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2) + .withChild(createInnerMapNode(2)) + .build(); + + final MapNode outerList = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(outerListEntry1) + .withChild(outerListEntry2) + .build(); + + final ContainerNode testContainer = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(outerList) + .build(); + + return testContainer; + } + + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java new file mode 100644 index 0000000000..eb109f1e0a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java @@ -0,0 +1,369 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard; +import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.actor.PoisonPill; +import akka.cluster.Cluster; +import akka.cluster.ddata.DistributedData; +import akka.testkit.JavaTestKit; +import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; +import java.util.Collections; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractTest; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Ignore("Needs to have the configuration backend switched from distributed-data") +public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { + + private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class); + + private static final Address MEMBER_1_ADDRESS = + AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"); + + private static final DOMDataTreeIdentifier TEST_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + + private ActorSystem leaderSystem; + private ActorSystem followerSystem; + + + private final Builder leaderDatastoreContextBuilder = + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); + + private final DatastoreContext.Builder followerDatastoreContextBuilder = + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5) + .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + + private DistributedDataStore followerDistributedDataStore; + private DistributedDataStore leaderDistributedDataStore; + private IntegrationTestKit followerTestKit; + private IntegrationTestKit leaderTestKit; + + private DistributedShardedDOMDataTree leaderShardFactory; + private DistributedShardedDOMDataTree followerShardFactory; + + @Before + public void setUp() { + + leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); + + followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + Cluster.get(followerSystem).join(MEMBER_1_ADDRESS); + + } + + @After + public void tearDown() { + if (followerDistributedDataStore != null) { + followerDistributedDataStore.close(); + } + if (leaderDistributedDataStore != null) { + leaderDistributedDataStore.close(); + } + + DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender()); + DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + JavaTestKit.shutdownActorSystem(leaderSystem); + JavaTestKit.shutdownActorSystem(followerSystem); + } + + private void initEmptyDatastores(final String type) { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); + + leaderDistributedDataStore = + leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); + + followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); + followerDistributedDataStore = + followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); + + leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem, + leaderDistributedDataStore, + leaderDistributedDataStore); + + followerShardFactory = new DistributedShardedDOMDataTree(followerSystem, + followerDistributedDataStore, + followerDistributedDataStore); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY)); + } + + @Test + @Ignore("Needs different shard creation handling due to replicas") + public void testProducerRegistrations() throws Exception { + initEmptyDatastores("config"); + + leaderTestKit.waitForMembersUp("member-2"); + + final DistributedShardRegistration shardRegistration = + leaderShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); + + final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); + + assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); + + assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); + + final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); + try { + followerShardFactory.createProducer(Collections.singleton(TEST_ID)); + fail("Producer should be already registered on the other node"); + } catch (final IllegalArgumentException e) { + assertTrue(e.getMessage().contains("is attached to producer")); + } + + producer.close(); + + final DOMDataTreeProducer followerProducer = + followerShardFactory.createProducer(Collections.singleton(TEST_ID)); + try { + leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); + fail("Producer should be already registered on the other node"); + } catch (final IllegalArgumentException e) { + assertTrue(e.getMessage().contains("is attached to producer")); + } + + followerProducer.close(); + // try to create a shard on an already registered prefix on follower + try { + followerShardFactory.createDistributedShard(TEST_ID, + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + fail("This prefix already should have a shard registration that was forwarded from the other node"); + } catch (final DOMDataTreeShardingConflictException e) { + assertTrue(e.getMessage().contains("is already occupied by shard")); + } + } + + @Test + @Ignore("Needs different shard creation handling due to replicas") + public void testWriteIntoMultipleShards() throws Exception { + initEmptyDatastores("config"); + + leaderTestKit.waitForMembersUp("member-2"); + + LOG.warn("registering first shard"); + final DistributedShardRegistration shardRegistration = + leaderShardFactory.createDistributedShard(TEST_ID, + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); + findLocalShard(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); + + LOG.warn("Got after waiting for nonleader"); + final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); + + new JavaTestKit(leaderSystem) { + { + leaderShardManager.tell( + new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager(); + + followerShardManager.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef()); + followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class); + LOG.warn("Found follower shard"); + + leaderDistributedDataStore.getActorContext().getShardManager().tell( + new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); + expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); + } + }; + + final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); + + final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); + final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID); + Assert.assertNotNull(cursor); + final YangInstanceIdentifier nameId = + YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build(); + cursor.write(nameId.getLastPathArgument(), + ImmutableLeafNodeBuilder.create().withNodeIdentifier( + new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build()); + + cursor.close(); + LOG.warn("Got to pre submit"); + + tx.submit(); + } + + @Test + public void testMultipleShardRegistrations() throws Exception { + initEmptyDatastores("config"); + + final DistributedShardRegistration reg1 = leaderShardFactory + .createDistributedShard(TEST_ID, + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + + final DistributedShardRegistration reg2 = leaderShardFactory + .createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + + final DistributedShardRegistration reg3 = leaderShardFactory + .createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + + final DistributedShardRegistration reg4 = leaderShardFactory + .createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); + + // check leader has local shards + assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); + + assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); + + assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); + + assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); + + // check follower has local shards + assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); + + assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); + + assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); + + assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); + + + LOG.debug("Closing registrations"); + + reg1.close(); + reg2.close(); + reg3.close(); + reg4.close(); + + waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + + waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); + + waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); + + waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); + + LOG.debug("All leader shards gone"); + + waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + + waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); + + waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); + + waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); + + LOG.debug("All follower shards gone"); + } + + @Test + public void testMultipleRegistrationsAtOnePrefix() throws Exception { + initEmptyDatastores("config"); + + for (int i = 0; i < 10; i++) { + LOG.debug("Round {}", i); + final DistributedShardRegistration reg1 = leaderShardFactory + .createDistributedShard(TEST_ID, + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + + assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); + + assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); + + reg1.close(); + + waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + + waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java index 1e29bf638e..88a12fe7d7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java @@ -8,8 +8,9 @@ package org.opendaylight.controller.cluster.sharding; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertNotNull; +import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard; +import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -18,14 +19,16 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; import com.typesafe.config.ConfigFactory; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -35,75 +38,68 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; -import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; -import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Ignore("distributed-data is broken needs to be removed") public class DistributedShardedDOMDataTreeTest extends AbstractTest { + private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class); + private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); - private ShardedDOMDataTree shardedDOMDataTree = new ShardedDOMDataTree(); - private ActorSystem leaderSystem; - private ActorSystem followerSystem; - private final Builder leaderDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); + DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100) + .shardElectionTimeoutFactor(2) + .logicalStoreType( + org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION); - private final DatastoreContext.Builder followerDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5) - .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); - - private DistributedDataStore followerDistributedDataStore; private DistributedDataStore leaderDistributedDataStore; - private IntegrationTestKit followerTestKit; private IntegrationTestKit leaderTestKit; private DistributedShardedDOMDataTree leaderShardFactory; - private DistributedShardedDOMDataTree followerShardFactory; @Before public void setUp() { - shardedDOMDataTree = new ShardedDOMDataTree(); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); - followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); - Cluster.get(followerSystem).join(MEMBER_1_ADDRESS); } @After public void tearDown() { - if (followerDistributedDataStore != null) { - leaderDistributedDataStore.close(); - } if (leaderDistributedDataStore != null) { leaderDistributedDataStore.close(); } JavaTestKit.shutdownActorSystem(leaderSystem); - JavaTestKit.shutdownActorSystem(followerSystem); } private void initEmptyDatastore(final String type) { @@ -112,90 +108,44 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { leaderDistributedDataStore = leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - followerDistributedDataStore = - followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); - leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem, - Mockito.mock(DistributedDataStore.class), + leaderDistributedDataStore, leaderDistributedDataStore); - - followerShardFactory = new DistributedShardedDOMDataTree(followerSystem, - Mockito.mock(DistributedDataStore.class), - followerDistributedDataStore); } @Test - public void testProducerRegistrations() throws Exception { + public void testWritesIntoDefaultShard() throws Exception { initEmptyDatastore("config"); - final DistributedShardRegistration shardRegistration = - leaderShardFactory.createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + leaderShardFactory.createDistributedShard(TEST_ID, + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); + ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY)); - final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); + final DOMDataTreeIdentifier configRoot = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); - leaderShardManager.tell( - new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef()); - leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalShardFound.class); - - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), - ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - - leaderShardManager.tell( - new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef()); - leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalPrimaryShardFound.class); - - final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager(); - followerShardManager.tell( - new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef()); - followerTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), RemotePrimaryShardFound.class); - - final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); - try { - followerShardFactory.createProducer(Collections.singleton(TEST_ID)); - fail("Producer should be already registered on the other node"); - } catch (final IllegalArgumentException e) { - assertTrue(e.getMessage().contains("is attached to producer")); - } - - producer.close(); - - final DOMDataTreeProducer followerProducer = - followerShardFactory.createProducer(Collections.singleton(TEST_ID)); - try { - leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); - fail("Producer should be already registered on the other node"); - } catch (final IllegalArgumentException e) { - assertTrue(e.getMessage().contains("is attached to producer")); - } + final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot)); - followerProducer.close(); - // try to create a shard on an already registered prefix on follower - try { - followerShardFactory.createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); - fail("This prefix already should have a shard registration that was forwarded from the other node"); - } catch (final DOMDataTreeShardingConflictException e) { - assertTrue(e.getMessage().contains("is already occupied by shard")); - } + final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); + final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID); + Assert.assertNotNull(cursor); } @Test - @Ignore("Needs some other stuff related to 5280") - public void testWriteIntoMultipleShards() throws Exception { + public void testSingleNodeWrites() throws Exception { initEmptyDatastore("config"); - final DistributedShardRegistration shardRegistration = - leaderShardFactory.createDistributedShard( - TEST_ID,Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + leaderShardFactory.createDistributedShard(TEST_ID, + Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + final DistributedShardRegistration shardRegistration = + leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); + LOG.warn("Got after waiting for nonleader"); final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); new JavaTestKit(leaderSystem) { @@ -204,12 +154,6 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); expectMsgClass(duration("5 seconds"), LocalShardFound.class); - final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager(); - - followerShardManager.tell( - new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - leaderDistributedDataStore.getActorContext().getShardManager().tell( new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); @@ -224,12 +168,136 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { final YangInstanceIdentifier nameId = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build(); cursor.write(nameId.getLastPathArgument(), - ImmutableLeafNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build()); + ImmutableLeafNodeBuilder.create().withNodeIdentifier( + new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build()); cursor.close(); - tx.submit(); + LOG.warn("Got to pre submit"); + tx.submit().checkedGet(); + } + + @Test + public void testMultipleWritesIntoSingleMapEntry() throws Exception { + initEmptyDatastore("config"); + + final DistributedShardRegistration shardRegistration = + leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); + + LOG.warn("Got after waiting for nonleader"); + final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + + final YangInstanceIdentifier oid1 = TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates( + TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), 0)); + final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1); + + final DistributedShardRegistration outerListShardReg = leaderShardFactory.createDistributedShard(outerListPath, + Lists.newArrayList(AbstractTest.MEMBER_NAME)); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(outerListPath.getRootIdentifier())); + + final DOMDataTreeProducer shardProducer = leaderShardFactory.createProducer( + Collections.singletonList(outerListPath)); + + final DOMDataTreeCursorAwareTransaction tx = shardProducer.createTransaction(false); + final DOMDataTreeWriteCursor cursor = + tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1)); + assertNotNull(cursor); + + MapNode innerList = ImmutableMapNodeBuilder + .create() + .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME)) + .build(); + + cursor.write(new NodeIdentifier(TestModel.INNER_LIST_QNAME), innerList); + cursor.close(); + tx.submit().checkedGet(); + + final ArrayList> futures = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + final Collection innerListMapEntries = createInnerListMapEntries(1000, "run-" + i); + for (final MapEntryNode innerListMapEntry : innerListMapEntries) { + final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false); + final DOMDataTreeWriteCursor cursor1 = tx1.createCursor( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME)))); + cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry); + cursor1.close(); + futures.add(tx1.submit()); + } + } + + futures.get(futures.size() - 1).checkedGet(); + + } + + private static Collection createInnerListMapEntries(final int amount, final String valuePrefix) { + final Collection ret = new ArrayList<>(); + for (int i = 0; i < amount; i++) { + ret.add(ImmutableNodes.mapEntryBuilder() + .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME, + QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i))) + .withChild(ImmutableNodes + .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i)) + .build()); + } + + return ret; + } + + @Test + public void testDistributedData() throws Exception { + initEmptyDatastore("config"); + + leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)); + leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME)); + leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME)); + leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), + Lists.newArrayList(AbstractTest.MEMBER_NAME)); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); + + } + + @Test + public void testMultipleRegistrationsAtOnePrefix() throws Exception { + initEmptyDatastore("config"); + + for (int i = 0; i < 10; i++) { + LOG.debug("Round {}", i); + final DistributedShardRegistration reg1 = leaderShardFactory + .createDistributedShard(TEST_ID, + Lists.newArrayList(AbstractTest.MEMBER_NAME)); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + + assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); + + reg1.close(); + + waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); + + } } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 48b7e9b4be..cd8a75454c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -44,6 +44,7 @@ public class TestModel { public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); public static final QName ID_QNAME = QName.create(TEST_QNAME, "id"); public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name"); + public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value"); public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc"); private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index bc9a86bf89..a68a0a80d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -35,6 +35,67 @@ bounded-mailbox { mailbox-push-timeout-time = 100ms } +test { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loglevel = "INFO" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal + } + } + remote { + log-remote-lifecycle-events = off + artery { + enabled = on + canonical.hostname = "127.0.0.1" + canonical.port = 2565 + } + + netty.tcp { + hostname = "127.0.0.1" + port = 2565 + } + } + + cluster { + auto-down-unreachable-after = 100s + retry-unsuccessful-join-after = 100ms + + roles = [ + "member-1" + ] + } + } +} + Member1 { bounded-mailbox { mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties index 4cd5b22007..6cf580c917 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties @@ -8,3 +8,4 @@ org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug