import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
} else if (message instanceof PrefixShardCreated) {
onPrefixShardCreated((PrefixShardCreated) message);
} else if (message instanceof PrefixShardRemoved) {
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
+ } else if (message instanceof RemovePrefixShardReplica) {
+ onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
} else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
} else if (message instanceof GetSnapshot) {
}
private void onInitConfigListener() {
- LOG.debug("{}: Initializing config listener.", persistenceId());
+ LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
org.opendaylight.mdsal.common.api.LogicalDatastoreType
}
}
+ private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+ primaryPath, shardName);
+
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
final PrefixShardConfiguration config = message.getConfiguration();
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- config.getPrefix());
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
final String shardName = shardId.getShardName();
if (localShards.containsKey(shardName)) {
.storeRoot(config.getPrefix().getRootIdentifier());
DatastoreContext shardDatastoreContext = builder.build();
- final Map<String, String> peerAddresses = Collections.emptyMap();
+ final Map<String, String> peerAddresses = getPeerAddresses(shardName);
final boolean isActiveMember = true;
LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
final DOMDataTreeIdentifier prefix = message.getPrefix();
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
final ShardInformation shard = localShards.remove(shardId.getShardName());
configuration.removePrefixShardConfiguration(prefix);
return false;
}
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ // Create the localShard
+ if (schemaContext == null) {
+ String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(),
+ response, getSender()), getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ });
+ }
+
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
+ private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+ final RemotePrimaryShardFound response, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if (existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ DatastoreContext datastoreContext = builder.build();
+
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
+
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
shardInfo = existingShardInfo;
}
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
+ private void execAddShard(final String shardName,
+ final ShardInformation shardInfo,
+ final RemotePrimaryShardFound response,
+ final boolean removeShardOnFailure,
+ final ActorRef sender) {
+
+ final String localShardAddress =
+ peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+ final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
.getShardLeaderElectionTimeout().duration());
- Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+ final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- String msg = String.format("AddServer request to leader %s for shard %s failed",
+ final String msg = String.format("AddServer request to leader %s for shard %s failed",
response.getPrimaryPath(), shardName);
self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
});
}
+ private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+ LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+ shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+ primaryPath, getSender()), getTargetActor());
+ }
+ });
+ }
+
private void persistShardList() {
List<String> shardList = new ArrayList<>(localShards.keySet());
for (ShardInformation shardInfo : localShards.values()) {