From: Tomas Cere Date: Tue, 10 Jan 2017 11:44:57 +0000 (+0100) Subject: BUG-7965 Switch distributed-data backend to a separate shard X-Git-Tag: release/carbon~127 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=149feb98f151186975fe42bab5853e05aafd4b51 BUG-7965 Switch distributed-data backend to a separate shard The shard needs to be present on all nodes and replicated across the cluster. Making this into shard allows us to leverage the current datastore api's and also persistence so we have the sharding layout persisted. The shard is started on all nodes once DistributedShardedDOMDataTree is created. Change-Id: I697be9b7134a27720e23e3e56f9fddc71301ec1e Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-common-util/pom.xml b/opendaylight/md-sal/sal-common-util/pom.xml index 610366c00b..7065bd880d 100644 --- a/opendaylight/md-sal/sal-common-util/pom.xml +++ b/opendaylight/md-sal/sal-common-util/pom.xml @@ -54,6 +54,10 @@ org.opendaylight.yangtools yang-common + + org.opendaylight.yangtools + yang-data-util + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index dce5368218..c0bb8662ba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXB import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -94,7 +95,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface .datastoreContextFactory(datastoreContextFactory) .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch) .primaryShardInfoCache(primaryShardInfoCache) - .restoreFromSnapshot(restoreFromSnapshot); + .restoreFromSnapshot(restoreFromSnapshot) + .distributedDataStore(this); actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher, shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), @@ -319,4 +321,19 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface return (ListenerRegistration) listenerRegistrationProxy; } + @SuppressWarnings("unchecked") + public ListenerRegistration registerShardConfigListener( + final YangInstanceIdentifier internalPath, + final DOMDataTreeChangeListener delegate) { + Preconditions.checkNotNull(delegate, "delegate should not be null"); + + LOG.debug("Registering a listener for the configuration shard: {}", internalPath); + + final DataTreeChangeListenerProxy proxy = + new DataTreeChangeListenerProxy<>(actorContext, delegate, internalPath); + proxy.init(ClusterUtils.PREFIX_CONFIG_SHARD_ID); + 
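+ // init() kicks off an asynchronous lookup of the prefix-configuration shard named
+ // above; the registration completes once the shard is found, while the proxy is
+ // handed back to the caller immediately.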
+ return (ListenerRegistration) proxy; + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java index 6b79d548f9..629418f99b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java @@ -8,19 +8,67 @@ package org.opendaylight.controller.cluster.datastore.config; -import akka.cluster.ddata.ReplicatedData; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; /** * Configuration for prefix based shards. */ -public class PrefixShardConfiguration implements ReplicatedData, Serializable { +public class PrefixShardConfiguration implements Serializable { + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private PrefixShardConfiguration prefixShardConfiguration; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. 
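+ // (Without that public no-argument constructor, Externalizable deserialization
+ // fails at runtime with an InvalidClassException: "no valid constructor".)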
+ @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(final PrefixShardConfiguration prefixShardConfiguration) { + this.prefixShardConfiguration = prefixShardConfiguration; + } + + @Override + public void writeExternal(final ObjectOutput objectOutput) throws IOException { + objectOutput.writeObject(prefixShardConfiguration.getPrefix()); + objectOutput.writeObject(prefixShardConfiguration.getShardStrategyName()); + + objectOutput.writeInt(prefixShardConfiguration.getShardMemberNames().size()); + for (MemberName name : prefixShardConfiguration.getShardMemberNames()) { + name.writeTo(objectOutput); + } + } + + @Override + public void readExternal(final ObjectInput objectInput) throws IOException, ClassNotFoundException { + final DOMDataTreeIdentifier prefix = (DOMDataTreeIdentifier) objectInput.readObject(); + final String strategyName = (String) objectInput.readObject(); + + final int size = objectInput.readInt(); + final Collection shardMemberNames = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + shardMemberNames.add(MemberName.readFrom(objectInput)); + } + + prefixShardConfiguration = new PrefixShardConfiguration(prefix, strategyName, shardMemberNames); + } + + private Object readResolve() { + return prefixShardConfiguration; + } + } + private static final long serialVersionUID = 1L; private final DOMDataTreeIdentifier prefix; @@ -57,23 +105,7 @@ public class PrefixShardConfiguration implements ReplicatedData, Serializable { + '}'; } - public String toDataMapKey() { - return "prefix=" + prefix; - } - - @Override - public ReplicatedData merge(final ReplicatedData replicatedData) { - if (!(replicatedData instanceof PrefixShardConfiguration)) { - throw new IllegalStateException("replicatedData expected to be instance of PrefixShardConfiguration"); - } - final PrefixShardConfiguration entry = (PrefixShardConfiguration) replicatedData; - if (!entry.getPrefix().equals(prefix)) { - // this should never happen since the key is the prefix - // if it does just return current? - return this; - } - final HashSet members = new HashSet<>(shardMemberNames); - members.addAll(entry.getShardMemberNames()); - return new PrefixShardConfiguration(prefix, shardStrategyName, members); + private Object writeReplace() { + return new Proxy(this); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java deleted file mode 100644 index 2ac883be8d..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore.messages; - -import com.google.common.base.Preconditions; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; - -/** - * A message sent to the ShardManager to dynamically add a new local shard - * that is a replica for an existing shard that is already available in the - * cluster. 
- */ - -public class AddPrefixShardReplica { - - private final YangInstanceIdentifier prefix; - - public AddPrefixShardReplica(final YangInstanceIdentifier prefix) { - this.prefix = Preconditions.checkNotNull(prefix); - } - - public YangInstanceIdentifier getPrefix() { - return prefix; - } - - @Override - public String toString() { - return "AddPrefixShardReplica[ShardName=" + prefix + "]"; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java deleted file mode 100644 index 9a87a50383..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.messages; - -import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.Shard; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; - -/** - * Message sent to the ShardManager to create a shard located at a certain logical position in the dataTree. - */ -public class CreatePrefixedShard { - - private final PrefixShardConfiguration config; - private final DatastoreContext context; - private final Shard.Builder builder; - - public CreatePrefixedShard(final PrefixShardConfiguration config, - final DatastoreContext context, - final Shard.Builder builder) { - this.config = config; - this.context = context; - this.builder = builder; - } - - public PrefixShardConfiguration getConfig() { - return config; - } - - public DatastoreContext getContext() { - return context; - } - - public Shard.Builder getShardBuilder() { - return builder; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AbstractShardManagerCreator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AbstractShardManagerCreator.java index 7dbd66909a..4dd409e85e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AbstractShardManagerCreator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AbstractShardManagerCreator.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; import akka.actor.Props; import com.google.common.base.Preconditions; import java.util.concurrent.CountDownLatch; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.config.Configuration; @@ -20,6 +21,7 @@ public abstract class AbstractShardManagerCreator shardReplicaOperationsInProgress = new HashSet<>(); private final String 
persistenceId; + private final AbstractDataStore dataStore; - private final ActorRef replicator; + private ListenerRegistration configListenerReg = null; + private PrefixedShardConfigUpdateHandler configUpdateHandler; ShardManager(AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); @@ -192,16 +191,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); - replicator = DistributedData.get(context().system()).replicator(); - + dataStore = builder.getDistributedDataStore(); } + @Override public void preStart() { - LOG.info("Starting Shardmanager {}", persistenceId); - - final Subscribe> subscribe = - new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self()); - replicator.tell(subscribe, noSender()); + LOG.info("Starting ShardManager {}", persistenceId); } @Override @@ -209,6 +204,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.info("Stopping ShardManager {}", persistenceId()); shardManagerMBean.unregisterMBean(); + + if (configListenerReg != null) { + configListenerReg.close(); + configListenerReg = null; + } } @Override @@ -249,10 +249,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); - } else if (message instanceof CreatePrefixedShard) { - onCreatePrefixedShard((CreatePrefixedShard) message); - } else if (message instanceof AddPrefixShardReplica) { - onAddPrefixShardReplica((AddPrefixShardReplica) message); + } else if (message instanceof PrefixShardCreated) { + onPrefixShardCreated((PrefixShardCreated) message); + } else if (message instanceof PrefixShardRemoved) { + onPrefixShardRemoved((PrefixShardRemoved) message); + } else if (message instanceof InitConfigListener) { + onInitConfigListener(); } else if (message instanceof ForwardedAddServerReply) { ForwardedAddServerReply msg = (ForwardedAddServerReply)message; onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, @@ -283,13 +285,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetLocalShardIds(); } else if (message instanceof RunnableMessage) { ((RunnableMessage)message).run(); - } else if (message instanceof Changed) { - onConfigChanged((Changed) message); } else { unknownMessage(message); } } + private void onInitConfigListener() { + LOG.debug("{}: Initializing config listener.", persistenceId()); + + final org.opendaylight.mdsal.common.api.LogicalDatastoreType type = + org.opendaylight.mdsal.common.api.LogicalDatastoreType + .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); + + if (configUpdateHandler != null) { + configUpdateHandler.close(); + } + + configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); + configUpdateHandler.initListener(dataStore, type); + } + private void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { @@ -340,88 +355,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onConfigChanged(final Changed> change) { - LOG.debug("{}, ShardManager {} received config changed {}", - cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries()); - - final Map changedConfig = 
change.dataValue().getEntries(); - - final Map newConfig = - changedConfig.values().stream().collect( - Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity())); - - resolveConfig(newConfig); - } - - private void resolveConfig(final Map newConfig) { - LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}", - cluster.getCurrentMemberName(), persistenceId, newConfig); - - newConfig.forEach((prefix, config) -> - LOG.debug("{} ShardManager : {}, received shard config " - + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config)); - - final SetView removedConfigs = - Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet()); - - // resolve removals - - resolveRemovals(removedConfigs); - - final SetView addedConfigs = - Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet()); - // resolve additions - - resolveAdditions(addedConfigs, newConfig); - // iter through to update existing shards, either start/stop replicas or update the shard - // to check for more peers - resolveUpdates(Collections.emptySet()); - } - - private void resolveRemovals(final Set removedConfigs) { - LOG.debug("{} ShardManager : {}, resolving removed configs : {}", - cluster.getCurrentMemberName(), persistenceId, removedConfigs); - - removedConfigs.forEach(id -> doRemovePrefixedShard(id)); - } - - private void resolveAdditions(final Set addedConfigs, - final Map configs) { - LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs); - - addedConfigs.stream().filter(identifier - -> identifier - .getDatastoreType().equals( - ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType()))) - .forEach(id -> doCreatePrefixedShard(configs.get(id))); - } - - private void resolveUpdates(Set maybeUpdatedConfigs) { - LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs); - } - - private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) { - LOG.debug("{} ShardManager : {}, removing prefix shard: {}", - cluster.getCurrentMemberName(), persistenceId, prefix); - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix); - final ShardInformation shard = localShards.remove(shardId.getShardName()); - - configuration.removePrefixShardConfiguration(prefix); - - if (shard == null) { - LOG.warn("Received removal for unconfigured shard: {} , ignoring.. 
", prefix); - return; - } - - if (shard.getActor() != null) { - LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor()); - shard.getActor().tell(Shutdown.INSTANCE, self()); - } - LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(), - persistenceId(), shardId.getShardName()); - persistShardList(); - } - private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { shardReplicaOperationsInProgress.remove(shardId.getShardName()); @@ -524,31 +457,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @SuppressWarnings("checkstyle:IllegalCatch") - private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { - LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard); - - Object reply; - try { - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), - createPrefixedShard.getConfig().getPrefix()); - if (localShards.containsKey(shardId.getShardName())) { - LOG.debug("{}: Shard {} already exists", persistenceId(), shardId); - reply = new Status.Success(String.format("Shard with name %s already exists", shardId)); - } else { - doCreatePrefixedShard(createPrefixedShard); - reply = new Status.Success(null); - } - } catch (final Exception e) { - LOG.error("{}: onCreateShard failed", persistenceId(), e); - reply = new Status.Failure(e); - } - - if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) { - getSender().tell(reply, getSelf()); - } - } - @SuppressWarnings("checkstyle:IllegalCatch") private void onCreateShard(CreateShard createShard) { LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard); @@ -573,13 +481,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { - doCreatePrefixedShard(createPrefixedShard.getConfig()); - // do not replicate on this level - } + private void onPrefixShardCreated(final PrefixShardCreated message) { + LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); - private void doCreatePrefixedShard(final PrefixShardConfiguration config) { - LOG.debug("doCreatePrefixShard : {}", config.getPrefix()); + final PrefixShardConfiguration config = message.getConfiguration(); final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), config.getPrefix()); @@ -596,6 +501,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + doCreatePrefixShard(config, shardId, shardName); + } + + private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) { configuration.addPrefixShardConfiguration(config); final Builder builder = newShardDatastoreContextBuilder(shardName); @@ -605,10 +514,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Map peerAddresses = Collections.emptyMap(); final boolean isActiveMember = true; - LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: " - + "{}, peerAddresses: {}, isActiveMember: {}", - shardId, persistenceId(), config.getShardMemberNames(), - peerAddresses, isActiveMember); + + LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", + persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember); 
final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, shardDatastoreContext, Shard.builder(), peerAddressResolver); @@ -618,7 +526,28 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (schemaContext != null) { info.setActor(newShardActor(schemaContext, info)); } + } + + private void onPrefixShardRemoved(final PrefixShardRemoved message) { + LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); + + final DOMDataTreeIdentifier prefix = message.getPrefix(); + final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix); + final ShardInformation shard = localShards.remove(shardId.getShardName()); + + configuration.removePrefixShardConfiguration(prefix); + + if (shard == null) { + LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix); + return; + } + if (shard.getActor() != null) { + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor()); + shard.getActor().tell(Shutdown.INSTANCE, self()); + } + + LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); persistShardList(); } @@ -1259,38 +1188,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } - - // With this message the shard does NOT have to be preconfigured - // do a dynamic lookup if the shard exists somewhere and replicate - private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) { - final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix()); - - LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg); - - if (schemaContext == null) { - final String msg = String.format( - "No SchemaContext is available in order to create a local shard instance for %s", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); - return; - } - - findPrimary(shardName, - new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), - getTargetActor()); - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { - sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); - } - } - ); - } - private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); @@ -1584,9 +1481,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findLocalShard(FindLocalShard message) { + LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName()); + final ShardInformation shardInformation = localShards.get(message.getShardName()); if (shardInformation == null) { + LOG.debug("{}: Local shard {} not found - shards present: {}", + persistenceId(), message.getShardName(), localShards.keySet()); + getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); return; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java index 
607e78c9d4..2fb9cf1cda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java @@ -8,30 +8,41 @@ package org.opendaylight.controller.cluster.datastore.utils; -import akka.cluster.ddata.Key; -import akka.cluster.ddata.ORMap; -import akka.cluster.ddata.ORMapKey; import java.util.Map; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utils for encoding prefix shard name. */ public class ClusterUtils { + private static final Logger LOG = LoggerFactory.getLogger(ClusterUtils.class); - // key for replicated configuration key - public static final Key> CONFIGURATION_KEY = - ORMapKey.create("prefix-shard-configuration-config"); + // id for the shard used to store prefix configuration + public static final String PREFIX_CONFIG_SHARD_ID = "prefix-configuration-shard"; - public static final Key> OPERATIONAL_KEY = - ORMapKey.create("prefix-shard-configuration-oper"); + public static final QName PREFIX_SHARDS_QNAME = + QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:clustering:prefix-shard-configuration", + "2017-01-10", "prefix-shards").intern(); + public static final QName SHARD_LIST_QNAME = + QName.create(PREFIX_SHARDS_QNAME, "shard").intern(); + public static final QName SHARD_PREFIX_QNAME = + QName.create(PREFIX_SHARDS_QNAME, "prefix").intern(); + public static final QName SHARD_REPLICAS_QNAME = + QName.create(PREFIX_SHARDS_QNAME, "replicas").intern(); + public static final QName SHARD_REPLICA_QNAME = + QName.create(PREFIX_SHARDS_QNAME, "replica").intern(); + + public static final YangInstanceIdentifier PREFIX_SHARDS_PATH = + YangInstanceIdentifier.of(PREFIX_SHARDS_QNAME).toOptimized(); + public static final YangInstanceIdentifier SHARD_LIST_PATH = + PREFIX_SHARDS_PATH.node(SHARD_LIST_QNAME).toOptimized(); public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) { final String type; @@ -44,6 +55,7 @@ public class ClusterUtils { break; default: type = prefix.getDatastoreType().name(); + LOG.warn("Unknown data store type {}", type); } return ShardIdentifier.create(getCleanShardName(prefix.getRootIdentifier()), memberName, type); @@ -80,17 +92,4 @@ public class ClusterUtils { }); return builder.toString(); } - - public static Key> getReplicatorKey(LogicalDatastoreType type) { - if (LogicalDatastoreType.CONFIGURATION.equals(type)) { - return CONFIGURATION_KEY; - } else { - return OPERATIONAL_KEY; - } - } - - public static org.opendaylight.mdsal.common.api.LogicalDatastoreType toMDSalApi( - final LogicalDatastoreType logicalDatastoreType) { - return org.opendaylight.mdsal.common.api.LogicalDatastoreType.valueOf(logicalDatastoreType.name()); - } } diff --git 
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java index 53411a94dc..f6060a89fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java @@ -61,7 +61,7 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard { @Override public synchronized DOMDataTreeShardProducer createProducer(final Collection paths) { for (final DOMDataTreeIdentifier prodPrefix : paths) { - Preconditions.checkArgument(paths.contains(prodPrefix), "Prefix %s is not contained under shard root", + Preconditions.checkArgument(shardRoot.contains(prodPrefix), "Prefix %s is not contained under shard root", prodPrefix, paths); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java index 69dfe4c7b7..30b46e52ff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java @@ -14,40 +14,52 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; -import akka.cluster.Cluster; -import akka.cluster.Member; import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.Collections2; import com.google.common.collect.ForwardingObject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; +import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; +import org.opendaylight.controller.cluster.datastore.Shard; +import 
org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator; -import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; +import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup; import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; -import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; @@ -63,13 +75,15 @@ import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration; import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable; import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.prefix.shard.configuration.rev170110.PrefixShards; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConverters; import scala.compat.java8.FutureConverters; +import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; /** @@ -84,9 +98,9 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private static final int MAX_ACTOR_CREATION_RETRIES = 100; private static final int ACTOR_RETRY_DELAY = 100; private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS; - static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration( - ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3, - TimeUnit.SECONDS); + private static final int LOOKUP_TASK_MAX_RETRIES = 100; + static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = + new FiniteDuration(LOOKUP_TASK_MAX_RETRIES * LOOKUP_TASK_MAX_RETRIES * 3, TimeUnit.SECONDS); static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION); static final String ACTOR_ID = "ShardedDOMDataTreeFrontend"; @@ -99,12 +113,21 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private final ActorRef shardedDataTreeActor; private final MemberName memberName; + @GuardedBy("shards") private final DOMDataTreePrefixTable> shards = DOMDataTreePrefixTable.create(); private final EnumMap defaultShardRegistrations = new EnumMap<>(LogicalDatastoreType.class); + private final EnumMap> 
configurationShardMap = + new EnumMap<>(LogicalDatastoreType.class); + + private final EnumMap writerMap = + new EnumMap<>(LogicalDatastoreType.class); + + private final PrefixedShardConfigUpdateHandler updateHandler; + public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider, final DistributedDataStore distributedOperDatastore, final DistributedDataStore distributedConfigDatastore) { @@ -119,27 +142,141 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat .setActorSystem(actorSystem) .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper()) .setDistributedConfigDatastore(distributedConfigDatastore) - .setDistributedOperDatastore(distributedOperDatastore), + .setDistributedOperDatastore(distributedOperDatastore) + .setLookupTaskMaxRetries(LOOKUP_TASK_MAX_RETRIES), ACTOR_ID); this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName(); + updateHandler = new PrefixedShardConfigUpdateHandler(shardedDataTreeActor, + distributedConfigDatastore.getActorContext().getCurrentMemberName()); + + LOG.debug("{} - Starting prefix configuration shards", memberName); + createPrefixConfigShard(distributedConfigDatastore); + createPrefixConfigShard(distributedOperDatastore); + } + + private void createPrefixConfigShard(final DistributedDataStore dataStore) { + Configuration configuration = dataStore.getActorContext().getConfiguration(); + Collection memberNames = configuration.getUniqueMemberNamesForAllShards(); + CreateShard createShardMessage = + new CreateShard(new ModuleShardConfiguration(PrefixShards.QNAME.getNamespace(), + "prefix-shard-configuration", ClusterUtils.PREFIX_CONFIG_SHARD_ID, ModuleShardStrategy.NAME, + memberNames), + Shard.builder(), dataStore.getActorContext().getDatastoreContext()); + + dataStore.getActorContext().getShardManager().tell(createShardMessage, noSender()); + } + + /** + * This will try to initialize prefix configuration shards upon their + * successful start. We need to create writers to these shards, so we can + * satisfy future {@link #createDistributedShard} and + * {@link #resolveShardAdditions} requests and update prefix configuration + * shards accordingly. + * + *
<p>
+ * We also need to initialize listeners on these shards, so we can react + * on changes made on them by other cluster members or even by ourselves. + * + *
<p>
+ * Finally, we need to be sure that default shards for both operational and + * configuration data stores are up and running and we have distributed + * shards frontend created for them. + */ + void init() { + // create our writers to the configuration + try { + LOG.debug("{} - starting config shard lookup.", + distributedConfigDatastore.getActorContext().getCurrentMemberName()); + + // We have to wait for prefix config shards to be up and running + // so we can create datastore clients for them + handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(), SHARD_FUTURE_TIMEOUT_DURATION.unit()); + + LOG.debug("Prefix configuration shards ready - creating clients"); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IllegalStateException("Prefix config shards not found", e); + } + + try { + LOG.debug("Prefix configuration shards ready - creating clients"); + configurationShardMap.put(LogicalDatastoreType.CONFIGURATION, + createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID, + distributedConfigDatastore.getActorContext())); + } catch (final DOMDataTreeShardCreationFailedException e) { + throw new IllegalStateException( + "Unable to create datastoreClient for config DS prefix configuration shard.", e); + } + + try { + configurationShardMap.put(LogicalDatastoreType.OPERATIONAL, + createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID, + distributedOperDatastore.getActorContext())); + + } catch (final DOMDataTreeShardCreationFailedException e) { + throw new IllegalStateException( + "Unable to create datastoreClient for oper DS prefix configuration shard.", e); + } + + writerMap.put(LogicalDatastoreType.CONFIGURATION, new PrefixedShardConfigWriter( + configurationShardMap.get(LogicalDatastoreType.CONFIGURATION).getKey())); + + writerMap.put(LogicalDatastoreType.OPERATIONAL, new PrefixedShardConfigWriter( + configurationShardMap.get(LogicalDatastoreType.OPERATIONAL).getKey())); + + updateHandler.initListener(distributedConfigDatastore, LogicalDatastoreType.CONFIGURATION); + updateHandler.initListener(distributedOperDatastore, LogicalDatastoreType.OPERATIONAL); + + distributedConfigDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender()); + distributedOperDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender()); + + //create shard registration for DEFAULT_SHARD try { defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION, initDefaultShard(LogicalDatastoreType.CONFIGURATION)); } catch (final InterruptedException | ExecutionException e) { - LOG.error("Unable to create default shard frontend for config shard", e); + throw new IllegalStateException("Unable to create default shard frontend for config shard", e); } try { defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL, initDefaultShard(LogicalDatastoreType.OPERATIONAL)); } catch (final InterruptedException | ExecutionException e) { - LOG.error("Unable to create default shard frontend for operational shard", e); + throw new IllegalStateException("Unable to create default shard frontend for operational shard", e); } } + private ListenableFuture> handleConfigShardLookup() { + + final ListenableFuture configFuture = lookupConfigShard(LogicalDatastoreType.CONFIGURATION); + final ListenableFuture operFuture = lookupConfigShard(LogicalDatastoreType.OPERATIONAL); + + return Futures.allAsList(configFuture, operFuture); + } + + private ListenableFuture lookupConfigShard(final LogicalDatastoreType type) { + final 
SettableFuture future = SettableFuture.create(); + + final Future ask = + Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT); + + ask.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable throwable, final Object result) throws Throwable { + if (throwable != null) { + future.setException(throwable); + } else { + future.set(null); + } + } + }, actorSystem.dispatcher()); + + return future; + } + @Nonnull @Override public ListenerRegistration registerListener( @@ -176,34 +313,54 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat public CompletionStage createDistributedShard( final DOMDataTreeIdentifier prefix, final Collection replicaMembers) throws DOMDataTreeShardingConflictException { - final DOMDataTreePrefixTableEntry> lookup = - shards.lookup(prefix); - if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) { - throw new DOMDataTreeShardingConflictException( - "Prefix " + prefix + " is already occupied by another shard."); + + synchronized (shards) { + final DOMDataTreePrefixTableEntry> lookup = + shards.lookup(prefix); + if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) { + throw new DOMDataTreeShardingConflictException( + "Prefix " + prefix + " is already occupied by another shard."); + } } - final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers); + final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType()); + + final ListenableFuture writeFuture = + writer.writeConfig(prefix.getRootIdentifier(), replicaMembers); + + final Promise shardRegistrationPromise = akka.dispatch.Futures.promise(); + Futures.addCallback(writeFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void result) { + + final Future ask = + Patterns.ask(shardedDataTreeActor, new LookupPrefixShard(prefix), SHARD_FUTURE_TIMEOUT); + + shardRegistrationPromise.completeWith(ask.transform( + new Mapper() { + @Override + public DistributedShardRegistration apply(final Object parameter) { + return new DistributedShardRegistrationImpl( + prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this); + } + }, + new Mapper() { + @Override + public Throwable apply(final Throwable throwable) { + return new DOMDataTreeShardCreationFailedException( + "Unable to create a cds shard.", throwable); + } + }, actorSystem.dispatcher())); + } - final Future ask = - Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT); - - final Future shardRegistrationFuture = ask.transform( - new Mapper() { - @Override - public DistributedShardRegistration apply(final Object parameter) { - return new DistributedShardRegistrationImpl( - prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this); - } - }, - new Mapper() { - @Override - public Throwable apply(final Throwable throwable) { - return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable); - } - }, actorSystem.dispatcher()); - - return FutureConverters.toJava(shardRegistrationFuture); + @Override + public void onFailure(final Throwable throwable) { + shardRegistrationPromise.failure( + new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable)); + } + }); + + return FutureConverters.toJava(shardRegistrationPromise.future()); } void resolveShardAdditions(final Set additions) { @@ -247,9 +404,14 @@ public class DistributedShardedDOMDataTree implements 
DOMDataTreeService, DOMDat @SuppressWarnings("unchecked") final DOMDataTreeShardRegistration reg = (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); - shards.store(prefix, reg); + + synchronized (shards) { + shards.store(prefix, reg); + } + } catch (final DOMDataTreeShardingConflictException e) { - LOG.error("Prefix {} is already occupied by another shard", prefix, e); + LOG.error("{}: Prefix {} is already occupied by another shard", + distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), prefix, e); } catch (DOMDataTreeProducerException e) { LOG.error("Unable to close producer", e); } catch (DOMDataTreeShardCreationFailedException e) { @@ -259,8 +421,10 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) { LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix); - final DOMDataTreePrefixTableEntry> lookup = - shards.lookup(prefix); + final DOMDataTreePrefixTableEntry> lookup; + synchronized (shards) { + lookup = shards.lookup(prefix); + } if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) { LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..", @@ -270,7 +434,24 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat lookup.getValue().close(); // need to remove from our local table thats used for tracking - shards.remove(prefix); + synchronized (shards) { + shards.remove(prefix); + } + + final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType()); + final ListenableFuture future = writer.removeConfig(prefix.getRootIdentifier()); + + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + LOG.debug("{} - Succesfuly removed shard for {}", memberName, prefix); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.error("Removal of shard {} from configuration failed.", prefix, throwable); + } + }); } DOMDataTreePrefixTableEntry> lookupShardFrontend( @@ -321,23 +502,35 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } } + @SuppressWarnings("checkstyle:IllegalCatch") private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType) throws ExecutionException, InterruptedException { - final Collection members = JavaConverters.asJavaCollectionConverter( - Cluster.get(actorSystem).state().members()).asJavaCollection(); - final Collection names = Collections2.transform(members, - m -> MemberName.forName(m.roles().iterator().next())); + final Collection names = + distributedConfigDatastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(); - try { - // we should probably only have one node create the default shards - return createDistributedShard( - new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names) - .toCompletableFuture().get(); - } catch (DOMDataTreeShardingConflictException e) { - LOG.debug("Default shard already registered, possibly due to other node doing it faster"); + final PrefixedShardConfigWriter writer = writerMap.get(logicalDatastoreType); + + if (writer.checkDefaultIsPresent()) { + LOG.debug("Default shard for {} is already present in the config. 
Possibly saved in snapshot.", + logicalDatastoreType); return new DistributedShardRegistrationImpl( new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), shardedDataTreeActor, this); + } else { + try { + // we should probably only have one node create the default shards + return Await.result(FutureConverters.toScala(createDistributedShard( + new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)), + SHARD_FUTURE_TIMEOUT_DURATION); + } catch (DOMDataTreeShardingConflictException e) { + LOG.debug("Default shard already registered, possibly due to other node doing it faster"); + return new DistributedShardRegistrationImpl( + new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), + shardedDataTreeActor, this); + } catch (Exception e) { + LOG.error("{} default shard initialization failed", logicalDatastoreType, e); + throw new RuntimeException(e); + } } } @@ -390,7 +583,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat distributedShardedDOMDataTree.despawnShardFrontend(prefix); // update the config so the remote nodes are updated final Future ask = - Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT); + Patterns.ask(shardedDataTreeActor, new PrefixShardRemovalLookup(prefix), SHARD_FUTURE_TIMEOUT); final Future closeFuture = ask.transform( new Mapper() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java new file mode 100644 index 0000000000..ca33e31a87 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import static akka.actor.ActorRef.noSender; + +import akka.actor.ActorRef; +import akka.actor.Status; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Base class for lookup tasks. Lookup tasks are supposed to run repeatedly + * until successful lookup or maximum retries are hit. + */ +@NotThreadSafe +abstract class LookupTask implements Runnable { + private final int maxRetries; + private final ActorRef replyTo; + private int retries = 0; + + LookupTask(final ActorRef replyTo, final int maxRetries) { + this.replyTo = replyTo; + this.maxRetries = maxRetries; + } + + abstract void reschedule(int retries); + + void tryReschedule(@Nullable final Throwable throwable) { + if (retries <= maxRetries) { + retries++; + reschedule(retries); + } else { + fail(throwable); + } + } + + void fail(@Nullable final Throwable throwable) { + if (throwable == null) { + replyTo.tell(new Status.Failure( + new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." + + "Failing..")), noSender()); + } else { + replyTo.tell(new Status.Failure( + new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." 
+ + "Failing..", throwable)), noSender()); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java new file mode 100644 index 0000000000..f8ecdb804c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import static akka.actor.ActorRef.noSender; +import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_PREFIX_QNAME; +import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICAS_QNAME; +import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICA_QNAME; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.EnumMap; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; +import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; +import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens on changes on prefix-shard-configuration. Resolves the changes and + * notifies handling actor with {@link PrefixShardCreated} and + * {@link PrefixShardRemoved} messages. 
+ */ +public class PrefixedShardConfigUpdateHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigUpdateHandler.class); + private final ActorRef handlingActor; + private final MemberName memberName; + + private final EnumMap> registrations = + new EnumMap<>(LogicalDatastoreType.class); + + public PrefixedShardConfigUpdateHandler(final ActorRef handlingActor, final MemberName memberName) { + this.handlingActor = Preconditions.checkNotNull(handlingActor); + this.memberName = Preconditions.checkNotNull(memberName); + } + + public void initListener(final AbstractDataStore dataStore, final LogicalDatastoreType type) { + registrations.put(type, dataStore.registerShardConfigListener( + ClusterUtils.SHARD_LIST_PATH, new ShardConfigHandler(memberName, type, handlingActor))); + } + + public void close() { + registrations.values().forEach(ListenerRegistration::close); + registrations.clear(); + } + + public static final class ShardConfigHandler implements ClusteredDOMDataTreeChangeListener { + + private final MemberName memberName; + private final LogicalDatastoreType type; + private final ActorRef handlingActor; + + public ShardConfigHandler(final MemberName memberName, + final LogicalDatastoreType type, + final ActorRef handlingActor) { + this.memberName = memberName; + this.type = type; + this.handlingActor = handlingActor; + } + + @Override + public void onDataTreeChanged(@Nonnull final Collection changes) { + changes.forEach(this::resolveChange); + } + + private void resolveChange(final DataTreeCandidate candidate) { + switch (candidate.getRootNode().getModificationType()) { + case UNMODIFIED: + break; + case SUBTREE_MODIFIED: + case APPEARED: + case WRITE: + resolveWrite(candidate.getRootNode()); + break; + case DELETE: + case DISAPPEARED: + resolveDelete(candidate.getRootNode()); + break; + default: + break; + } + } + + private void resolveWrite(final DataTreeCandidateNode rootNode) { + + LOG.debug("{}: New config received {}", memberName, rootNode); + LOG.debug("{}: Data after: {}", memberName, rootNode.getDataAfter()); + + // were in the shards list, iter children and resolve + for (final DataTreeCandidateNode childNode : rootNode.getChildNodes()) { + switch (childNode.getModificationType()) { + case UNMODIFIED: + break; + case SUBTREE_MODIFIED: + case APPEARED: + case WRITE: + resolveWrittenShard(childNode); + break; + case DELETE: + case DISAPPEARED: + resolveDeletedShard(childNode); + break; + default: + break; + } + } + } + + private void resolveWrittenShard(final DataTreeCandidateNode childNode) { + final MapEntryNode entryNode = (MapEntryNode) childNode.getDataAfter().get(); + final LeafNode prefix = + (LeafNode) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get(); + + final YangInstanceIdentifier identifier = prefix.getValue(); + + LOG.debug("{}: Deserialized {} from datastore", memberName, identifier); + + final ContainerNode replicas = + (ContainerNode) entryNode.getChild(new NodeIdentifier(SHARD_REPLICAS_QNAME)).get(); + + final LeafSetNode replicaList = + (LeafSetNode) replicas.getChild(new NodeIdentifier(SHARD_REPLICA_QNAME)).get(); + + final List retReplicas = replicaList.getValue().stream() + .map(child -> MemberName.forName(child.getValue())) + .collect(Collectors.toList()); + + LOG.debug("{}: Replicas read from ds {}", memberName, retReplicas.toString()); + + final PrefixShardConfiguration newConfig = + new PrefixShardConfiguration(new DOMDataTreeIdentifier(type, identifier), + PrefixShardStrategy.NAME, retReplicas); + + 
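+ // Hand the assembled configuration to the handling actor (ShardManager or
+ // ShardedDataTreeActor, depending on who created this handler) as a
+ // PrefixShardCreated message.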
LOG.debug("{}: Resulting config {}", memberName, newConfig); + + handlingActor.tell(new PrefixShardCreated(newConfig), noSender()); + } + + private void resolveDeletedShard(final DataTreeCandidateNode childNode) { + + final MapEntryNode entryNode = (MapEntryNode) childNode.getDataBefore().get(); + + final LeafNode prefix = + (LeafNode) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get(); + + final YangInstanceIdentifier deleted = prefix.getValue(); + LOG.debug("{}: Removing shard at {}.", memberName, deleted); + + final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(type, deleted); + final PrefixShardRemoved message = new PrefixShardRemoved(domDataTreeIdentifier); + + handlingActor.tell(message, noSender()); + } + + private void resolveDelete(final DataTreeCandidateNode rootNode) { + + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java new file mode 100644 index 0000000000..b0507f6383 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; +import 
org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapEntryNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Writes and removes prefix-based shards' configuration + * to prefix-shard-configuration. This class is meant to be used + * by {@link DistributedShardedDOMDataTree} for updating + * prefix-shard-configuration upon creating and removing prefix-based shards. + */ +class PrefixedShardConfigWriter { + + private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigWriter.class); + + private final ClientLocalHistory history; + + PrefixedShardConfigWriter(final DataStoreClient client) { + history = client.createLocalHistory(); + writeInitialParent(); + } + + ListenableFuture<Void> writeConfig(final YangInstanceIdentifier path, final Collection<MemberName> replicas) { + LOG.debug("Writing config for {}, replicas {}", path, replicas); + + return doSubmit(doWrite(path, replicas)); + } + + ListenableFuture<Void> removeConfig(final YangInstanceIdentifier path) { + LOG.debug("Removing config for {}.", path); + + return doSubmit(doDelete(path)); + } + + private void writeInitialParent() { + final ClientTransaction tx = history.createTransaction(); + + final DOMDataTreeWriteCursor cursor = tx.openCursor(); + + final ContainerNode root = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(ClusterUtils.PREFIX_SHARDS_QNAME)) + .withChild(ImmutableMapNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_LIST_QNAME)) + .build()) + .build(); + + cursor.merge(ClusterUtils.PREFIX_SHARDS_PATH.getLastPathArgument(), root); + cursor.close(); + + final DOMStoreThreePhaseCommitCohort cohort = tx.ready(); + + submitBlocking(cohort); + } + + private void submitBlocking(final DOMStoreThreePhaseCommitCohort cohort) { + try { + doSubmit(cohort).get(); + } catch (final InterruptedException | ExecutionException e) { + LOG.error("Unable to write initial shard config parent.", e); + } + } + + private ListenableFuture<Void> doSubmit(final DOMStoreThreePhaseCommitCohort cohort) { + final AsyncFunction<Boolean, Void> validateFunction = input -> cohort.preCommit(); + final AsyncFunction<Void, Void> prepareFunction = input -> cohort.commit(); + + final ListenableFuture<Void> prepareFuture = Futures.transform(cohort.canCommit(), validateFunction); + return Futures.transform(prepareFuture, prepareFunction); + } + + boolean checkDefaultIsPresent() { + final NodeIdentifierWithPredicates pag = + new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, + YangInstanceIdentifier.EMPTY); + + final YangInstanceIdentifier defaultId = ClusterUtils.SHARD_LIST_PATH.node(pag); + + final ClientSnapshot snapshot = history.takeSnapshot(); + try { + return snapshot.exists(defaultId).checkedGet(); + } catch (final ReadFailedException e) { + LOG.error("Presence check of default shard in configuration failed.", e); + return false; + } + } + + private DOMStoreThreePhaseCommitCohort doWrite(final YangInstanceIdentifier path, + final Collection<MemberName> replicas) { + + final ListNodeBuilder<String, LeafSetEntryNode<String>> replicaListBuilder = + ImmutableLeafSetNodeBuilder.<String>create().withNodeIdentifier( + new NodeIdentifier(ClusterUtils.SHARD_REPLICA_QNAME)); + + replicas.forEach(name ->
replicaListBuilder.withChild( + ImmutableLeafSetEntryNodeBuilder.create() + .withNodeIdentifier(new NodeWithValue<>(ClusterUtils.SHARD_REPLICA_QNAME, name.getName())) + .withValue(name.getName()) + .build())); + + final MapEntryNode newEntry = ImmutableMapEntryNodeBuilder.create() + .withNodeIdentifier( + new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, + path)) + .withChild(ImmutableLeafNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_PREFIX_QNAME)) + .withValue(path) + .build()) + .withChild(ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_REPLICAS_QNAME)) + .withChild(replicaListBuilder.build()) + .build()) + .build(); + + final ClientTransaction tx = history.createTransaction(); + final DOMDataTreeWriteCursor cursor = tx.openCursor(); + + ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter); + + cursor.write(newEntry.getIdentifier(), newEntry); + cursor.close(); + + return tx.ready(); + } + + private DOMStoreThreePhaseCommitCohort doDelete(final YangInstanceIdentifier path) { + + final ClientTransaction tx = history.createTransaction(); + final DOMDataTreeWriteCursor cursor = tx.openCursor(); + + ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter); + + cursor.delete( + new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, path)); + cursor.close(); + + return tx.ready(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index e8f3f70860..cfbb526e9c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -16,7 +16,6 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; -import akka.actor.Status.Failure; import akka.actor.Status.Success; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; @@ -27,29 +26,19 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.ClusterEvent.ReachableMember; import akka.cluster.ClusterEvent.UnreachableMember; import akka.cluster.Member; -import akka.cluster.ddata.DistributedData; -import akka.cluster.ddata.ORMap; -import akka.cluster.ddata.Replicator; -import akka.cluster.ddata.Replicator.Changed; -import akka.cluster.ddata.Replicator.Subscribe; -import akka.cluster.ddata.Replicator.Update; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.google.common.collect.Sets.SetView; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; -import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.MemberName; import 
org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -59,14 +48,15 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; -import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; @@ -95,7 +85,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS); - static final int LOOKUP_TASK_MAX_RETRIES = 100; private final DistributedShardedDOMDataTree shardingService; private final ActorSystem actorSystem; @@ -106,14 +95,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ShardingServiceAddressResolver resolver; private final DistributedDataStore distributedConfigDatastore; private final DistributedDataStore distributedOperDatastore; + private final int lookupTaskMaxRetries; private final Map idToProducer = new HashMap<>(); private final Map idToShardRegistration = new HashMap<>(); private final Cluster cluster; - private final ActorRef replicator; - private ORMap currentData = ORMap.create(); private Map currentConfiguration = new HashMap<>(); ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { @@ -124,21 +112,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { clusterWrapper = builder.getClusterWrapper(); distributedConfigDatastore = builder.getDistributedConfigDatastore(); distributedOperDatastore = builder.getDistributedOperDatastore(); + lookupTaskMaxRetries = builder.getLookupTaskMaxRetries(); actorContext = distributedConfigDatastore.getActorContext(); resolver = new ShardingServiceAddressResolver( DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); clusterWrapper.subscribeToMemberEvents(self()); cluster = Cluster.get(actorSystem); - - replicator = DistributedData.get(context().system()).replicator(); } @Override public void preStart() { - final Subscribe> subscribe = - new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self()); - replicator.tell(subscribe, noSender()); } @Override @@ -148,7 +132,7 @@ public class 
ShardedDataTreeActor extends AbstractUntypedPersistentActor { @Override protected void handleCommand(final Object message) throws Exception { - LOG.debug("Received {}", message); + LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message); if (message instanceof ClusterEvent.MemberUp) { memberUp((ClusterEvent.MemberUp) message); } else if (message instanceof ClusterEvent.MemberWeaklyUp) { @@ -161,8 +145,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { memberUnreachable((ClusterEvent.UnreachableMember) message); } else if (message instanceof ClusterEvent.ReachableMember) { memberReachable((ClusterEvent.ReachableMember) message); - } else if (message instanceof Changed) { - onConfigChanged((Changed) message); } else if (message instanceof ProducerCreated) { onProducerCreated((ProducerCreated) message); } else if (message instanceof NotifyProducerCreated) { @@ -173,51 +155,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { onNotifyProducerRemoved((NotifyProducerRemoved) message); } else if (message instanceof PrefixShardCreated) { onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof CreatePrefixShard) { - onCreatePrefixShard((CreatePrefixShard) message); - } else if (message instanceof RemovePrefixShard) { - onRemovePrefixShard((RemovePrefixShard) message); + } else if (message instanceof LookupPrefixShard) { + onLookupPrefixShard((LookupPrefixShard) message); + } else if (message instanceof PrefixShardRemovalLookup) { + onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message); } else if (message instanceof PrefixShardRemoved) { onPrefixShardRemoved((PrefixShardRemoved) message); + } else if (message instanceof StartConfigShardLookup) { + onStartConfigShardLookup((StartConfigShardLookup) message); } } - private void onConfigChanged(final Changed> change) { - LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change); - - currentData = change.dataValue(); - final Map changedConfig = change.dataValue().getEntries(); - - LOG.debug("Changed set {}", changedConfig); - - try { - final Map newConfig = - changedConfig.values().stream().collect( - Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity())); - resolveConfig(newConfig); - } catch (final IllegalStateException e) { - LOG.error("Failed, ", e); - } - - } - - private void resolveConfig(final Map newConfig) { - - // get the removed configurations - final SetView deleted = - Sets.difference(currentConfiguration.keySet(), newConfig.keySet()); - shardingService.resolveShardRemovals(deleted); - - // get the added configurations - final SetView additions = - Sets.difference(newConfig.keySet(), currentConfiguration.keySet()); - shardingService.resolveShardAdditions(additions); - // we can ignore those that existed previously since the potential changes in replicas will be handled by - // shard manager. 
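The removed resolveConfig above reconciled whole configuration maps by set difference; with the configuration shard delivering per-entry change notifications, that pass is no longer needed. For reference, the idiom it relied on, in sketch form with the same map types as the removed code:

    import com.google.common.collect.Sets;
    import java.util.Map;
    import java.util.Set;
    import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
    import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;

    final class ConfigDiff {
        // Keys only in the old map were removed, keys only in the new map were added;
        // entries present in both are left to the shard manager to reconcile.
        static void reconcile(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> oldConfig,
                final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
            final Set<DOMDataTreeIdentifier> removed = Sets.difference(oldConfig.keySet(), newConfig.keySet());
            final Set<DOMDataTreeIdentifier> added = Sets.difference(newConfig.keySet(), oldConfig.keySet());
            // resolveShardRemovals(removed); resolveShardAdditions(added);
        }
    }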
- - currentConfiguration = new HashMap<>(newConfig); - } - @Override public String persistenceId() { return PERSISTENCE_ID; @@ -368,65 +316,34 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @SuppressWarnings("checkstyle:IllegalCatch") - private void onCreatePrefixShard(final CreatePrefixShard message) { - LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); + private void onLookupPrefixShard(final LookupPrefixShard message) { + LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); - final PrefixShardConfiguration configuration = message.getConfiguration(); + final DOMDataTreeIdentifier prefix = message.getPrefix(); - final Update> update = - new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(), - map -> map.put(cluster, configuration.toDataMapKey(), configuration)); - - replicator.tell(update, self()); - - final ActorContext context = - configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION + final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext(); // schedule a notification task for the reply actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper, - context, shardingService, configuration.getPrefix()), - actorSystem.dispatcher()); + context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher()); } private void onPrefixShardCreated(final PrefixShardCreated message) { LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message); - final Collection addresses = resolver.getShardingServicePeerActorAddresses(); - final ActorRef sender = getSender(); - - final List> futures = new ArrayList<>(); + final PrefixShardConfiguration config = message.getConfiguration(); - for (final String address : addresses) { - final ActorSelection actorSelection = actorSystem.actorSelection(address); - futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection, - new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture()); - } - - final CompletableFuture combinedFuture = - CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); - - combinedFuture.thenRun(() -> { - sender.tell(new Status.Success(null), self()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix())); } - private void onRemovePrefixShard(final RemovePrefixShard message) { - LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); - - //TODO the removal message should have the configuration or some other way to get to the key - final Update> removal = - new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(), - map -> map.remove(cluster, "prefix=" + message.getPrefix())); - replicator.tell(removal, self()); + private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) { + LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message); final ShardRemovalLookupTask removalTask = new ShardRemovalLookupTask(actorSystem, getSender(), - actorContext, 
message.getPrefix()); + actorContext, message.getPrefix(), lookupTaskMaxRetries); actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher()); } @@ -434,15 +351,21 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private void onPrefixShardRemoved(final PrefixShardRemoved message) { LOG.debug("Received PrefixShardRemoved: {}", message); - final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix()); + shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix())); + } - if (registration == null) { - LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}", - message.getPrefix(), idToShardRegistration); - return; - } + private void onStartConfigShardLookup(final StartConfigShardLookup message) { + LOG.debug("Received StartConfigShardLookup: {}", message); - registration.close(); + final ActorContext context = + message.getType().equals(LogicalDatastoreType.CONFIGURATION) + ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext(); + + // schedule a notification task for the reply + actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, + new ConfigShardLookupTask( + actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries), + actorSystem.dispatcher()); } private static MemberName memberToName(final Member member) { @@ -486,39 +409,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } } - private abstract static class LookupTask implements Runnable { - - private final ActorRef replyTo; - private int retries = 0; - - private LookupTask(final ActorRef replyTo) { - this.replyTo = replyTo; - } - - abstract void reschedule(int retries); - - void tryReschedule(@Nullable final Throwable throwable) { - if (retries <= LOOKUP_TASK_MAX_RETRIES) { - retries++; - reschedule(retries); - } else { - fail(throwable); - } - } - - void fail(@Nullable final Throwable throwable) { - if (throwable == null) { - replyTo.tell(new Failure( - new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." - + "Failing..")), noSender()); - } else { - replyTo.tell(new Failure( - new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." - + "Failing..", throwable)), noSender()); - } - } - } - /** * Handles the lookup step of cds shard creation once the configuration is updated. 
*/ @@ -530,20 +420,23 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorContext context; private final DistributedShardedDOMDataTree shardingService; private final DOMDataTreeIdentifier toLookup; + private final int lookupMaxRetries; ShardCreationLookupTask(final ActorSystem system, final ActorRef replyTo, final ClusterWrapper clusterWrapper, final ActorContext context, final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.clusterWrapper = clusterWrapper; this.context = context; this.shardingService = shardingService; this.toLookup = toLookup; + this.lookupMaxRetries = lookupMaxRetries; } @Override @@ -562,7 +455,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef, - shardingService, toLookup), + shardingService, toLookup, lookupMaxRetries), system.dispatcher()); } } @@ -589,6 +482,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorRef shard; private final DistributedShardedDOMDataTree shardingService; private final DOMDataTreeIdentifier toLookup; + private final int lookupMaxRetries; ShardLeaderLookupTask(final ActorSystem system, final ActorRef replyTo, @@ -596,8 +490,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ClusterWrapper clusterWrapper, final ActorRef shard, final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.context = context; @@ -605,6 +500,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { this.shard = shard; this.shardingService = shardingService; this.toLookup = toLookup; + this.lookupMaxRetries = lookupMaxRetries; } @Override @@ -626,7 +522,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { clusterWrapper.getCurrentMemberName(), toLookup); system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, - new FrontendLookupTask(system, replyTo, shardingService, toLookup), + new FrontendLookupTask( + system, replyTo, shardingService, toLookup, lookupMaxRetries), system.dispatcher()); } else { tryReschedule(null); @@ -661,8 +558,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { FrontendLookupTask(final ActorSystem system, final ActorRef replyTo, final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.shardingService = shardingService; @@ -720,8 +618,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { ShardRemovalLookupTask(final ActorSystem system, final ActorRef replyTo, final ActorContext context, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.context = 
context; @@ -757,6 +656,115 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } } + /** + * Task for handling the lookup of the backend for the configuration shard. + */ + private static class ConfigShardLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorContext context; + private final ClusterWrapper clusterWrapper; + private final int lookupTaskMaxRetries; + + ConfigShardLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ActorContext context, + final ClusterWrapper clusterWrapper, + final StartConfigShardLookup message, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.clusterWrapper = clusterWrapper; + this.lookupTaskMaxRetries = lookupMaxRetries; + } + + @Override + void reschedule(int retries) { + LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher()); + } + + @Override + public void run() { + final Optional<ActorRef> localShard = + context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID); + + if (!localShard.isPresent()) { + tryReschedule(null); + } else { + LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup.."); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, + new ConfigShardReadinessTask( + system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries), + system.dispatcher()); + } + } + } + + /** + * Task for handling the readiness state of the config shard. Reports success once the leader is elected. + */ + private static class ConfigShardReadinessTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorContext context; + private final ClusterWrapper clusterWrapper; + private final ActorRef shard; + + ConfigShardReadinessTask(final ActorSystem system, + final ActorRef replyTo, + final ActorContext context, + final ClusterWrapper clusterWrapper, + final ActorRef shard, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.clusterWrapper = clusterWrapper; + this.shard = shard; + } + + @Override + void reschedule(int retries) { + LOG.debug("{} - Leader for config shard not found on try: {}, retrying..", + clusterWrapper.getCurrentMemberName(), retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher()); + } + + @Override + public void run() { + final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); + + ask.onComplete(new OnComplete<Object>() { + @Override + public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + if (throwable != null) { + tryReschedule(throwable); + } else { + final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; + final java.util.Optional<String> leaderActor = findLeader.getLeaderActor(); + if (leaderActor.isPresent()) { + // leader is found, backend seems ready, check if the frontend is ready + LOG.debug("{} - Leader for config shard is ready. 
Ending lookup.", + clusterWrapper.getCurrentMemberName()); + replyTo.tell(new Status.Success(null), noSender()); + } else { + tryReschedule(null); + } + } + } + }, system.dispatcher()); + } + } + public static class ShardedDataTreeActorCreator { private DistributedShardedDOMDataTree shardingService; @@ -764,6 +772,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private DistributedDataStore distributedOperDatastore; private ActorSystem actorSystem; private ClusterWrapper cluster; + private int maxRetries; public DistributedShardedDOMDataTree getShardingService() { return shardingService; @@ -812,6 +821,15 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return this; } + public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public int getLookupTaskMaxRetries() { + return maxRetries; + } + private void verify() { Preconditions.checkNotNull(shardingService); Preconditions.checkNotNull(actorSystem); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java new file mode 100644 index 0000000000..bec9765552 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding.messages; + +/** + * Message sent to the local ShardManager, once the shard configuration shard is ready and the ShardManager should + * start its listener. 
+ */ +public class InitConfigListener { + + public static final InitConfigListener INSTANCE = new InitConfigListener(); + + private InitConfigListener() { + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java similarity index 64% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java index b9bf7915ec..9ea641c865 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java @@ -11,8 +11,8 @@ package org.opendaylight.controller.cluster.sharding.messages; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import java.io.Serializable; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; /** * Sent to the local {@link ShardedDataTreeActor} when there was a shard created @@ -20,25 +20,25 @@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; * create the required frontend/backend shards. */ @Beta -public class CreatePrefixShard implements Serializable { +public class LookupPrefixShard implements Serializable { private static final long serialVersionUID = 1L; - private final PrefixShardConfiguration configuration; + private final DOMDataTreeIdentifier prefix; - public CreatePrefixShard(final PrefixShardConfiguration configuration) { - this.configuration = Preconditions.checkNotNull(configuration); + public LookupPrefixShard(final DOMDataTreeIdentifier prefix) { + this.prefix = Preconditions.checkNotNull(prefix); } - public PrefixShardConfiguration getConfiguration() { - return configuration; + public DOMDataTreeIdentifier getPrefix() { + return prefix; } @Override public String toString() { - return "CreatePrefixShard{" - + "configuration=" - + configuration + return "LookupPrefixShard{" + + "prefix=" + + prefix + '}'; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java similarity index 70% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java index 6de1bb0eba..d6a4319c0b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java @@ -13,15 +13,14 
@@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; /** - * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node. - * The local actor should update the configuration so that the change is picked up by other CDS Node Agents and - * backend ShardManagers. + * Sent to the local {@link ShardedDataTreeActor} to initiate the lookup of the shard; once the shard is removed from + * the system entirely, the actor responds with a success. */ -public class RemovePrefixShard { +public class PrefixShardRemovalLookup { private final DOMDataTreeIdentifier prefix; - public RemovePrefixShard(final DOMDataTreeIdentifier prefix) { + public PrefixShardRemovalLookup(final DOMDataTreeIdentifier prefix) { this.prefix = Preconditions.checkNotNull(prefix); } @@ -32,7 +31,7 @@ public class RemovePrefixShard { @Override public String toString() { - return "RemovePrefixShard{" + return "PrefixShardRemovalLookup{" + "prefix=" + prefix + '}'; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java new file mode 100644 index 0000000000..22e5dbf98f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding.messages; + +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; + +/** + * Message that should be sent to ShardedDataTreeActor when the lookup of the prefix config shard should begin. + * Replied to with Success once the shard has a leader.
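Because the reply is a plain Status.Success, callers can use the ordinary ask pattern to block until the configuration shard is usable; a sketch, assuming actor points at the local ShardedDataTreeActor and a 15-second budget:

    import akka.actor.ActorRef;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import java.util.concurrent.TimeUnit;
    import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
    import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
    import scala.concurrent.Await;
    import scala.concurrent.duration.Duration;

    final class ConfigShardLookupExample {
        // Blocks until the config shard has an elected leader (Status.Success)
        // or the timeout elapses.
        static void awaitConfigShard(final ActorRef actor) throws Exception {
            final Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
            Await.result(
                    Patterns.ask(actor, new StartConfigShardLookup(LogicalDatastoreType.CONFIGURATION), timeout),
                    Duration.create(15, TimeUnit.SECONDS));
        }
    }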
+ */ +public class StartConfigShardLookup { + + private LogicalDatastoreType type; + + public StartConfigShardLookup(final LogicalDatastoreType type) { + this.type = type; + } + + public LogicalDatastoreType getType() { + return type; + } + + @Override + public String toString() { + return "StartConfigShardLookup{type=" + type + '}'; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/prefix-shard-configuration.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/prefix-shard-configuration.yang new file mode 100644 index 0000000000..02d5c304e0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/prefix-shard-configuration.yang @@ -0,0 +1,34 @@ +module prefix-shard-configuration { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:clustering:prefix-shard-configuration"; + prefix "prefix-config"; + + description + "This module contains the base YANG definitions for + shards based on prefix configuration"; + + revision "2017-01-10" { + description "Initial revision."; + } + + container prefix-shards { + + list shard { + key prefix; + leaf prefix { + type instance-identifier; + description "Prefix that this shard is rooted at."; + } + + container replicas { + leaf-list replica { + type string; + } + + description "List of cluster member nodes that this shard is replicated on"; + } + + description "List of prefix-based shards configured."; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java index 80cc2e7c75..fc6445e8f2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; +import static org.mockito.Mockito.mock; import static org.mockito.MockitoAnnotations.initMocks; import akka.actor.ActorRef; @@ -51,11 +52,12 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest { protected static CountDownLatch ready; protected TestShardManager.Builder newTestShardMgrBuilder() { - return TestShardManager.builder(datastoreContextBuilder); + return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class)); } protected TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) { - return TestShardManager.builder(datastoreContextBuilder).configuration(config); + return TestShardManager.builder(datastoreContextBuilder).configuration(config) + .distributedDataStore(mock(DistributedDataStore.class)); } protected Props newShardMgrProps(final Configuration config) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index b336365221..86c875993a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; @@ -84,8 +85,15 @@ public class IntegrationTestKit extends ShardTestKit { public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) { + return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader, + schemaContext, shardNames); + } + + public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, + final String modulesConfig, final boolean waitUntilLeader, + final SchemaContext schemaContext, final String... shardNames) { final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem()); - final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf"); + final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig); datastoreContextBuilder.dataStoreName(typeName); @@ -129,6 +137,30 @@ public class IntegrationTestKit extends ShardTestKit { return dataStore; } + public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName, + final SchemaContext schemaContext, + final LogicalDatastoreType storeType) { + final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem()); + final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider()); + + getDatastoreContextBuilder().dataStoreName(typeName); + + final DatastoreContext datastoreContext = + getDatastoreContextBuilder().logicalStoreType(storeType).build(); + + final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); + Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext(); + Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); + + final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, + configuration, mockContextFactory, restoreFromSnapshot); + + dataStore.onGlobalContextUpdated(schemaContext); + + datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext); + return dataStore; + } + public void waitUntilLeader(final ActorContext actorContext, final String... 
shardNames) { for (String shardName: shardNames) { ActorRef shard = findLocalShard(actorContext, shardName); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java deleted file mode 100644 index 289c6b72fa..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorRef; -import akka.actor.Status.Success; -import akka.testkit.JavaTestKit; -import java.util.Collections; -import java.util.concurrent.TimeUnit; -import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.Shard.Builder; -import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; -import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard; -import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; -import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; -import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tests prefix shard creation in ShardManager. 
- */ -public class PrefixShardCreationTest extends AbstractShardManagerTest { - - private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class); - - private static final DOMDataTreeIdentifier TEST_ID = - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); - - @Test - public void testPrefixShardCreation() throws Exception { - - LOG.info("testPrefixShardCreation starting"); - new JavaTestKit(getSystem()) { - { - datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); - - final ActorRef shardManager = actorFactory.createActor(newShardMgrProps( - new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); - - final SchemaContext schemaContext = TestModel.createTestContext(); - shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); - - shardManager.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); - - final Builder builder = Shard.builder(); - - final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard( - new PrefixShardConfiguration(TEST_ID, - PrefixShardStrategy.NAME, - Collections.singletonList(MEMBER_1)), - datastoreContextBuilder.build(), builder); - - shardManager.tell(createPrefixedShard, getRef()); - expectMsgClass(duration("5 seconds"), Success.class); - - shardManager.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - } - }; - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index c6cc64119a..6a8830c25b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -67,6 +67,7 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; @@ -168,7 +169,8 @@ public class ShardManagerTest extends AbstractShardManagerTest { } private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) { - return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor); + return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor) + .distributedDataStore(mock(DistributedDataStore.class)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java 
b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java index 8564e2d908..df870f584c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java @@ -19,9 +19,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; -import akka.actor.PoisonPill; import akka.cluster.Cluster; -import akka.cluster.ddata.DistributedData; import akka.testkit.JavaTestKit; import com.google.common.collect.Lists; import com.typesafe.config.ConfigFactory; @@ -29,7 +27,6 @@ import java.util.Collections; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.opendaylight.controller.cluster.ActorSystemProvider; @@ -39,6 +36,8 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -54,7 +53,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLe import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Ignore("Needs to have the configuration backend switched from distributed-data") public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class); @@ -65,6 +63,8 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1-and-2.conf"; + private ActorSystem leaderSystem; private ActorSystem followerSystem; @@ -79,19 +79,25 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { .logicalStoreType( org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION); - private DistributedDataStore followerDistributedDataStore; - private DistributedDataStore leaderDistributedDataStore; + private DistributedDataStore leaderConfigDatastore; + private DistributedDataStore leaderOperDatastore; + + private DistributedDataStore followerConfigDatastore; + private DistributedDataStore followerOperDatastore; + + private IntegrationTestKit followerTestKit; private IntegrationTestKit leaderTestKit; - private DistributedShardedDOMDataTree leaderShardFactory; - private DistributedShardedDOMDataTree followerShardFactory; + private DistributedShardedDOMDataTree followerShardFactory; private ActorSystemProvider leaderSystemProvider; 
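The setUp below assembles a genuine two-member cluster before any datastore is created; stripped to its Akka essentials it amounts to the following sketch (the Member1/Member2 config sections and the seed address are illustrative stand-ins for the test resources):

    import akka.actor.ActorSystem;
    import akka.actor.Address;
    import akka.actor.AddressFromURIString;
    import akka.cluster.Cluster;
    import com.typesafe.config.ConfigFactory;

    final class TwoNodeClusterBootstrap {
        static void start() {
            final ActorSystem leader = ActorSystem.create("cluster-test",
                    ConfigFactory.load().getConfig("Member1"));
            final ActorSystem follower = ActorSystem.create("cluster-test",
                    ConfigFactory.load().getConfig("Member2"));
            // Both members join the same seed node, forming one cluster.
            final Address seed = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
            Cluster.get(leader).join(seed);
            Cluster.get(follower).join(seed);
        }
    }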
private ActorSystemProvider followerSystemProvider; @Before public void setUp() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); @@ -109,45 +115,67 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { @After public void tearDown() { - if (followerDistributedDataStore != null) { - followerDistributedDataStore.close(); + if (leaderConfigDatastore != null) { + leaderConfigDatastore.close(); } - if (leaderDistributedDataStore != null) { - leaderDistributedDataStore.close(); + if (leaderOperDatastore != null) { + leaderOperDatastore.close(); } - DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender()); - DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender()); + if (followerConfigDatastore != null) { + followerConfigDatastore.close(); + } + if (followerOperDatastore != null) { + followerOperDatastore.close(); + } JavaTestKit.shutdownActorSystem(leaderSystem); JavaTestKit.shutdownActorSystem(followerSystem); + + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } - private void initEmptyDatastores(final String type) { + private void initEmptyDatastores() { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - leaderDistributedDataStore = - leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); + leaderConfigDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore( + "config", MODULE_SHARDS_CONFIG, true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); + leaderOperDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore( + "operational", MODULE_SHARDS_CONFIG, true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); + + leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, + leaderOperDatastore, + leaderConfigDatastore); followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - followerDistributedDataStore = - followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); - leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, - leaderDistributedDataStore, - leaderDistributedDataStore); + followerConfigDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore( + "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); + followerOperDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore( + "operational", MODULE_SHARDS_CONFIG, true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider, - followerDistributedDataStore, - followerDistributedDataStore); + followerOperDatastore, + followerConfigDatastore); + + leaderShardFactory.init(); + followerShardFactory.init(); + + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), + ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(), ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY)); + } @Test public void testProducerRegistrations() throws Exception { - 
initEmptyDatastores("config"); + initEmptyDatastores(); leaderTestKit.waitForMembersUp("member-2"); @@ -156,15 +184,15 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); + final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager(); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()))); final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); @@ -196,11 +224,13 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { } catch (final DOMDataTreeShardingConflictException e) { assertTrue(e.getMessage().contains("is already occupied by another shard")); } + + shardRegistration.close().toCompletableFuture().get(); } @Test public void testWriteIntoMultipleShards() throws Exception { - initEmptyDatastores("config"); + initEmptyDatastores(); leaderTestKit.waitForMembersUp("member-2"); @@ -211,9 +241,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - findLocalShard(followerDistributedDataStore.getActorContext(), + findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); LOG.debug("Got after waiting for nonleader"); @@ -232,11 +262,13 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { LOG.warn("Got to pre submit"); tx.submit().checkedGet(); + + shardRegistration.close().toCompletableFuture().get(); } @Test public void testMultipleShardRegistrations() throws Exception { - initEmptyDatastores("config"); + initEmptyDatastores(); final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard( TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), @@ -257,73 +289,73 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - 
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); // check leader has local shards - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); // check follower has local shards - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH))); LOG.debug("Closing registrations"); - reg1.close(); - reg2.close(); - reg3.close(); - reg4.close(); + reg1.close().toCompletableFuture().get(); + reg2.close().toCompletableFuture().get(); + reg3.close().toCompletableFuture().get(); + reg4.close().toCompletableFuture().get(); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); LOG.debug("All leader shards gone"); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + 
waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); LOG.debug("All follower shards gone"); @@ -331,7 +363,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { @Test public void testMultipleRegistrationsAtOnePrefix() throws Exception { - initEmptyDatastores("config"); + initEmptyDatastores(); for (int i = 0; i < 10; i++) { LOG.debug("Round {}", i); @@ -339,22 +371,23 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), + leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); - assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(), + assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH))); waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(), + waitUntilShardIsDown(leaderConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - waitUntilShardIsDown(followerDistributedDataStore.getActorContext(), + waitUntilShardIsDown(followerConfigDatastore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java index 91435bed5c..33a8e59935 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java @@ -55,6 +55,8 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; 
@@ -82,7 +84,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMa import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Ignore("distributed-data is broken needs to be removed") public class DistributedShardedDOMDataTreeTest extends AbstractTest { private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class); @@ -99,6 +100,8 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { .node(TestModel.INNER_LIST_QNAME)); private static final Set<MemberName> SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME); + private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1.conf"; + private ActorSystem leaderSystem; private final Builder leaderDatastoreContextBuilder = @@ -109,6 +112,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION); private DistributedDataStore leaderDistributedDataStore; + private DistributedDataStore operDistributedDatastore; private IntegrationTestKit leaderTestKit; private DistributedShardedDOMDataTree leaderShardFactory; @@ -124,6 +128,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { public void setUp() { MockitoAnnotations.initMocks(this); + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); @@ -132,30 +139,43 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { } @After - public void tearDown() { + public void tearDown() throws Exception { if (leaderDistributedDataStore != null) { leaderDistributedDataStore.close(); } + if (operDistributedDatastore != null) { + operDistributedDatastore.close(); + } + JavaTestKit.shutdownActorSystem(leaderSystem); + + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); } - private void initEmptyDatastore(final String type) { + private void initEmptyDatastores() { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - leaderDistributedDataStore = - leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); + leaderDistributedDataStore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore( + "config", MODULE_SHARDS_CONFIG, "empty-modules.conf", true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); + operDistributedDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore( + "operational", MODULE_SHARDS_CONFIG, "empty-modules.conf", true, + SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext()); leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, - leaderDistributedDataStore, + operDistributedDatastore, leaderDistributedDataStore); + + leaderShardFactory.init(); } @Test public void testWritesIntoDefaultShard() throws Exception { - initEmptyDatastore("config"); + initEmptyDatastores(); final DOMDataTreeIdentifier configRoot = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); @@ -180,7 +200,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { @Test public void testSingleNodeWrites() throws Exception { - initEmptyDatastore("config"); + initEmptyDatastores(); final DistributedShardRegistration shardRegistration = waitOnAsyncTask( leaderShardFactory.createDistributedShard(TEST_ID,
Lists.newArrayList(AbstractTest.MEMBER_NAME)), @@ -198,6 +218,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build(); final LeafNode<String> valueToCheck = ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier( new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build(); + LOG.debug("Writing data {} at {}, cursor {}", nameId.getLastPathArgument(), valueToCheck, cursor); cursor.write(nameId.getLastPathArgument(), valueToCheck); @@ -225,11 +246,13 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { verifyNoMoreInteractions(mockedDataTreeListener); + shardRegistration.close().toCompletableFuture().get(); + } @Test public void testMultipleWritesIntoSingleMapEntry() throws Exception { - initEmptyDatastore("config"); + initEmptyDatastores(); final DistributedShardRegistration shardRegistration = waitOnAsyncTask( leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), @@ -312,11 +335,12 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { } // top level shard at TEST element, with subshards on each outer-list map entry + @Ignore("https://bugs.opendaylight.org/show_bug.cgi?id=8116") @Test public void testMultipleShardLevels() throws Exception { - initEmptyDatastore("config"); + initEmptyDatastores(); - final DistributedShardRegistration testShardId = waitOnAsyncTask( + final DistributedShardRegistration testShardReg = waitOnAsyncTask( leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); @@ -374,7 +398,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { true, Collections.emptyList()); // need 6 invocations, first initial thats from the parent shard, and then each individual subshard - verify(mockedDataTreeListener, timeout(10000).times(6)).onDataTreeChanged(captorForChanges.capture(), + verify(mockedDataTreeListener, timeout(20000).times(6)).onDataTreeChanged(captorForChanges.capture(), captorForSubtrees.capture()); verifyNoMoreInteractions(mockedDataTreeListener); final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues(); @@ -391,49 +415,19 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { .withValue(createOuterEntries(listSize, "testing-values")).build()) .build(); - assertEquals(expected, actual); - } - - @Test - public void testDistributedData() throws Exception { - initEmptyDatastore("config"); - - waitOnAsyncTask( - leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - waitOnAsyncTask( - leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - waitOnAsyncTask( - leaderShardFactory.createDistributedShard( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + for (final DistributedShardRegistration registration : registrations) { + waitOnAsyncTask(registration.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + } - waitOnAsyncTask( - leaderShardFactory.createDistributedShard( - new
DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH), - Lists.newArrayList(AbstractTest.MEMBER_NAME)), - DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); - - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), - ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), - ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), - ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), - ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)); + waitOnAsyncTask(testShardReg.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + assertEquals(expected, actual); } @Test public void testMultipleRegistrationsAtOnePrefix() throws Exception { - initEmptyDatastore("config"); + initEmptyDatastores(); for (int i = 0; i < 10; i++) { LOG.debug("Round {}", i); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java index 164f363573..4e906a4d15 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.md.cluster.datastore.model; import com.google.common.base.Throwables; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -46,6 +48,19 @@ public class SchemaContextHelper { } } + public static SchemaContext distributedShardedDOMDataTreeSchemaContext() { + final List<InputStream> streams = new ArrayList<>(); + try { + // we need prefix-shard-configuration and odl-datastore-test models + // for DistributedShardedDOMDataTree tests + streams.add(getInputStream(ODL_DATASTORE_TEST_YANG)); + streams.add(new FileInputStream("src/main/yang/prefix-shard-configuration.yang")); + return YangParserTestUtils.parseYangStreams(streams); + } catch (FileNotFoundException | ReactorException e) { + throw new RuntimeException(e); + } + } + public static SchemaContext entityOwners() { try { return YangParserTestUtils.parseYangSources(new File("src/main/yang/entity-owners.yang")); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/empty-modules.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/empty-modules.conf new file mode 100644 index 0000000000..14c091658b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/empty-modules.conf @@ -0,0 +1 @@ +modules = [] \ No newline at end of file
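
For anyone porting similar tests off distributed-data: the new bootstrap path is spread across the hunks above, so here is a compact single-node sketch. This is illustrative only; "actorSystem", "datastoreContextBuilder" and "actorSystemProvider" are placeholders for whatever the surrounding test already provides, and the module-shards file is the one referenced by this patch.

    // Sketch only: both logical datastores become ordinary shard-backed stores,
    // so the prefix-shard layout is persisted like any other shard data.
    final IntegrationTestKit testKit = new IntegrationTestKit(actorSystem, datastoreContextBuilder);

    final DistributedDataStore configStore = (DistributedDataStore) testKit.setupDistributedDataStore(
            "config", "module-shards-cars-member-1.conf", true,
            SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
    final DistributedDataStore operStore = (DistributedDataStore) testKit.setupDistributedDataStore(
            "operational", "module-shards-cars-member-1.conf", true,
            SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());

    // The frontend takes the operational store first, then the config store;
    // init() must run before any producers or shards are created.
    final DistributedShardedDOMDataTree shardFactory =
            new DistributedShardedDOMDataTree(actorSystemProvider, operStore, configStore);
    shardFactory.init();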
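Note also that DistributedShardRegistration.close() is asynchronous; the updated tests block on the returned stage before asserting that the backing shard is gone, roughly like this (using the hypothetical "configStore" from the sketch above):

    // Wait for the shard to actually shut down before asserting on it.
    shardRegistration.close().toCompletableFuture().get();
    waitUntilShardIsDown(configStore.getActorContext(),
            ClusterUtils.getCleanShardName(TestModel.TEST_PATH));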