package org.opendaylight.controller.cluster.datastore.shardmanager;
+import static akka.actor.ActorRef.noSender;
import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.Member;
+import akka.cluster.ddata.DistributedData;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.Replicator.Changed;
+import akka.cluster.ddata.Replicator.Subscribe;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import org.apache.commons.lang3.SerializationUtils;
+import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final String persistenceId;
+ private final ActorRef replicator;
+
ShardManager(AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
this.configuration = builder.getConfiguration();
this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+ this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
"shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
shardManagerMBean.registerMBean();
+
+ replicator = DistributedData.get(context().system()).replicator();
+
+ }
+
+ public void preStart() {
+ LOG.info("Starting Shardmanager {}", persistenceId);
+
+ final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
+ new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
+ replicator.tell(subscribe, noSender());
}
@Override
} else if (message instanceof SwitchShardBehavior) {
onSwitchShardBehavior((SwitchShardBehavior) message);
} else if (message instanceof CreateShard) {
- onCreateShard((CreateShard) message);
+ onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof CreatePrefixedShard) {
+ onCreatePrefixedShard((CreatePrefixedShard) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
} else if (message instanceof ForwardedAddServerReply) {
- ForwardedAddServerReply msg = (ForwardedAddServerReply) message;
- onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
+ ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+ msg.removeShardOnFailure);
} else if (message instanceof ForwardedAddServerFailure) {
- ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message;
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
onGetLocalShardIds();
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof Changed) {
+ onConfigChanged((Changed) message);
} else {
unknownMessage(message);
}
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
int nfailed = 0;
- for (Boolean result: results) {
+ for (Boolean result : results) {
if (!result) {
nfailed++;
}
}
}
+ private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
+ LOG.debug("{}, ShardManager {} received config changed {}",
+ cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
+
+ final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
+
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
+ changedConfig.values().stream().collect(
+ Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
+
+ resolveConfig(newConfig);
+ }
+
+ private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
+ LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
+ cluster.getCurrentMemberName(), persistenceId, newConfig);
+
+ newConfig.forEach((prefix, config) ->
+ LOG.debug("{} ShardManager : {}, received shard config "
+ + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
+
+ final SetView<DOMDataTreeIdentifier> removedConfigs =
+ Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
+
+ // resolve removals
+
+ resolveRemovals(removedConfigs);
+
+ final SetView<DOMDataTreeIdentifier> addedConfigs =
+ Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
+ // resolve additions
+
+ resolveAdditions(addedConfigs, newConfig);
+ // iter through to update existing shards, either start/stop replicas or update the shard
+ // to check for more peers
+ resolveUpdates(Collections.emptySet());
+ }
+
+ private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
+ LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
+ cluster.getCurrentMemberName(), persistenceId, removedConfigs);
+
+ removedConfigs.forEach(id -> doRemovePrefixedShard(id));
+ }
+
+ private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
+ LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
+
+ addedConfigs.stream().filter(identifier
+ -> identifier
+ .getDatastoreType().equals(
+ ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
+ .forEach(id -> doCreatePrefixedShard(configs.get(id)));
+ }
+
+ private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
+ LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
+ }
+
+ private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
+ LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
+ cluster.getCurrentMemberName(), persistenceId, prefix);
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+ final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+ configuration.removePrefixShardConfiguration(prefix);
+
+ if (shard == null) {
+ LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
+ return;
+ }
+
+ if (shard.getActor() != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+ shard.getActor().tell(Shutdown.INSTANCE, self());
+ }
+ LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
+ persistenceId(), shardId.getShardName());
+ persistShardList();
+ }
+
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId.getShardName());
return;
}
- byte[] shardManagerSnapshot = null;
- if (currentSnapshot != null) {
- shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
- }
-
ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
- new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+ new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
for (ShardInformation shardInfo: localShards.values()) {
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+ LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
+
+ Object reply;
+ try {
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+ createPrefixedShard.getConfig().getPrefix());
+ if (localShards.containsKey(shardId.getShardName())) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
+ } else {
+ doCreatePrefixedShard(createPrefixedShard);
+ reply = new Status.Success(null);
+ }
+ } catch (final Exception e) {
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
+ reply = new Status.Failure(e);
+ }
+
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
}
}
- private void doCreateShard(CreateShard createShard) {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- String shardName = moduleShardConfig.getShardName();
+ private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+ doCreatePrefixedShard(createPrefixedShard.getConfig());
+ // do not replicate on this level
+ }
+
+ private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
+ LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
+
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+ config.getPrefix());
+ final String shardName = shardId.getShardName();
+
+ if (localShards.containsKey(shardName)) {
+ LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+ final PrefixShardConfiguration existing =
+ configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
+
+ if (existing != null && existing.equals(config)) {
+ // we don't have to do nothing here
+ return;
+ }
+ }
+
+ configuration.addPrefixShardConfiguration(config);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+ .storeRoot(config.getPrefix().getRootIdentifier());
+ DatastoreContext shardDatastoreContext = builder.build();
+
+ final Map<String, String> peerAddresses = Collections.emptyMap();
+ final boolean isActiveMember = true;
+ LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
+ + "{}, peerAddresses: {}, isActiveMember: {}",
+ shardId, persistenceId(), config.getShardMemberNames(),
+ peerAddresses, isActiveMember);
+
+ final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, Shard.builder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
+
+ if (schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
+ }
+
+ persistShardList();
+ }
+
+ private void doCreateShard(final CreateShard createShard) {
+ final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ final String shardName = moduleShardConfig.getShardName();
configuration.addModuleShardConfiguration(moduleShardConfig);
if (currentSnapshot == null && restoreFromSnapshot != null
&& restoreFromSnapshot.getShardManagerSnapshot() != null) {
- try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
- restoreFromSnapshot.getShardManagerSnapshot()))) {
- ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+ ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
- LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+ LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
- applyShardManagerSnapshot(snapshot);
- } catch (Exception e) {
- LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
- }
+ applyShardManagerSnapshot(snapshot);
}
createLocalShards();
* @param shardName the shard name
*/
private Map<String, String> getPeerAddresses(String shardName) {
- Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
- Map<String, String> peerAddresses = new HashMap<>();
+ final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+ return getPeerAddresses(shardName, members);
+ }
+ private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+ Map<String, String> peerAddresses = new HashMap<>();
MemberName currentMemberName = this.cluster.getCurrentMemberName();
for (MemberName memberName : members) {
return false;
}
+
+ // With this message the shard does NOT have to be preconfigured
+ // do a dynamic lookup if the shard exists somewhere and replicate
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
+ final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
+
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+ if (schemaContext == null) {
+ final String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName,
+ new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
+ getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ }
+ );
+ }
+
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
}
}
LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
- saveSnapshot(updateShardManagerSnapshot(shardList));
+ saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
}
- private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
- currentSnapshot = new ShardManagerSnapshot(shardList);
+ private ShardManagerSnapshot updateShardManagerSnapshot(
+ final List<String> shardList,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+ currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
return currentSnapshot;
}
OnDemandRaftState raftState = (OnDemandRaftState) response;
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
- for ( Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
serverVotingStatusMap.put(e.getKey(), !e.getValue());
}