package org.opendaylight.controller.cluster.datastore.shardmanager;
-import static akka.actor.ActorRef.noSender;
import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
+ private final AbstractDataStore dataStore;
- private final ActorRef replicator;
+ private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+ private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
shardManagerMBean.registerMBean();
- replicator = DistributedData.get(context().system()).replicator();
-
+ dataStore = builder.getDistributedDataStore();
}
+ @Override
public void preStart() {
- LOG.info("Starting Shardmanager {}", persistenceId);
-
- final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
- new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
- replicator.tell(subscribe, noSender());
+ LOG.info("Starting ShardManager {}", persistenceId);
}
@Override
LOG.info("Stopping ShardManager {}", persistenceId());
shardManagerMBean.unregisterMBean();
+
+ if (configListenerReg != null) {
+ configListenerReg.close();
+ configListenerReg = null;
+ }
}
@Override
onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
- } else if (message instanceof CreatePrefixedShard) {
- onCreatePrefixedShard((CreatePrefixedShard) message);
- } else if (message instanceof AddPrefixShardReplica) {
- onAddPrefixShardReplica((AddPrefixShardReplica) message);
+ } else if (message instanceof PrefixShardCreated) {
+ onPrefixShardCreated((PrefixShardCreated) message);
+ } else if (message instanceof PrefixShardRemoved) {
+ onPrefixShardRemoved((PrefixShardRemoved) message);
+ } else if (message instanceof InitConfigListener) {
+ onInitConfigListener();
} else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
onGetLocalShardIds();
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
- } else if (message instanceof Changed) {
- onConfigChanged((Changed) message);
} else {
unknownMessage(message);
}
}
+ private void onInitConfigListener() {
+ LOG.debug("{}: Initializing config listener.", persistenceId());
+
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType
+ .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+ if (configUpdateHandler != null) {
+ configUpdateHandler.close();
+ }
+
+ configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+ configUpdateHandler.initListener(dataStore, type);
+ }
+
private void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
}
}
- private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
- LOG.debug("{}, ShardManager {} received config changed {}",
- cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
-
- final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
- changedConfig.values().stream().collect(
- Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
-
- resolveConfig(newConfig);
- }
-
- private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
- LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
- cluster.getCurrentMemberName(), persistenceId, newConfig);
-
- newConfig.forEach((prefix, config) ->
- LOG.debug("{} ShardManager : {}, received shard config "
- + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
-
- final SetView<DOMDataTreeIdentifier> removedConfigs =
- Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
-
- // resolve removals
-
- resolveRemovals(removedConfigs);
-
- final SetView<DOMDataTreeIdentifier> addedConfigs =
- Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
- // resolve additions
-
- resolveAdditions(addedConfigs, newConfig);
- // iter through to update existing shards, either start/stop replicas or update the shard
- // to check for more peers
- resolveUpdates(Collections.emptySet());
- }
-
- private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
- cluster.getCurrentMemberName(), persistenceId, removedConfigs);
-
- removedConfigs.forEach(id -> doRemovePrefixedShard(id));
- }
-
- private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
- LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
-
- addedConfigs.stream().filter(identifier
- -> identifier
- .getDatastoreType().equals(
- ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
- .forEach(id -> doCreatePrefixedShard(configs.get(id)));
- }
-
- private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
- }
-
- private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
- LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
- cluster.getCurrentMemberName(), persistenceId, prefix);
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
- final ShardInformation shard = localShards.remove(shardId.getShardName());
-
- configuration.removePrefixShardConfiguration(prefix);
-
- if (shard == null) {
- LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
- return;
- }
-
- if (shard.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
- shard.getActor().tell(Shutdown.INSTANCE, self());
- }
- LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
- persistenceId(), shardId.getShardName());
- persistShardList();
- }
-
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId.getShardName());
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
-
- Object reply;
- try {
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- createPrefixedShard.getConfig().getPrefix());
- if (localShards.containsKey(shardId.getShardName())) {
- LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
- reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
- } else {
- doCreatePrefixedShard(createPrefixedShard);
- reply = new Status.Success(null);
- }
- } catch (final Exception e) {
- LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new Status.Failure(e);
- }
-
- if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(reply, getSelf());
- }
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
}
}
- private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- doCreatePrefixedShard(createPrefixedShard.getConfig());
- // do not replicate on this level
- }
+ private void onPrefixShardCreated(final PrefixShardCreated message) {
+ LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
- private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
- LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
+ final PrefixShardConfiguration config = message.getConfiguration();
final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
config.getPrefix());
}
}
+ doCreatePrefixShard(config, shardId, shardName);
+ }
+
+ private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
configuration.addPrefixShardConfiguration(config);
final Builder builder = newShardDatastoreContextBuilder(shardName);
final Map<String, String> peerAddresses = Collections.emptyMap();
final boolean isActiveMember = true;
- LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
- + "{}, peerAddresses: {}, isActiveMember: {}",
- shardId, persistenceId(), config.getShardMemberNames(),
- peerAddresses, isActiveMember);
+
+ LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, Shard.builder(), peerAddressResolver);
if (schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
+ }
+
+ private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+ LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+ final DOMDataTreeIdentifier prefix = message.getPrefix();
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+ final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+ configuration.removePrefixShardConfiguration(prefix);
+
+ if (shard == null) {
+ LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix);
+ return;
+ }
+ if (shard.getActor() != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+ shard.getActor().tell(Shutdown.INSTANCE, self());
+ }
+
+ LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
persistShardList();
}
return false;
}
-
- // With this message the shard does NOT have to be preconfigured
- // do a dynamic lookup if the shard exists somewhere and replicate
- private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
- final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
-
- LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
-
- if (schemaContext == null) {
- final String msg = String.format(
- "No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
- return;
- }
-
- findPrimary(shardName,
- new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
- @Override
- public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
- getTargetActor());
- }
-
- @Override
- public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
- sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
- }
- }
- );
- }
-
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
}
private void findLocalShard(FindLocalShard message) {
+ LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+
final ShardInformation shardInformation = localShards.get(message.getShardName());
if (shardInformation == null) {
+ LOG.debug("{}: Local shard {} not found - shards present: {}",
+ persistenceId(), message.getShardName(), localShards.keySet());
+
getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}