X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=38fed0722fcec025d5101405936d0b1658426785;hb=4447f81c26b851e46acd3f111768bb498f0d553f;hp=5bfa1851eedbdcf76222137fcec52eab2a746b9b;hpb=149feb98f151186975fe42bab5853e05aafd4b51;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 5bfa1851ee..38fed0722f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -64,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; @@ -75,6 +76,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -108,6 +110,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -249,6 +252,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); + } else if (message instanceof AddPrefixShardReplica) { + onAddPrefixShardReplica((AddPrefixShardReplica) message); } else if (message instanceof PrefixShardCreated) { onPrefixShardCreated((PrefixShardCreated) message); } else if (message instanceof PrefixShardRemoved) { @@ -264,6 +269,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); } else if (message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); + } else if (message instanceof RemovePrefixShardReplica) { + onRemovePrefixShardReplica((RemovePrefixShardReplica) message); } else if (message instanceof WrappedShardResponse) { onWrappedShardResponse((WrappedShardResponse) message); } else if (message instanceof GetSnapshot) { @@ -291,7 +298,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onInitConfigListener() { - LOG.debug("{}: Initializing config listener.", persistenceId()); + LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); final org.opendaylight.mdsal.common.api.LogicalDatastoreType type = org.opendaylight.mdsal.common.api.LogicalDatastoreType @@ -374,6 +381,46 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, + final String primaryPath, final ActorRef sender) { + if (isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); + + final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + + //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message + LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), + primaryPath, shardId); + + Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); + Future futureObj = ask(getContext().actorSelection(primaryPath), + new RemoveServer(shardId.toString()), removeServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + shardReplicaOperationsInProgress.remove(shardName); + String msg = String.format("RemoveServer request to leader %s for shard %s failed", + primaryPath, shardName); + + LOG.debug("{}: {}", persistenceId(), msg, failure); + + // FAILURE + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + // SUCCESS + self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -486,8 +533,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), - config.getPrefix()); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); final String shardName = shardId.getShardName(); if (localShards.containsKey(shardName)) { @@ -512,7 +559,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { .storeRoot(config.getPrefix().getRootIdentifier()); DatastoreContext shardDatastoreContext = builder.build(); - final Map peerAddresses = Collections.emptyMap(); + final Map peerAddresses = getPeerAddresses(shardName); final boolean isActiveMember = true; LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", @@ -532,7 +579,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); final ShardInformation shard = localShards.remove(shardId.getShardName()); configuration.removePrefixShardConfiguration(prefix); @@ -1188,6 +1236,37 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } + private void onAddPrefixShardReplica(final AddPrefixShardReplica message) { + LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message); + + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(message.getShardPrefix())); + final String shardName = shardId.getShardName(); + + // Create the localShard + if (schemaContext == null) { + String msg = String.format( + "No SchemaContext is available in order to create a local shard instance for %s", shardName); + LOG.debug("{}: {}", persistenceId(), msg); + getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + return; + } + + findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), + getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(), + response, getSender()), getTargetActor()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound message) { + sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); + } + }); + } + private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); @@ -1232,6 +1311,39 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); } + private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, + final RemotePrimaryShardFound response, final ActorRef sender) { + if (isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardInformation shardInfo; + final boolean removeShardOnFailure; + ShardInformation existingShardInfo = localShards.get(shardName); + if (existingShardInfo == null) { + removeShardOnFailure = true; + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + final Builder builder = newShardDatastoreContextBuilder(shardName); + builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + + DatastoreContext datastoreContext = builder.build(); + + shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, + Shard.builder(), peerAddressResolver); + shardInfo.setActiveMember(false); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + } else { + removeShardOnFailure = false; + shardInfo = existingShardInfo; + } + + execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); + } + private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1259,16 +1371,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo = existingShardInfo; } - String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); + execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); + } + + private void execAddShard(final String shardName, + final ShardInformation shardInfo, + final RemotePrimaryShardFound response, + final boolean removeShardOnFailure, + final ActorRef sender) { + + final String localShardAddress = + peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); //inform ShardLeader to add this shard as a replica by sending an AddServer message LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), response.getPrimaryPath(), shardInfo.getShardId()); - Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() + final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() .getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), - new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); + final Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); futureObj.onComplete(new OnComplete() { @Override @@ -1277,7 +1399,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); - String msg = String.format("AddServer request to leader %s for shard %s failed", + final String msg = String.format("AddServer request to leader %s for shard %s failed", response.getPrimaryPath(), shardName); self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender); } else { @@ -1379,6 +1501,32 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } + private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) { + LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message); + + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(message.getShardPrefix())); + final String shardName = shardId.getShardName(); + + findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), + shardName, persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(), + primaryPath, getSender()), getTargetActor()); + } + }); + } + private void persistShardList() { List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) {