<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-util</artifactId>
+ </dependency>
</dependencies>
<scm>
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
.datastoreContextFactory(datastoreContextFactory)
.waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
.primaryShardInfoCache(primaryShardInfoCache)
- .restoreFromSnapshot(restoreFromSnapshot);
+ .restoreFromSnapshot(restoreFromSnapshot)
+ .distributedDataStore(this);
actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
return (ListenerRegistration<L>) listenerRegistrationProxy;
}
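+ /**
+  * Registers the given DOMDataTreeChangeListener directly on the prefix configuration shard
+  * (ClusterUtils.PREFIX_CONFIG_SHARD_ID) instead of resolving a data shard from the supplied path.
+  */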
+ @SuppressWarnings("unchecked")
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+ final YangInstanceIdentifier internalPath,
+ final DOMDataTreeChangeListener delegate) {
+ Preconditions.checkNotNull(delegate, "delegate should not be null");
+
+ LOG.debug("Registering a listener for the configuration shard: {}", internalPath);
+
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, delegate, internalPath);
+ proxy.init(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+ return (ListenerRegistration<L>) proxy;
+ }
+
}
package org.opendaylight.controller.cluster.datastore.config;
-import akka.cluster.ddata.ReplicatedData;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
/**
* Configuration for prefix based shards.
*/
-public class PrefixShardConfiguration implements ReplicatedData, Serializable {
+public class PrefixShardConfiguration implements Serializable {
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private PrefixShardConfiguration prefixShardConfiguration;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(final PrefixShardConfiguration prefixShardConfiguration) {
+ this.prefixShardConfiguration = prefixShardConfiguration;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput objectOutput) throws IOException {
+ objectOutput.writeObject(prefixShardConfiguration.getPrefix());
+ objectOutput.writeObject(prefixShardConfiguration.getShardStrategyName());
+
+ objectOutput.writeInt(prefixShardConfiguration.getShardMemberNames().size());
+ for (MemberName name : prefixShardConfiguration.getShardMemberNames()) {
+ name.writeTo(objectOutput);
+ }
+ }
+
+ @Override
+ public void readExternal(final ObjectInput objectInput) throws IOException, ClassNotFoundException {
+ final DOMDataTreeIdentifier prefix = (DOMDataTreeIdentifier) objectInput.readObject();
+ final String strategyName = (String) objectInput.readObject();
+
+ final int size = objectInput.readInt();
+ final Collection<MemberName> shardMemberNames = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ shardMemberNames.add(MemberName.readFrom(objectInput));
+ }
+
+ prefixShardConfiguration = new PrefixShardConfiguration(prefix, strategyName, shardMemberNames);
+ }
+
+ private Object readResolve() {
+ return prefixShardConfiguration;
+ }
+ }
+
private static final long serialVersionUID = 1L;
private final DOMDataTreeIdentifier prefix;
+ '}';
}
- public String toDataMapKey() {
- return "prefix=" + prefix;
- }
-
- @Override
- public ReplicatedData merge(final ReplicatedData replicatedData) {
- if (!(replicatedData instanceof PrefixShardConfiguration)) {
- throw new IllegalStateException("replicatedData expected to be instance of PrefixShardConfiguration");
- }
- final PrefixShardConfiguration entry = (PrefixShardConfiguration) replicatedData;
- if (!entry.getPrefix().equals(prefix)) {
- // this should never happen since the key is the prefix
- // if it does just return current?
- return this;
- }
- final HashSet<MemberName> members = new HashSet<>(shardMemberNames);
- members.addAll(entry.getShardMemberNames());
- return new PrefixShardConfiguration(prefix, shardStrategyName, members);
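+ // Serialization goes through the Proxy above, so only the prefix, the strategy name and the member names
+ // are actually written.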
+ private Object writeReplace() {
+ return new Proxy(this);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * A message sent to the ShardManager to dynamically add a new local shard
- * that is a replica for an existing shard that is already available in the
- * cluster.
- */
-
-public class AddPrefixShardReplica {
-
- private final YangInstanceIdentifier prefix;
-
- public AddPrefixShardReplica(final YangInstanceIdentifier prefix) {
- this.prefix = Preconditions.checkNotNull(prefix);
- }
-
- public YangInstanceIdentifier getPrefix() {
- return prefix;
- }
-
- @Override
- public String toString() {
- return "AddPrefixShardReplica[ShardName=" + prefix + "]";
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-
-/**
- * Message sent to the ShardManager to create a shard located at a certain logical position in the dataTree.
- */
-public class CreatePrefixedShard {
-
- private final PrefixShardConfiguration config;
- private final DatastoreContext context;
- private final Shard.Builder builder;
-
- public CreatePrefixedShard(final PrefixShardConfiguration config,
- final DatastoreContext context,
- final Shard.Builder builder) {
- this.config = config;
- this.context = context;
- this.builder = builder;
- }
-
- public PrefixShardConfiguration getConfig() {
- return config;
- }
-
- public DatastoreContext getContext() {
- return context;
- }
-
- public Shard.Builder getShardBuilder() {
- return builder;
- }
-}
import akka.actor.Props;
import com.google.common.base.Preconditions;
import java.util.concurrent.CountDownLatch;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
private ClusterWrapper cluster;
private Configuration configuration;
private DatastoreContextFactory datastoreContextFactory;
+ private AbstractDataStore distributedDataStore;
private CountDownLatch waitTillReadyCountDownLatch;
private PrimaryShardInfoFutureCache primaryShardInfoCache;
private DatastoreSnapshot restoreFromSnapshot;
return datastoreContextFactory;
}
- public T datastoreContextFactory(DatastoreContextFactory newDatastoreContextFactory) {
+ public T datastoreContextFactory(final DatastoreContextFactory newDatastoreContextFactory) {
checkSealed();
- this.datastoreContextFactory = newDatastoreContextFactory;
+ this.datastoreContextFactory = Preconditions.checkNotNull(newDatastoreContextFactory);
+ return self();
+ }
+
+ AbstractDataStore getDistributedDataStore() {
+ return distributedDataStore;
+ }
+
+ public T distributedDataStore(final AbstractDataStore distributedDataStore) {
+ checkSealed();
+ this.distributedDataStore = distributedDataStore;
return self();
}
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+ Preconditions.checkNotNull(distributedDataStore, "distributedDataStore should not be null");
Preconditions.checkNotNull(waitTillReadyCountDownLatch, "waitTillReadyCountdownLatch should not be null");
Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
}
package org.opendaylight.controller.cluster.datastore.shardmanager;
-import static akka.actor.ActorRef.noSender;
import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
+ private final AbstractDataStore dataStore;
- private final ActorRef replicator;
+ private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+ private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
shardManagerMBean.registerMBean();
- replicator = DistributedData.get(context().system()).replicator();
-
+ dataStore = builder.getDistributedDataStore();
}
+ @Override
public void preStart() {
- LOG.info("Starting Shardmanager {}", persistenceId);
-
- final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
- new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
- replicator.tell(subscribe, noSender());
+ LOG.info("Starting ShardManager {}", persistenceId);
}
@Override
LOG.info("Stopping ShardManager {}", persistenceId());
shardManagerMBean.unregisterMBean();
+
+ if (configListenerReg != null) {
+ configListenerReg.close();
+ configListenerReg = null;
+ }
}
@Override
onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
- } else if (message instanceof CreatePrefixedShard) {
- onCreatePrefixedShard((CreatePrefixedShard) message);
- } else if (message instanceof AddPrefixShardReplica) {
- onAddPrefixShardReplica((AddPrefixShardReplica) message);
+ } else if (message instanceof PrefixShardCreated) {
+ onPrefixShardCreated((PrefixShardCreated) message);
+ } else if (message instanceof PrefixShardRemoved) {
+ onPrefixShardRemoved((PrefixShardRemoved) message);
+ } else if (message instanceof InitConfigListener) {
+ onInitConfigListener();
} else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
onGetLocalShardIds();
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
- } else if (message instanceof Changed) {
- onConfigChanged((Changed) message);
} else {
unknownMessage(message);
}
}
+ private void onInitConfigListener() {
+ LOG.debug("{}: Initializing config listener.", persistenceId());
+
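+ // Translate the controller-api LogicalDatastoreType into the mdsal-api type expected by the update handler.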
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType
+ .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+ if (configUpdateHandler != null) {
+ configUpdateHandler.close();
+ }
+
+ configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+ configUpdateHandler.initListener(dataStore, type);
+ }
+
private void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
}
}
- private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
- LOG.debug("{}, ShardManager {} received config changed {}",
- cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
-
- final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
- changedConfig.values().stream().collect(
- Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
-
- resolveConfig(newConfig);
- }
-
- private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
- LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
- cluster.getCurrentMemberName(), persistenceId, newConfig);
-
- newConfig.forEach((prefix, config) ->
- LOG.debug("{} ShardManager : {}, received shard config "
- + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
-
- final SetView<DOMDataTreeIdentifier> removedConfigs =
- Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
-
- // resolve removals
-
- resolveRemovals(removedConfigs);
-
- final SetView<DOMDataTreeIdentifier> addedConfigs =
- Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
- // resolve additions
-
- resolveAdditions(addedConfigs, newConfig);
- // iter through to update existing shards, either start/stop replicas or update the shard
- // to check for more peers
- resolveUpdates(Collections.emptySet());
- }
-
- private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
- cluster.getCurrentMemberName(), persistenceId, removedConfigs);
-
- removedConfigs.forEach(id -> doRemovePrefixedShard(id));
- }
-
- private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
- LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
-
- addedConfigs.stream().filter(identifier
- -> identifier
- .getDatastoreType().equals(
- ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
- .forEach(id -> doCreatePrefixedShard(configs.get(id)));
- }
-
- private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
- }
-
- private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
- LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
- cluster.getCurrentMemberName(), persistenceId, prefix);
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
- final ShardInformation shard = localShards.remove(shardId.getShardName());
-
- configuration.removePrefixShardConfiguration(prefix);
-
- if (shard == null) {
- LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
- return;
- }
-
- if (shard.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
- shard.getActor().tell(Shutdown.INSTANCE, self());
- }
- LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
- persistenceId(), shardId.getShardName());
- persistShardList();
- }
-
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId.getShardName());
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
-
- Object reply;
- try {
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- createPrefixedShard.getConfig().getPrefix());
- if (localShards.containsKey(shardId.getShardName())) {
- LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
- reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
- } else {
- doCreatePrefixedShard(createPrefixedShard);
- reply = new Status.Success(null);
- }
- } catch (final Exception e) {
- LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new Status.Failure(e);
- }
-
- if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(reply, getSelf());
- }
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
}
}
- private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- doCreatePrefixedShard(createPrefixedShard.getConfig());
- // do not replicate on this level
- }
+ private void onPrefixShardCreated(final PrefixShardCreated message) {
+ LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
- private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
- LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
+ final PrefixShardConfiguration config = message.getConfiguration();
final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
config.getPrefix());
}
}
+ doCreatePrefixShard(config, shardId, shardName);
+ }
+
+ private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
+ final String shardName) {
configuration.addPrefixShardConfiguration(config);
final Builder builder = newShardDatastoreContextBuilder(shardName);
final Map<String, String> peerAddresses = Collections.emptyMap();
final boolean isActiveMember = true;
- LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
- + "{}, peerAddresses: {}, isActiveMember: {}",
- shardId, persistenceId(), config.getShardMemberNames(),
- peerAddresses, isActiveMember);
+
+ LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, Shard.builder(), peerAddressResolver);
if (schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
+ }
+
+ private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+ LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+ final DOMDataTreeIdentifier prefix = message.getPrefix();
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+ final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+ configuration.removePrefixShardConfiguration(prefix);
+
+ if (shard == null) {
+ LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix);
+ return;
+ }
+ if (shard.getActor() != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+ shard.getActor().tell(Shutdown.INSTANCE, self());
+ }
+
+ LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
persistShardList();
}
return false;
}
-
- // With this message the shard does NOT have to be preconfigured
- // do a dynamic lookup if the shard exists somewhere and replicate
- private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
- final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
-
- LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
-
- if (schemaContext == null) {
- final String msg = String.format(
- "No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
- return;
- }
-
- findPrimary(shardName,
- new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
- @Override
- public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
- getTargetActor());
- }
-
- @Override
- public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
- sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
- }
- }
- );
- }
-
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
}
private void findLocalShard(FindLocalShard message) {
+ LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+
final ShardInformation shardInformation = localShards.get(message.getShardName());
if (shardInformation == null) {
+ LOG.debug("{}: Local shard {} not found - shards present: {}",
+ persistenceId(), message.getShardName(), localShards.keySet());
+
getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}
package org.opendaylight.controller.cluster.datastore.utils;
-import akka.cluster.ddata.Key;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.ORMapKey;
import java.util.Map;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utils for encoding prefix shard name.
*/
public class ClusterUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterUtils.class);
- // key for replicated configuration key
- public static final Key<ORMap<PrefixShardConfiguration>> CONFIGURATION_KEY =
- ORMapKey.create("prefix-shard-configuration-config");
+ // id for the shard used to store prefix configuration
+ public static final String PREFIX_CONFIG_SHARD_ID = "prefix-configuration-shard";
- public static final Key<ORMap<PrefixShardConfiguration>> OPERATIONAL_KEY =
- ORMapKey.create("prefix-shard-configuration-oper");
+ public static final QName PREFIX_SHARDS_QNAME =
+ QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:clustering:prefix-shard-configuration",
+ "2017-01-10", "prefix-shards").intern();
+ public static final QName SHARD_LIST_QNAME =
+ QName.create(PREFIX_SHARDS_QNAME, "shard").intern();
+ public static final QName SHARD_PREFIX_QNAME =
+ QName.create(PREFIX_SHARDS_QNAME, "prefix").intern();
+ public static final QName SHARD_REPLICAS_QNAME =
+ QName.create(PREFIX_SHARDS_QNAME, "replicas").intern();
+ public static final QName SHARD_REPLICA_QNAME =
+ QName.create(PREFIX_SHARDS_QNAME, "replica").intern();
+
+ public static final YangInstanceIdentifier PREFIX_SHARDS_PATH =
+ YangInstanceIdentifier.of(PREFIX_SHARDS_QNAME).toOptimized();
+ public static final YangInstanceIdentifier SHARD_LIST_PATH =
+ PREFIX_SHARDS_PATH.node(SHARD_LIST_QNAME).toOptimized();
public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
final String type;
break;
default:
type = prefix.getDatastoreType().name();
+ LOG.warn("Unknown data store type {}", type);
}
return ShardIdentifier.create(getCleanShardName(prefix.getRootIdentifier()), memberName, type);
});
return builder.toString();
}
-
- public static Key<ORMap<PrefixShardConfiguration>> getReplicatorKey(LogicalDatastoreType type) {
- if (LogicalDatastoreType.CONFIGURATION.equals(type)) {
- return CONFIGURATION_KEY;
- } else {
- return OPERATIONAL_KEY;
- }
- }
-
- public static org.opendaylight.mdsal.common.api.LogicalDatastoreType toMDSalApi(
- final LogicalDatastoreType logicalDatastoreType) {
- return org.opendaylight.mdsal.common.api.LogicalDatastoreType.valueOf(logicalDatastoreType.name());
- }
}
@Override
public synchronized DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
for (final DOMDataTreeIdentifier prodPrefix : paths) {
- Preconditions.checkArgument(paths.contains(prodPrefix), "Prefix %s is not contained under shard root",
+ Preconditions.checkArgument(shardRoot.contains(prodPrefix), "Prefix %s is not contained under shard root",
prodPrefix, paths);
}
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
-import akka.cluster.Cluster;
-import akka.cluster.Member;
import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-import com.google.common.collect.Collections2;
import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
+import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
-import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.prefix.shard.configuration.rev170110.PrefixShards;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
import scala.compat.java8.FutureConverters;
+import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
/**
private static final int MAX_ACTOR_CREATION_RETRIES = 100;
private static final int ACTOR_RETRY_DELAY = 100;
private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
- static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration(
- ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3,
- TimeUnit.SECONDS);
+ private static final int LOOKUP_TASK_MAX_RETRIES = 100;
+ static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION =
+ new FiniteDuration(LOOKUP_TASK_MAX_RETRIES * LOOKUP_TASK_MAX_RETRIES * 3, TimeUnit.SECONDS);
static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);
static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
private final ActorRef shardedDataTreeActor;
private final MemberName memberName;
+ @GuardedBy("shards")
private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
DOMDataTreePrefixTable.create();
private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
new EnumMap<>(LogicalDatastoreType.class);
+ private final EnumMap<LogicalDatastoreType, Entry<DataStoreClient, ActorRef>> configurationShardMap =
+ new EnumMap<>(LogicalDatastoreType.class);
+
+ private final EnumMap<LogicalDatastoreType, PrefixedShardConfigWriter> writerMap =
+ new EnumMap<>(LogicalDatastoreType.class);
+
+ private final PrefixedShardConfigUpdateHandler updateHandler;
+
public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
final DistributedDataStore distributedOperDatastore,
final DistributedDataStore distributedConfigDatastore) {
.setActorSystem(actorSystem)
.setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
.setDistributedConfigDatastore(distributedConfigDatastore)
- .setDistributedOperDatastore(distributedOperDatastore),
+ .setDistributedOperDatastore(distributedOperDatastore)
+ .setLookupTaskMaxRetries(LOOKUP_TASK_MAX_RETRIES),
ACTOR_ID);
this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
+ updateHandler = new PrefixedShardConfigUpdateHandler(shardedDataTreeActor,
+ distributedConfigDatastore.getActorContext().getCurrentMemberName());
+
+ LOG.debug("{} - Starting prefix configuration shards", memberName);
+ createPrefixConfigShard(distributedConfigDatastore);
+ createPrefixConfigShard(distributedOperDatastore);
+ }
+
+ private void createPrefixConfigShard(final DistributedDataStore dataStore) {
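+ // Ask the shard manager to create the module-based prefix configuration shard, replicated to all known members.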
+ Configuration configuration = dataStore.getActorContext().getConfiguration();
+ Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
+ CreateShard createShardMessage =
+ new CreateShard(new ModuleShardConfiguration(PrefixShards.QNAME.getNamespace(),
+ "prefix-shard-configuration", ClusterUtils.PREFIX_CONFIG_SHARD_ID, ModuleShardStrategy.NAME,
+ memberNames),
+ Shard.builder(), dataStore.getActorContext().getDatastoreContext());
+
+ dataStore.getActorContext().getShardManager().tell(createShardMessage, noSender());
+ }
+
+ /**
+ * This will try to initialize prefix configuration shards upon their
+ * successful start. We need to create writers to these shards, so we can
+ * satisfy future {@link #createDistributedShard} and
+ * {@link #resolveShardAdditions} requests and update prefix configuration
+ * shards accordingly.
+ *
+ * <p>
+ * We also need to initialize listeners on these shards, so we can react
+ * to changes made to them by other cluster members or by this node itself.
+ *
+ * <p>
+ * Finally, we need to make sure that the default shards for both the operational and
+ * configuration data stores are up and running, and that distributed shard
+ * frontends have been created for them.
+ */
+ void init() {
+ // create our writers to the configuration
+ try {
+ LOG.debug("{} - starting config shard lookup.",
+ distributedConfigDatastore.getActorContext().getCurrentMemberName());
+
+ // We have to wait for prefix config shards to be up and running
+ // so we can create datastore clients for them
+ handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(), SHARD_FUTURE_TIMEOUT_DURATION.unit());
+
+ LOG.debug("Prefix configuration shards ready - creating clients");
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new IllegalStateException("Prefix config shards not found", e);
+ }
+
+ try {
+ LOG.debug("Prefix configuration shards ready - creating clients");
+ configurationShardMap.put(LogicalDatastoreType.CONFIGURATION,
+ createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
+ distributedConfigDatastore.getActorContext()));
+ } catch (final DOMDataTreeShardCreationFailedException e) {
+ throw new IllegalStateException(
+ "Unable to create datastoreClient for config DS prefix configuration shard.", e);
+ }
+
+ try {
+ configurationShardMap.put(LogicalDatastoreType.OPERATIONAL,
+ createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
+ distributedOperDatastore.getActorContext()));
+
+ } catch (final DOMDataTreeShardCreationFailedException e) {
+ throw new IllegalStateException(
+ "Unable to create datastoreClient for oper DS prefix configuration shard.", e);
+ }
+
+ writerMap.put(LogicalDatastoreType.CONFIGURATION, new PrefixedShardConfigWriter(
+ configurationShardMap.get(LogicalDatastoreType.CONFIGURATION).getKey()));
+
+ writerMap.put(LogicalDatastoreType.OPERATIONAL, new PrefixedShardConfigWriter(
+ configurationShardMap.get(LogicalDatastoreType.OPERATIONAL).getKey()));
+
+ updateHandler.initListener(distributedConfigDatastore, LogicalDatastoreType.CONFIGURATION);
+ updateHandler.initListener(distributedOperDatastore, LogicalDatastoreType.OPERATIONAL);
+
+ distributedConfigDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
+ distributedOperDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
+
+
//create shard registration for DEFAULT_SHARD
try {
defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
initDefaultShard(LogicalDatastoreType.CONFIGURATION));
} catch (final InterruptedException | ExecutionException e) {
- LOG.error("Unable to create default shard frontend for config shard", e);
+ throw new IllegalStateException("Unable to create default shard frontend for config shard", e);
}
try {
defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
initDefaultShard(LogicalDatastoreType.OPERATIONAL));
} catch (final InterruptedException | ExecutionException e) {
- LOG.error("Unable to create default shard frontend for operational shard", e);
+ throw new IllegalStateException("Unable to create default shard frontend for operational shard", e);
}
}
+ private ListenableFuture<List<Void>> handleConfigShardLookup() {
+
+ final ListenableFuture<Void> configFuture = lookupConfigShard(LogicalDatastoreType.CONFIGURATION);
+ final ListenableFuture<Void> operFuture = lookupConfigShard(LogicalDatastoreType.OPERATIONAL);
+
+ return Futures.allAsList(configFuture, operFuture);
+ }
+
+ private ListenableFuture<Void> lookupConfigShard(final LogicalDatastoreType type) {
+ final SettableFuture<Void> future = SettableFuture.create();
+
+ final Future<Object> ask =
+ Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT);
+
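+ // Bridge the Akka ask Future into a Guava SettableFuture so both lookups can be combined via Futures.allAsList.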
+ ask.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Object result) throws Throwable {
+ if (throwable != null) {
+ future.setException(throwable);
+ } else {
+ future.set(null);
+ }
+ }
+ }, actorSystem.dispatcher());
+
+ return future;
+ }
+
@Nonnull
@Override
public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
public CompletionStage<DistributedShardRegistration> createDistributedShard(
final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
throws DOMDataTreeShardingConflictException {
- final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
- shards.lookup(prefix);
- if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
- throw new DOMDataTreeShardingConflictException(
- "Prefix " + prefix + " is already occupied by another shard.");
+
+ synchronized (shards) {
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+ shards.lookup(prefix);
+ if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
+ throw new DOMDataTreeShardingConflictException(
+ "Prefix " + prefix + " is already occupied by another shard.");
+ }
}
- final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+ final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());
+
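+ // Persist the new shard layout into the prefix configuration shard first; the backend shard lookup is
+ // only started once this write succeeds.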
+ final ListenableFuture<Void> writeFuture =
+ writer.writeConfig(prefix.getRootIdentifier(), replicaMembers);
+
+ final Promise<DistributedShardRegistration> shardRegistrationPromise = akka.dispatch.Futures.promise();
+ Futures.addCallback(writeFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable final Void result) {
+
+ final Future<Object> ask =
+ Patterns.ask(shardedDataTreeActor, new LookupPrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
+
+ shardRegistrationPromise.completeWith(ask.transform(
+ new Mapper<Object, DistributedShardRegistration>() {
+ @Override
+ public DistributedShardRegistration apply(final Object parameter) {
+ return new DistributedShardRegistrationImpl(
+ prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
+ }
+ },
+ new Mapper<Throwable, Throwable>() {
+ @Override
+ public Throwable apply(final Throwable throwable) {
+ return new DOMDataTreeShardCreationFailedException(
+ "Unable to create a cds shard.", throwable);
+ }
+ }, actorSystem.dispatcher()));
+ }
- final Future<Object> ask =
- Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT);
-
- final Future<DistributedShardRegistration> shardRegistrationFuture = ask.transform(
- new Mapper<Object, DistributedShardRegistration>() {
- @Override
- public DistributedShardRegistration apply(final Object parameter) {
- return new DistributedShardRegistrationImpl(
- prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
- }
- },
- new Mapper<Throwable, Throwable>() {
- @Override
- public Throwable apply(final Throwable throwable) {
- return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable);
- }
- }, actorSystem.dispatcher());
-
- return FutureConverters.toJava(shardRegistrationFuture);
+ @Override
+ public void onFailure(final Throwable throwable) {
+ shardRegistrationPromise.failure(
+ new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable));
+ }
+ });
+
+ return FutureConverters.toJava(shardRegistrationPromise.future());
}
void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
@SuppressWarnings("unchecked")
final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
(DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
- shards.store(prefix, reg);
+
+ synchronized (shards) {
+ shards.store(prefix, reg);
+ }
+
} catch (final DOMDataTreeShardingConflictException e) {
- LOG.error("Prefix {} is already occupied by another shard", prefix, e);
+ LOG.error("{}: Prefix {} is already occupied by another shard",
+ distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), prefix, e);
} catch (DOMDataTreeProducerException e) {
LOG.error("Unable to close producer", e);
} catch (DOMDataTreeShardCreationFailedException e) {
private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
- final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
- shards.lookup(prefix);
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup;
+ synchronized (shards) {
+ lookup = shards.lookup(prefix);
+ }
if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
lookup.getValue().close();
// need to remove from our local table thats used for tracking
- shards.remove(prefix);
+ synchronized (shards) {
+ shards.remove(prefix);
+ }
+
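+ // Remove the prefix from the configuration shard so the removal can propagate to other members
+ // through the config listener.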
+ final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());
+ final ListenableFuture<Void> future = writer.removeConfig(prefix.getRootIdentifier());
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ LOG.debug("{} - Succesfuly removed shard for {}", memberName, prefix);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.error("Removal of shard {} from configuration failed.", prefix, throwable);
+ }
+ });
}
DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
throws ExecutionException, InterruptedException {
- final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
- Cluster.get(actorSystem).state().members()).asJavaCollection();
- final Collection<MemberName> names = Collections2.transform(members,
- m -> MemberName.forName(m.roles().iterator().next()));
+ final Collection<MemberName> names =
+ distributedConfigDatastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards();
- try {
- // we should probably only have one node create the default shards
- return createDistributedShard(
- new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)
- .toCompletableFuture().get();
- } catch (DOMDataTreeShardingConflictException e) {
- LOG.debug("Default shard already registered, possibly due to other node doing it faster");
+ final PrefixedShardConfigWriter writer = writerMap.get(logicalDatastoreType);
+
+ if (writer.checkDefaultIsPresent()) {
+ LOG.debug("Default shard for {} is already present in the config. Possibly saved in snapshot.",
+ logicalDatastoreType);
return new DistributedShardRegistrationImpl(
new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
shardedDataTreeActor, this);
+ } else {
+ try {
+ // we should probably only have one node create the default shards
+ return Await.result(FutureConverters.toScala(createDistributedShard(
+ new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)),
+ SHARD_FUTURE_TIMEOUT_DURATION);
+ } catch (DOMDataTreeShardingConflictException e) {
+ LOG.debug("Default shard already registered, possibly due to other node doing it faster");
+ return new DistributedShardRegistrationImpl(
+ new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
+ shardedDataTreeActor, this);
+ } catch (Exception e) {
+ LOG.error("{} default shard initialization failed", logicalDatastoreType, e);
+ throw new RuntimeException(e);
+ }
}
}
distributedShardedDOMDataTree.despawnShardFrontend(prefix);
// update the config so the remote nodes are updated
final Future<Object> ask =
- Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
+ Patterns.ask(shardedDataTreeActor, new PrefixShardRemovalLookup(prefix), SHARD_FUTURE_TIMEOUT);
final Future<Void> closeFuture = ask.transform(
new Mapper<Object, Void>() {
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Base class for lookup tasks. Lookup tasks are supposed to run repeatedly
+ * until the lookup succeeds or the maximum number of retries is reached.
+ */
+@NotThreadSafe
+abstract class LookupTask implements Runnable {
+ private final int maxRetries;
+ private final ActorRef replyTo;
+ private int retries = 0;
+
+ LookupTask(final ActorRef replyTo, final int maxRetries) {
+ this.replyTo = replyTo;
+ this.maxRetries = maxRetries;
+ }
+
+ abstract void reschedule(int retries);
+
+ void tryReschedule(@Nullable final Throwable throwable) {
+ if (retries <= maxRetries) {
+ retries++;
+ reschedule(retries);
+ } else {
+ fail(throwable);
+ }
+ }
+
+ void fail(@Nullable final Throwable throwable) {
+ if (throwable == null) {
+ replyTo.tell(new Status.Failure(
+ new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+ + "Failing..")), noSender());
+ } else {
+ replyTo.tell(new Status.Failure(
+ new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+ + "Failing..", throwable)), noSender());
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_PREFIX_QNAME;
+import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICAS_QNAME;
+import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICA_QNAME;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for changes on prefix-shard-configuration. Resolves the changes and
+ * notifies the handling actor with {@link PrefixShardCreated} and
+ * {@link PrefixShardRemoved} messages.
+ */
+public class PrefixedShardConfigUpdateHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigUpdateHandler.class);
+ private final ActorRef handlingActor;
+ private final MemberName memberName;
+
+ private final EnumMap<LogicalDatastoreType,
+ ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener>> registrations =
+ new EnumMap<>(LogicalDatastoreType.class);
+
+ public PrefixedShardConfigUpdateHandler(final ActorRef handlingActor, final MemberName memberName) {
+ this.handlingActor = Preconditions.checkNotNull(handlingActor);
+ this.memberName = Preconditions.checkNotNull(memberName);
+ }
+
+ public void initListener(final AbstractDataStore dataStore, final LogicalDatastoreType type) {
+ registrations.put(type, dataStore.registerShardConfigListener(
+ ClusterUtils.SHARD_LIST_PATH, new ShardConfigHandler(memberName, type, handlingActor)));
+ }
+
+ public void close() {
+ registrations.values().forEach(ListenerRegistration::close);
+ registrations.clear();
+ }
+
+ public static final class ShardConfigHandler implements ClusteredDOMDataTreeChangeListener {
+
+ private final MemberName memberName;
+ private final LogicalDatastoreType type;
+ private final ActorRef handlingActor;
+
+ public ShardConfigHandler(final MemberName memberName,
+ final LogicalDatastoreType type,
+ final ActorRef handlingActor) {
+ this.memberName = memberName;
+ this.type = type;
+ this.handlingActor = handlingActor;
+ }
+
+ @Override
+ public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+ changes.forEach(this::resolveChange);
+ }
+
+ private void resolveChange(final DataTreeCandidate candidate) {
+ switch (candidate.getRootNode().getModificationType()) {
+ case UNMODIFIED:
+ break;
+ case SUBTREE_MODIFIED:
+ case APPEARED:
+ case WRITE:
+ resolveWrite(candidate.getRootNode());
+ break;
+ case DELETE:
+ case DISAPPEARED:
+ resolveDelete(candidate.getRootNode());
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void resolveWrite(final DataTreeCandidateNode rootNode) {
+
+ LOG.debug("{}: New config received {}", memberName, rootNode);
+ LOG.debug("{}: Data after: {}", memberName, rootNode.getDataAfter());
+
+ // we are in the shard list; iterate through the children and resolve each entry
+ for (final DataTreeCandidateNode childNode : rootNode.getChildNodes()) {
+ switch (childNode.getModificationType()) {
+ case UNMODIFIED:
+ break;
+ case SUBTREE_MODIFIED:
+ case APPEARED:
+ case WRITE:
+ resolveWrittenShard(childNode);
+ break;
+ case DELETE:
+ case DISAPPEARED:
+ resolveDeletedShard(childNode);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private void resolveWrittenShard(final DataTreeCandidateNode childNode) {
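+ // Each shard list entry carries a "prefix" leaf and a "replicas" container holding a "replica" leaf-list.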
+ final MapEntryNode entryNode = (MapEntryNode) childNode.getDataAfter().get();
+ final LeafNode<YangInstanceIdentifier> prefix =
+ (LeafNode<YangInstanceIdentifier>) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get();
+
+ final YangInstanceIdentifier identifier = prefix.getValue();
+
+ LOG.debug("{}: Deserialized {} from datastore", memberName, identifier);
+
+ final ContainerNode replicas =
+ (ContainerNode) entryNode.getChild(new NodeIdentifier(SHARD_REPLICAS_QNAME)).get();
+
+ final LeafSetNode<String> replicaList =
+ (LeafSetNode<String>) replicas.getChild(new NodeIdentifier(SHARD_REPLICA_QNAME)).get();
+
+ final List<MemberName> retReplicas = replicaList.getValue().stream()
+ .map(child -> MemberName.forName(child.getValue()))
+ .collect(Collectors.toList());
+
+ LOG.debug("{}: Replicas read from ds {}", memberName, retReplicas.toString());
+
+ final PrefixShardConfiguration newConfig =
+ new PrefixShardConfiguration(new DOMDataTreeIdentifier(type, identifier),
+ PrefixShardStrategy.NAME, retReplicas);
+
+ LOG.debug("{}: Resulting config {}", memberName, newConfig);
+
+ handlingActor.tell(new PrefixShardCreated(newConfig), noSender());
+ }
+
+ private void resolveDeletedShard(final DataTreeCandidateNode childNode) {
+
+ final MapEntryNode entryNode = (MapEntryNode) childNode.getDataBefore().get();
+
+ final LeafNode<YangInstanceIdentifier> prefix =
+ (LeafNode<YangInstanceIdentifier>) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get();
+
+ final YangInstanceIdentifier deleted = prefix.getValue();
+ LOG.debug("{}: Removing shard at {}.", memberName, deleted);
+
+ final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(type, deleted);
+ final PrefixShardRemoved message = new PrefixShardRemoved(domDataTreeIdentifier);
+
+ handlingActor.tell(message, noSender());
+ }
+
+ private void resolveDelete(final DataTreeCandidateNode rootNode) {
+
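+ // currently a no-op: removal of the whole shard list is not acted upon here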
+ }
+ }
+}
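
A minimal usage sketch for the update handler above; 'handler' (an instance of the handler class defined above) and 'configDataStore' are placeholders, not part of this change:

    // Start listening for prefix-shard configuration changes in the configuration datastore.
    handler.initListener(configDataStore, LogicalDatastoreType.CONFIGURATION);
    // ... later, when the owning component shuts down, close all registrations.
    handler.close();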
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes and removes prefix-based shards' configuration into prefix-shard-configuration.
+ * This class is meant to be used by {@link DistributedShardedDOMDataTree} to update
+ * prefix-shard-configuration whenever prefix-based shards are created and removed.
+ */
+class PrefixedShardConfigWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigWriter.class);
+
+ private final ClientLocalHistory history;
+
+ PrefixedShardConfigWriter(final DataStoreClient client) {
+ history = client.createLocalHistory();
+ writeInitialParent();
+ }
+
+ ListenableFuture<Void> writeConfig(final YangInstanceIdentifier path, final Collection<MemberName> replicas) {
+ LOG.debug("Writing config for {}, replicas {}", path, replicas);
+
+ return doSubmit(doWrite(path, replicas));
+ }
+
+ ListenableFuture<Void> removeConfig(final YangInstanceIdentifier path) {
+ LOG.debug("Removing config for {}.", path);
+
+ return doSubmit(doDelete(path));
+ }
+
+ private void writeInitialParent() {
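+ // Seed the prefix-shards container with an empty shard list so that per-shard
+ // entries written later have an existing parent structure.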
+ final ClientTransaction tx = history.createTransaction();
+
+ final DOMDataTreeWriteCursor cursor = tx.openCursor();
+
+ final ContainerNode root = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(ClusterUtils.PREFIX_SHARDS_QNAME))
+ .withChild(ImmutableMapNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_LIST_QNAME))
+ .build())
+ .build();
+
+ cursor.merge(ClusterUtils.PREFIX_SHARDS_PATH.getLastPathArgument(), root);
+ cursor.close();
+
+ final DOMStoreThreePhaseCommitCohort cohort = tx.ready();
+
+ submitBlocking(cohort);
+ }
+
+ private void submitBlocking(final DOMStoreThreePhaseCommitCohort cohort) {
+ try {
+ doSubmit(cohort).get();
+ } catch (final InterruptedException | ExecutionException e) {
+ LOG.error("Unable to write initial shard config parent.", e);
+ }
+ }
+
+ private ListenableFuture<Void> doSubmit(final DOMStoreThreePhaseCommitCohort cohort) {
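+ // Chain the three-phase commit steps: canCommit -> preCommit -> commit.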
+ final AsyncFunction<Boolean, Void> validateFunction = input -> cohort.preCommit();
+ final AsyncFunction<Void, Void> prepareFunction = input -> cohort.commit();
+
+ final ListenableFuture<Void> prepareFuture = Futures.transform(cohort.canCommit(), validateFunction);
+ return Futures.transform(prepareFuture, prepareFunction);
+ }
+
+ boolean checkDefaultIsPresent() {
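+ // Look for a shard list entry keyed by the root (empty) instance identifier, i.e. the default shard.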
+ final NodeIdentifierWithPredicates pag =
+ new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME,
+ YangInstanceIdentifier.EMPTY);
+
+ final YangInstanceIdentifier defaultId = ClusterUtils.SHARD_LIST_PATH.node(pag);
+
+ final ClientSnapshot snapshot = history.takeSnapshot();
+ try {
+ return snapshot.exists(defaultId).checkedGet();
+ } catch (final ReadFailedException e) {
+ LOG.error("Presence check of default shard in configuration failed.", e);
+ return false;
+ }
+ }
+
+ private DOMStoreThreePhaseCommitCohort doWrite(final YangInstanceIdentifier path,
+ final Collection<MemberName> replicas) {
+
+ final ListNodeBuilder<Object, LeafSetEntryNode<Object>> replicaListBuilder =
+ ImmutableLeafSetNodeBuilder.create().withNodeIdentifier(
+ new NodeIdentifier(ClusterUtils.SHARD_REPLICA_QNAME));
+
+ replicas.forEach(name -> replicaListBuilder.withChild(
+ ImmutableLeafSetEntryNodeBuilder.create()
+ .withNodeIdentifier(new NodeWithValue<>(ClusterUtils.SHARD_REPLICA_QNAME, name.getName()))
+ .withValue(name.getName())
+ .build()));
+
+ final MapEntryNode newEntry = ImmutableMapEntryNodeBuilder.create()
+ .withNodeIdentifier(
+ new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME,
+ path))
+ .withChild(ImmutableLeafNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_PREFIX_QNAME))
+ .withValue(path)
+ .build())
+ .withChild(ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_REPLICAS_QNAME))
+ .withChild(replicaListBuilder.build())
+ .build())
+ .build();
+
+ final ClientTransaction tx = history.createTransaction();
+ final DOMDataTreeWriteCursor cursor = tx.openCursor();
+
+ ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter);
+
+ cursor.write(newEntry.getIdentifier(), newEntry);
+ cursor.close();
+
+ return tx.ready();
+ }
+
+ private DOMStoreThreePhaseCommitCohort doDelete(final YangInstanceIdentifier path) {
+
+ final ClientTransaction tx = history.createTransaction();
+ final DOMDataTreeWriteCursor cursor = tx.openCursor();
+
+ ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter);
+
+ cursor.delete(
+ new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, path));
+ cursor.close();
+
+ return tx.ready();
+ }
+}
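
A minimal sketch of driving the writer above; 'configClient', 'somePrefix' and the member names are illustrative placeholders:

    final PrefixedShardConfigWriter writer = new PrefixedShardConfigWriter(configClient);

    // Record a prefix-based shard rooted at 'somePrefix', replicated on two members.
    final ListenableFuture<Void> written = writer.writeConfig(somePrefix,
            Arrays.asList(MemberName.forName("member-1"), MemberName.forName("member-2")));

    // Later, remove the same shard's configuration entry again.
    final ListenableFuture<Void> removed = writer.removeConfig(somePrefix);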
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
-import akka.actor.Status.Failure;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
-import akka.cluster.ddata.Replicator.Update;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
- static final int LOOKUP_TASK_MAX_RETRIES = 100;
private final DistributedShardedDOMDataTree shardingService;
private final ActorSystem actorSystem;
private final ShardingServiceAddressResolver resolver;
private final DistributedDataStore distributedConfigDatastore;
private final DistributedDataStore distributedOperDatastore;
+ private final int lookupTaskMaxRetries;
private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
private final Cluster cluster;
- private final ActorRef replicator;
- private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
clusterWrapper = builder.getClusterWrapper();
distributedConfigDatastore = builder.getDistributedConfigDatastore();
distributedOperDatastore = builder.getDistributedOperDatastore();
+ lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
actorContext = distributedConfigDatastore.getActorContext();
resolver = new ShardingServiceAddressResolver(
DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
clusterWrapper.subscribeToMemberEvents(self());
cluster = Cluster.get(actorSystem);
-
- replicator = DistributedData.get(context().system()).replicator();
}
@Override
public void preStart() {
- final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
- new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
- replicator.tell(subscribe, noSender());
}
@Override
@Override
protected void handleCommand(final Object message) throws Exception {
- LOG.debug("Received {}", message);
+ LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
if (message instanceof ClusterEvent.MemberUp) {
memberUp((ClusterEvent.MemberUp) message);
} else if (message instanceof ClusterEvent.MemberWeaklyUp) {
memberUnreachable((ClusterEvent.UnreachableMember) message);
} else if (message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if (message instanceof Changed) {
- onConfigChanged((Changed) message);
} else if (message instanceof ProducerCreated) {
onProducerCreated((ProducerCreated) message);
} else if (message instanceof NotifyProducerCreated) {
onNotifyProducerRemoved((NotifyProducerRemoved) message);
} else if (message instanceof PrefixShardCreated) {
onPrefixShardCreated((PrefixShardCreated) message);
- } else if (message instanceof CreatePrefixShard) {
- onCreatePrefixShard((CreatePrefixShard) message);
- } else if (message instanceof RemovePrefixShard) {
- onRemovePrefixShard((RemovePrefixShard) message);
+ } else if (message instanceof LookupPrefixShard) {
+ onLookupPrefixShard((LookupPrefixShard) message);
+ } else if (message instanceof PrefixShardRemovalLookup) {
+ onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
} else if (message instanceof PrefixShardRemoved) {
onPrefixShardRemoved((PrefixShardRemoved) message);
+ } else if (message instanceof StartConfigShardLookup) {
+ onStartConfigShardLookup((StartConfigShardLookup) message);
}
}
- private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
- LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
-
- currentData = change.dataValue();
- final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
- LOG.debug("Changed set {}", changedConfig);
-
- try {
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
- changedConfig.values().stream().collect(
- Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
- resolveConfig(newConfig);
- } catch (final IllegalStateException e) {
- LOG.error("Failed, ", e);
- }
-
- }
-
- private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
-
- // get the removed configurations
- final SetView<DOMDataTreeIdentifier> deleted =
- Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
- shardingService.resolveShardRemovals(deleted);
-
- // get the added configurations
- final SetView<DOMDataTreeIdentifier> additions =
- Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
- shardingService.resolveShardAdditions(additions);
- // we can ignore those that existed previously since the potential changes in replicas will be handled by
- // shard manager.
-
- currentConfiguration = new HashMap<>(newConfig);
- }
-
@Override
public String persistenceId() {
return PERSISTENCE_ID;
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void onCreatePrefixShard(final CreatePrefixShard message) {
- LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
+ private void onLookupPrefixShard(final LookupPrefixShard message) {
+ LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
- final PrefixShardConfiguration configuration = message.getConfiguration();
+ final DOMDataTreeIdentifier prefix = message.getPrefix();
- final Update<ORMap<PrefixShardConfiguration>> update =
- new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
- map -> map.put(cluster, configuration.toDataMapKey(), configuration));
-
- replicator.tell(update, self());
-
- final ActorContext context =
- configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+ final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
// schedule a notification task for the reply
actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
- context, shardingService, configuration.getPrefix()),
- actorSystem.dispatcher());
+ context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
}
private void onPrefixShardCreated(final PrefixShardCreated message) {
LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
- final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
- final ActorRef sender = getSender();
-
- final List<CompletableFuture<Object>> futures = new ArrayList<>();
+ final PrefixShardConfiguration config = message.getConfiguration();
- for (final String address : addresses) {
- final ActorSelection actorSelection = actorSystem.actorSelection(address);
- futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
- new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
- }
-
- final CompletableFuture<Void> combinedFuture =
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-
- combinedFuture.thenRun(() -> {
- sender.tell(new Status.Success(null), self());
- }).exceptionally(throwable -> {
- sender.tell(new Status.Failure(throwable), self());
- return null;
- });
+ shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
}
- private void onRemovePrefixShard(final RemovePrefixShard message) {
- LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
-
- //TODO the removal message should have the configuration or some other way to get to the key
- final Update<ORMap<PrefixShardConfiguration>> removal =
- new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
- map -> map.remove(cluster, "prefix=" + message.getPrefix()));
- replicator.tell(removal, self());
+ private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
+ LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);
final ShardRemovalLookupTask removalTask =
new ShardRemovalLookupTask(actorSystem, getSender(),
- actorContext, message.getPrefix());
+ actorContext, message.getPrefix(), lookupTaskMaxRetries);
actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
}
private void onPrefixShardRemoved(final PrefixShardRemoved message) {
LOG.debug("Received PrefixShardRemoved: {}", message);
- final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
+ shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
+ }
- if (registration == null) {
- LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
- message.getPrefix(), idToShardRegistration);
- return;
- }
+ private void onStartConfigShardLookup(final StartConfigShardLookup message) {
+ LOG.debug("Received StartConfigShardLookup: {}", message);
- registration.close();
+ final ActorContext context =
+ message.getType().equals(LogicalDatastoreType.CONFIGURATION)
+ ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
+
+ // schedule a notification task for the reply
+ actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+ new ConfigShardLookupTask(
+ actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
+ actorSystem.dispatcher());
}
private static MemberName memberToName(final Member member) {
}
}
- private abstract static class LookupTask implements Runnable {
-
- private final ActorRef replyTo;
- private int retries = 0;
-
- private LookupTask(final ActorRef replyTo) {
- this.replyTo = replyTo;
- }
-
- abstract void reschedule(int retries);
-
- void tryReschedule(@Nullable final Throwable throwable) {
- if (retries <= LOOKUP_TASK_MAX_RETRIES) {
- retries++;
- reschedule(retries);
- } else {
- fail(throwable);
- }
- }
-
- void fail(@Nullable final Throwable throwable) {
- if (throwable == null) {
- replyTo.tell(new Failure(
- new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
- + "Failing..")), noSender());
- } else {
- replyTo.tell(new Failure(
- new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
- + "Failing..", throwable)), noSender());
- }
- }
- }
-
/**
* Handles the lookup step of cds shard creation once the configuration is updated.
*/
private final ActorContext context;
private final DistributedShardedDOMDataTree shardingService;
private final DOMDataTreeIdentifier toLookup;
+ private final int lookupMaxRetries;
ShardCreationLookupTask(final ActorSystem system,
final ActorRef replyTo,
final ClusterWrapper clusterWrapper,
final ActorContext context,
final DistributedShardedDOMDataTree shardingService,
- final DOMDataTreeIdentifier toLookup) {
- super(replyTo);
+ final DOMDataTreeIdentifier toLookup,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
this.system = system;
this.replyTo = replyTo;
this.clusterWrapper = clusterWrapper;
this.context = context;
this.shardingService = shardingService;
this.toLookup = toLookup;
+ this.lookupMaxRetries = lookupMaxRetries;
}
@Override
system.scheduler().scheduleOnce(
SHARD_LOOKUP_TASK_INTERVAL,
new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
- shardingService, toLookup),
+ shardingService, toLookup, lookupMaxRetries),
system.dispatcher());
}
}
private final ActorRef shard;
private final DistributedShardedDOMDataTree shardingService;
private final DOMDataTreeIdentifier toLookup;
+ private final int lookupMaxRetries;
ShardLeaderLookupTask(final ActorSystem system,
final ActorRef replyTo,
final ClusterWrapper clusterWrapper,
final ActorRef shard,
final DistributedShardedDOMDataTree shardingService,
- final DOMDataTreeIdentifier toLookup) {
- super(replyTo);
+ final DOMDataTreeIdentifier toLookup,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
this.system = system;
this.replyTo = replyTo;
this.context = context;
this.shard = shard;
this.shardingService = shardingService;
this.toLookup = toLookup;
+ this.lookupMaxRetries = lookupMaxRetries;
}
@Override
clusterWrapper.getCurrentMemberName(), toLookup);
system.scheduler().scheduleOnce(
SHARD_LOOKUP_TASK_INTERVAL,
- new FrontendLookupTask(system, replyTo, shardingService, toLookup),
+ new FrontendLookupTask(
+ system, replyTo, shardingService, toLookup, lookupMaxRetries),
system.dispatcher());
} else {
tryReschedule(null);
FrontendLookupTask(final ActorSystem system,
final ActorRef replyTo,
final DistributedShardedDOMDataTree shardingService,
- final DOMDataTreeIdentifier toLookup) {
- super(replyTo);
+ final DOMDataTreeIdentifier toLookup,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
this.system = system;
this.replyTo = replyTo;
this.shardingService = shardingService;
ShardRemovalLookupTask(final ActorSystem system,
final ActorRef replyTo,
final ActorContext context,
- final DOMDataTreeIdentifier toLookup) {
- super(replyTo);
+ final DOMDataTreeIdentifier toLookup,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
this.system = system;
this.replyTo = replyTo;
this.context = context;
}
}
+ /**
+ * Task for handling the lookup of the backend for the configuration shard.
+ */
+ private static class ConfigShardLookupTask extends LookupTask {
+
+ private final ActorSystem system;
+ private final ActorRef replyTo;
+ private final ActorContext context;
+ private final ClusterWrapper clusterWrapper;
+ private final int lookupTaskMaxRetries;
+
+ ConfigShardLookupTask(final ActorSystem system,
+ final ActorRef replyTo,
+ final ActorContext context,
+ final ClusterWrapper clusterWrapper,
+ final StartConfigShardLookup message,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
+ this.system = system;
+ this.replyTo = replyTo;
+ this.context = context;
+ this.clusterWrapper = clusterWrapper;
+ this.lookupTaskMaxRetries = lookupMaxRetries;
+ }
+
+ @Override
+ void reschedule(int retries) {
+ LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
+ system.scheduler().scheduleOnce(
+ SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
+ }
+
+ @Override
+ public void run() {
+ final Optional<ActorRef> localShard =
+ context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+ if (!localShard.isPresent()) {
+ tryReschedule(null);
+ } else {
+ LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
+ system.scheduler().scheduleOnce(
+ SHARD_LOOKUP_TASK_INTERVAL,
+ new ConfigShardReadinessTask(
+ system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
+ system.dispatcher());
+ }
+ }
+ }
+
+ /**
+ * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
+ */
+ private static class ConfigShardReadinessTask extends LookupTask {
+
+ private final ActorSystem system;
+ private final ActorRef replyTo;
+ private final ActorContext context;
+ private final ClusterWrapper clusterWrapper;
+ private final ActorRef shard;
+
+ ConfigShardReadinessTask(final ActorSystem system,
+ final ActorRef replyTo,
+ final ActorContext context,
+ final ClusterWrapper clusterWrapper,
+ final ActorRef shard,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
+ this.system = system;
+ this.replyTo = replyTo;
+ this.context = context;
+ this.clusterWrapper = clusterWrapper;
+ this.shard = shard;
+ }
+
+ @Override
+ void reschedule(int retries) {
+ LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
+ clusterWrapper.getCurrentMemberName(), retries);
+ system.scheduler().scheduleOnce(
+ SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
+ }
+
+ @Override
+ public void run() {
+ final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+ ask.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+ if (throwable != null) {
+ tryReschedule(throwable);
+ } else {
+ final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+ final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+ if (leaderActor.isPresent()) {
+ // leader is found, backend seems ready, check if the frontend is ready
+ LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
+ clusterWrapper.getCurrentMemberName());
+ replyTo.tell(new Status.Success(null), noSender());
+ } else {
+ tryReschedule(null);
+ }
+ }
+ }
+ }, system.dispatcher());
+ }
+ }
+
public static class ShardedDataTreeActorCreator {
private DistributedShardedDOMDataTree shardingService;
private DistributedDataStore distributedOperDatastore;
private ActorSystem actorSystem;
private ClusterWrapper cluster;
+ private int maxRetries;
public DistributedShardedDOMDataTree getShardingService() {
return shardingService;
return this;
}
+ public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
+ this.maxRetries = maxRetries;
+ return this;
+ }
+
+ public int getLookupTaskMaxRetries() {
+ return maxRetries;
+ }
+
private void verify() {
Preconditions.checkNotNull(shardingService);
Preconditions.checkNotNull(actorSystem);
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+/**
+ * Message sent to the local ShardManager once the prefix configuration shard is ready and the ShardManager
+ * should start its listener.
+ */
+public class InitConfigListener {
+
+ public static final InitConfigListener INSTANCE = new InitConfigListener();
+
+ private InitConfigListener() {
+
+ }
+}
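
A sketch of how this message might be delivered, assuming 'shardManagerRef' refers to the local ShardManager actor:

    // Tell the ShardManager to start its prefix configuration listener.
    shardManagerRef.tell(InitConfigListener.INSTANCE, ActorRef.noSender());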
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import java.io.Serializable;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
/**
 * Sent to the local {@link ShardedDataTreeActor} when a shard has been created, in order to
 * create the required frontend/backend shards.
*/
@Beta
-public class CreatePrefixShard implements Serializable {
+public class LookupPrefixShard implements Serializable {
private static final long serialVersionUID = 1L;
- private final PrefixShardConfiguration configuration;
+ private final DOMDataTreeIdentifier prefix;
- public CreatePrefixShard(final PrefixShardConfiguration configuration) {
- this.configuration = Preconditions.checkNotNull(configuration);
+ public LookupPrefixShard(final DOMDataTreeIdentifier prefix) {
+ this.prefix = Preconditions.checkNotNull(prefix);
}
- public PrefixShardConfiguration getConfiguration() {
- return configuration;
+ public DOMDataTreeIdentifier getPrefix() {
+ return prefix;
}
@Override
public String toString() {
- return "CreatePrefixShard{"
- + "configuration="
- + configuration
+ return "LookupPrefixShard{"
+ + "prefix="
+ + prefix
+ '}';
}
}
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
/**
- * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node.
- * The local actor should update the configuration so that the change is picked up by other CDS Node Agents and
- * backend ShardManagers.
+ * Sent to the local {@link ShardedDataTreeActor} to initiate the lookup of the shard that is being removed.
+ * Once the shard is removed from the system entirely, the actor responds with a success.
*/
-public class RemovePrefixShard {
+public class PrefixShardRemovalLookup {
private final DOMDataTreeIdentifier prefix;
- public RemovePrefixShard(final DOMDataTreeIdentifier prefix) {
+ public PrefixShardRemovalLookup(final DOMDataTreeIdentifier prefix) {
this.prefix = Preconditions.checkNotNull(prefix);
}
@Override
public String toString() {
- return "RemovePrefixShard{"
+ return "PrefixShardRemovalLookup{"
+ "prefix=" + prefix
+ '}';
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+
+/**
+ * Message that should be sent to the ShardedDataTreeActor when the lookup of the prefix config shard should begin.
+ * Replied to with Success once the shard has a leader.
+ */
+public class StartConfigShardLookup {
+
+ private LogicalDatastoreType type;
+
+ public StartConfigShardLookup(final LogicalDatastoreType type) {
+ this.type = type;
+ }
+
+ public LogicalDatastoreType getType() {
+ return type;
+ }
+
+ @Override
+ public String toString() {
+ return "StartConfigShardLookup{type=" + type + '}';
+ }
+}
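
A sketch of the expected request/reply interaction; 'shardedDataTreeActor' and 'askTimeout' are illustrative placeholders:

    // Ask the actor to start the lookup; it replies with akka.actor.Status.Success
    // once the prefix configuration shard has an elected leader.
    final Future<Object> lookupDone = Patterns.ask(shardedDataTreeActor,
            new StartConfigShardLookup(LogicalDatastoreType.CONFIGURATION), askTimeout);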
--- /dev/null
+module prefix-shard-configuration {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:clustering:prefix-shard-configuration";
+ prefix "prefix-config";
+
+ description
+ "This module contains the base YANG definitions for
+ shards based on prefix configuration";
+
+ revision "2017-01-10" {
+ description "Initial revision.";
+ }
+
+ container prefix-shards {
+
+ list shard {
+ key prefix;
+ leaf prefix {
+ type instance-identifier;
+ description "Prefix that this shard is rooted at.";
+ }
+
+ container replicas {
+ leaf-list replica {
+ type string;
+ }
+
+ description "List of cluster member nodes that this shard is replicated on";
+ }
+
+ description "List of prefix-based shards configured.";
+ }
+ }
+}
package org.opendaylight.controller.cluster.datastore;
+import static org.mockito.Mockito.mock;
import static org.mockito.MockitoAnnotations.initMocks;
import akka.actor.ActorRef;
protected static CountDownLatch ready;
protected TestShardManager.Builder newTestShardMgrBuilder() {
- return TestShardManager.builder(datastoreContextBuilder);
+ return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
}
protected TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
- return TestShardManager.builder(datastoreContextBuilder).configuration(config);
+ return TestShardManager.builder(datastoreContextBuilder).configuration(config)
+ .distributedDataStore(mock(DistributedDataStore.class));
}
protected Props newShardMgrProps(final Configuration config) {
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
+ return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
+ schemaContext, shardNames);
+ }
+
+ public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ final String modulesConfig, final boolean waitUntilLeader,
+ final SchemaContext schemaContext, final String... shardNames) {
final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
- final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
+ final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
datastoreContextBuilder.dataStoreName(typeName);
return dataStore;
}
+ public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
+ final SchemaContext schemaContext,
+ final LogicalDatastoreType storeType) {
+ final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+ final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
+
+ getDatastoreContextBuilder().dataStoreName(typeName);
+
+ final DatastoreContext datastoreContext =
+ getDatastoreContextBuilder().logicalStoreType(storeType).build();
+
+ final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+ final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
+ configuration, mockContextFactory, restoreFromSnapshot);
+
+ dataStore.onGlobalContextUpdated(schemaContext);
+
+ datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
+ return dataStore;
+ }
+
public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
for (String shardName: shardNames) {
ActorRef shard = findLocalShard(actorContext, shardName);
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Status.Success;
-import akka.testkit.JavaTestKit;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.Shard.Builder;
-import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
-import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tests prefix shard creation in ShardManager.
- */
-public class PrefixShardCreationTest extends AbstractShardManagerTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class);
-
- private static final DOMDataTreeIdentifier TEST_ID =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
-
- @Test
- public void testPrefixShardCreation() throws Exception {
-
- LOG.info("testPrefixShardCreation starting");
- new JavaTestKit(getSystem()) {
- {
- datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
-
- final ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
- new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
-
- final SchemaContext schemaContext = TestModel.createTestContext();
- shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
-
- shardManager.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
- final Builder builder = Shard.builder();
-
- final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
- new PrefixShardConfiguration(TEST_ID,
- PrefixShardStrategy.NAME,
- Collections.singletonList(MEMBER_1)),
- datastoreContextBuilder.build(), builder);
-
- shardManager.tell(createPrefixedShard, getRef());
- expectMsgClass(duration("5 seconds"), Success.class);
-
- shardManager.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardFound.class);
- }
- };
- }
-}
\ No newline at end of file
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
}
private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
- return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
+ return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
+ .distributedDataStore(mock(DistributedDataStore.class));
}
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
-import akka.actor.PoisonPill;
import akka.cluster.Cluster;
-import akka.cluster.ddata.DistributedData;
import akka.testkit.JavaTestKit;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Ignore("Needs to have the configuration backend switched from distributed-data")
public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
private static final DOMDataTreeIdentifier TEST_ID =
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+ private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1-and-2.conf";
+
private ActorSystem leaderSystem;
private ActorSystem followerSystem;
.logicalStoreType(
org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
- private DistributedDataStore followerDistributedDataStore;
- private DistributedDataStore leaderDistributedDataStore;
+ private DistributedDataStore leaderConfigDatastore;
+ private DistributedDataStore leaderOperDatastore;
+
+ private DistributedDataStore followerConfigDatastore;
+ private DistributedDataStore followerOperDatastore;
+
+
private IntegrationTestKit followerTestKit;
private IntegrationTestKit leaderTestKit;
-
private DistributedShardedDOMDataTree leaderShardFactory;
- private DistributedShardedDOMDataTree followerShardFactory;
+ private DistributedShardedDOMDataTree followerShardFactory;
private ActorSystemProvider leaderSystemProvider;
private ActorSystemProvider followerSystemProvider;
@Before
public void setUp() {
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
@After
public void tearDown() {
- if (followerDistributedDataStore != null) {
- followerDistributedDataStore.close();
+ if (leaderConfigDatastore != null) {
+ leaderConfigDatastore.close();
}
- if (leaderDistributedDataStore != null) {
- leaderDistributedDataStore.close();
+ if (leaderOperDatastore != null) {
+ leaderOperDatastore.close();
}
- DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
- DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ if (followerConfigDatastore != null) {
+ followerConfigDatastore.close();
+ }
+ if (followerOperDatastore != null) {
+ followerOperDatastore.close();
+ }
JavaTestKit.shutdownActorSystem(leaderSystem);
JavaTestKit.shutdownActorSystem(followerSystem);
+
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
- private void initEmptyDatastores(final String type) {
+ private void initEmptyDatastores() {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- leaderDistributedDataStore =
- leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+ leaderConfigDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+ "config", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+ leaderOperDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+ "operational", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+
+ leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
+ leaderOperDatastore,
+ leaderConfigDatastore);
followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
- followerDistributedDataStore =
- followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
- leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
- leaderDistributedDataStore,
- leaderDistributedDataStore);
+ followerConfigDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore(
+ "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+ followerOperDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore(
+ "operational", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
- followerDistributedDataStore,
- followerDistributedDataStore);
+ followerOperDatastore,
+ followerConfigDatastore);
+
+ leaderShardFactory.init();
+ followerShardFactory.init();
+
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
+ ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+
}
@Test
public void testProducerRegistrations() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
leaderTestKit.waitForMembersUp("member-2");
TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+ final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
} catch (final DOMDataTreeShardingConflictException e) {
assertTrue(e.getMessage().contains("is already occupied by another shard"));
}
+
+ shardRegistration.close().toCompletableFuture().get();
}
@Test
public void testWriteIntoMultipleShards() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
leaderTestKit.waitForMembersUp("member-2");
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- findLocalShard(followerDistributedDataStore.getActorContext(),
+ findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
LOG.debug("Got after waiting for nonleader");
LOG.warn("Got to pre submit");
tx.submit().checkedGet();
+
+ shardRegistration.close().toCompletableFuture().get();
}
@Test
public void testMultipleShardRegistrations() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
// check leader has local shards
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
// check follower has local shards
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
LOG.debug("Closing registrations");
- reg1.close();
- reg2.close();
- reg3.close();
- reg4.close();
+ reg1.close().toCompletableFuture().get();
+ reg2.close().toCompletableFuture().get();
+ reg3.close().toCompletableFuture().get();
+ reg4.close().toCompletableFuture().get();
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
LOG.debug("All leader shards gone");
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
LOG.debug("All follower shards gone");
@Test
public void testMultipleRegistrationsAtOnePrefix() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
for (int i = 0; i < 10; i++) {
LOG.debug("Round {}", i);
TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
}
+
}
}
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Ignore("distributed-data is broken needs to be removed")
public class DistributedShardedDOMDataTreeTest extends AbstractTest {
private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
.node(TestModel.INNER_LIST_QNAME));
private static final Set<MemberName> SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME);
+ private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1.conf";
+
private ActorSystem leaderSystem;
private final Builder leaderDatastoreContextBuilder =
org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
private DistributedDataStore leaderDistributedDataStore;
+ private DistributedDataStore operDistributedDatastore;
private IntegrationTestKit leaderTestKit;
private DistributedShardedDOMDataTree leaderShardFactory;
public void setUp() {
MockitoAnnotations.initMocks(this);
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
}
@After
- public void tearDown() {
+ public void tearDown() throws Exception {
if (leaderDistributedDataStore != null) {
leaderDistributedDataStore.close();
}
+ if (operDistributedDatastore != null) {
+ operDistributedDatastore.close();
+ }
+
JavaTestKit.shutdownActorSystem(leaderSystem);
+
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
- private void initEmptyDatastore(final String type) {
+ private void initEmptyDatastores() {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- leaderDistributedDataStore =
- leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+ leaderDistributedDataStore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+ "config", MODULE_SHARDS_CONFIG, "empty-modules.conf", true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+ operDistributedDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+ "operational", MODULE_SHARDS_CONFIG, "empty-modules.conf",true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
- leaderDistributedDataStore,
+ operDistributedDatastore,
leaderDistributedDataStore);
+
+ leaderShardFactory.init();
}
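For orientation, a minimal sketch of the pattern the tests below follow against the new initEmptyDatastores() setup; every identifier here (leaderShardFactory, TEST_ID, waitOnAsyncTask) comes from the surrounding test class, nothing new is introduced:

    initEmptyDatastores();

    // create a prefix-based shard rooted at the test path, replicated on the single test member
    final DistributedShardRegistration reg = waitOnAsyncTask(
            leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
            DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);

    // ... exercise the shard via producers and cursors ...

    // tear the shard down again so the next test starts from a clean actor system
    waitOnAsyncTask(reg.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);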
@Test
public void testWritesIntoDefaultShard() throws Exception {
- initEmptyDatastore("config");
+ initEmptyDatastores();
final DOMDataTreeIdentifier configRoot =
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
@Test
public void testSingleNodeWrites() throws Exception {
- initEmptyDatastore("config");
+ initEmptyDatastores();
final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
final LeafNode<String> valueToCheck = ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build();
+ LOG.debug("Writing data {} at {}, cursor {}", nameId.getLastPathArgument(), valueToCheck, cursor);
cursor.write(nameId.getLastPathArgument(),
valueToCheck);
verifyNoMoreInteractions(mockedDataTreeListener);
+ shardRegistration.close().toCompletableFuture().get();
+
}
@Test
public void testMultipleWritesIntoSingleMapEntry() throws Exception {
- initEmptyDatastore("config");
+ initEmptyDatastores();
final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
}
// top level shard at TEST element, with subshards on each outer-list map entry
+ @Ignore("https://bugs.opendaylight.org/show_bug.cgi?id=8116")
@Test
public void testMultipleShardLevels() throws Exception {
- initEmptyDatastore("config");
+ initEmptyDatastores();
- final DistributedShardRegistration testShardId = waitOnAsyncTask(
+ final DistributedShardRegistration testShardReg = waitOnAsyncTask(
leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
true, Collections.emptyList());
// need 6 invocations: first the initial one from the parent shard, and then one for each individual subshard
- verify(mockedDataTreeListener, timeout(10000).times(6)).onDataTreeChanged(captorForChanges.capture(),
+ verify(mockedDataTreeListener, timeout(20000).times(6)).onDataTreeChanged(captorForChanges.capture(),
captorForSubtrees.capture());
verifyNoMoreInteractions(mockedDataTreeListener);
final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues();
.withValue(createOuterEntries(listSize, "testing-values")).build())
.build();
- assertEquals(expected, actual);
- }
-
- @Test
- public void testDistributedData() throws Exception {
- initEmptyDatastore("config");
-
- waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+ for (final DistributedShardRegistration registration : registrations) {
+ waitOnAsyncTask(registration.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+ }
- waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
- ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
- ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
- ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+ waitOnAsyncTask(testShardReg.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+ assertEquals(expected, actual);
}
@Test
public void testMultipleRegistrationsAtOnePrefix() throws Exception {
- initEmptyDatastore("config");
+ initEmptyDatastores();
for (int i = 0; i < 10; i++) {
LOG.debug("Round {}", i);
import com.google.common.base.Throwables;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
}
}
+ public static SchemaContext distributedShardedDOMDataTreeSchemaContext() {
+ final List<InputStream> streams = new ArrayList<>();
+ try {
+ // we need prefix-shard-configuration and odl-datastore-test models
+ // for DistributedShardedDOMDataTree tests
+ streams.add(getInputStream(ODL_DATASTORE_TEST_YANG));
+ streams.add(new FileInputStream("src/main/yang/prefix-shard-configuration.yang"));
+ return YangParserTestUtils.parseYangStreams(streams);
+ } catch (FileNotFoundException | ReactorException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
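A quick, illustrative sketch (assumed call site, mirroring the datastore setup changes earlier in this patch) of how the combined schema context is meant to be consumed:

    // testKit is an IntegrationTestKit; the config file names match the constants used in the tests above
    final SchemaContext schemaContext = SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext();
    final DistributedDataStore configStore = (DistributedDataStore) testKit.setupDistributedDataStore(
            "config", "module-shards-cars-member-1.conf", "empty-modules.conf", true, schemaContext);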
public static SchemaContext entityOwners() {
try {
return YangParserTestUtils.parseYangSources(new File("src/main/yang/entity-owners.yang"));
--- /dev/null
+modules = []
\ No newline at end of file
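The new file above (presumably the empty-modules.conf referenced by the test setup in this patch) deliberately declares no module-based shards, so only the default shard and any prefix-based shards come into play. For contrast, a conventional modules entry in the same HOCON format would look roughly like the following (the name and namespace are hypothetical, illustrative values only):

    modules = [
        {
            name = "cars"
            namespace = "urn:opendaylight:params:xml:ns:yang:controller:config:sal-clustering-it:car"
            shard-strategy = "module"
        }
    ]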