X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=724c8d2c03dc787549f511f60ac3e289ea763978;hb=9b013985b871adb08173dd8e90d5cf2af82fa5a1;hp=a878f6decbf0cbe51a8c34176fc2a58e522d74c0;hpb=186c5d82335ed7d8c39472355f7b1c1e084c26cd;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index a878f6decb..724c8d2c03 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -63,11 +64,10 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerDown; import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; -import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -761,72 +761,68 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { .build()); } + private void checkLocalShardExists(final String shardName, final ActorRef sender) { + if (localShards.containsKey(shardName)) { + String msg = String.format("Local shard %s already exists", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + } + } + private void onAddShardReplica (AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); // verify the local shard replica is already available in the controller node - LOG.debug ("received AddShardReplica for shard {}", shardName); - if (localShards.containsKey(shardName)) { - LOG.debug ("Local shard {} already available in the controller node", shardName); - getSender().tell(new akka.actor.Status.Failure( - new IllegalArgumentException(String.format("Local shard %s already exists", - shardName))), getSelf()); - return; - } + LOG.debug ("onAddShardReplica: {}", shardReplicaMsg); + + checkLocalShardExists(shardName, getSender()); + // verify the shard with the specified name is present in the cluster configuration if (!(this.configuration.isShardConfigured(shardName))) { - LOG.debug ("No module configuration exists for shard {}", shardName); - getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException( - String.format("No module configuration exists for shard %s", - shardName))), getSelf()); + String msg = String.format("No module configuration exists for shard %s", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); return; } // Create the localShard if (schemaContext == null) { - LOG.debug ("schemaContext is not updated to create localShardActor"); - getSender().tell(new akka.actor.Status.Failure( - new IllegalStateException(String.format( - "schemaContext not available to create localShardActor for %s", - shardName))), getSelf()); + String msg = String.format( + "No SchemaContext is available in order to create a local shard instance for %s", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); return; } Map peerAddresses = getPeerAddresses(shardName); if (peerAddresses.isEmpty()) { - LOG.debug ("Shard peers not available for replicating shard data from leader"); - getSender().tell(new akka.actor.Status.Failure( - new IllegalStateException(String.format( - "Cannot add replica for shard %s because no peer is available", - shardName))), getSelf()); + String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); return; } - Timeout findPrimaryTimeout = new Timeout(datastoreContext - .getShardInitializationTimeout().duration().$times(2)); + Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); final ActorRef sender = getSender(); - Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), - findPrimaryTimeout); + Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout); futureObj.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { - LOG.debug ("Failed to receive response for FindPrimary of shard {}", - shardName, failure); + LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure); sender.tell(new akka.actor.Status.Failure(new RuntimeException( String.format("Failed to find leader for shard %s", shardName), failure)), getSelf()); } else { if (!(response instanceof RemotePrimaryShardFound)) { - LOG.debug ("Shard leader not available for creating local shard replica {}", - shardName); - sender.tell(new akka.actor.Status.Failure( - new IllegalStateException(String.format( - "Invalid response type, %s, received from FindPrimary for shard %s", - response.getClass().getName(), shardName))), getSelf()); + String msg = String.format("Failed to find leader for shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId(), msg); + sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf()); return; } + RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; addShard (shardName, message, sender); } @@ -834,12 +830,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } - private void addShard(final String shardName, final RemotePrimaryShardFound response, - final ActorRef sender) { - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - shardName); - String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, - cluster.getCurrentMemberName()); + private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { + checkLocalShardExists(shardName, sender); + + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); final ShardInformation shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), getInitShardDataStoreContext(), new DefaultShardPropsCreator(), peerAddressResolver); @@ -847,8 +842,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo.setActor(newShardActor(schemaContext, shardInfo)); //inform ShardLeader to add this shard as a replica by sending an AddServer message - LOG.debug ("sending AddServer message to peer {} for shard {}", - response.getPrimaryPath(), shardId); + LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), + response.getPrimaryPath(), shardId); Timeout addServerTimeout = new Timeout(datastoreContext .getShardLeaderElectionTimeout().duration().$times(4)); @@ -859,13 +854,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onComplete(Throwable failure, Object addServerResponse) { if (failure != null) { - LOG.debug ("AddServer request to {} for {} failed", - response.getPrimaryPath(), shardName, failure); + LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(), + response.getPrimaryPath(), shardName, failure); + // Remove the shard localShards.remove(shardName); if (shardInfo.getActor() != null) { shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); } + sender.tell(new akka.actor.Status.Failure(new RuntimeException( String.format("AddServer request to leader %s for shard %s failed", response.getPrimaryPath(), shardName), failure)), getSelf()); @@ -881,33 +878,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onAddServerReply (String shardName, ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender, String leaderPath) { + LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath); + if (replyMsg.getStatus() == ServerChangeStatus.OK) { - LOG.debug ("Leader shard successfully added the replica shard {}", - shardName); + LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName); + // Make the local shard voting capable shardInfo.setDatastoreContext(datastoreContext, getSelf()); - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - shardName); - mBean.addLocalShard(shardId.toString()); + + mBean.addLocalShard(shardInfo.getShardId().toString()); sender.tell(new akka.actor.Status.Success(true), getSelf()); } else { - LOG.warn ("Cannot add shard replica {} status {}", - shardName, replyMsg.getStatus()); - LOG.debug ("removing the local shard replica for shard {}", - shardName); + LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard", + persistenceId(), shardName, replyMsg.getStatus()); + //remove the local replica created localShards.remove(shardName); if (shardInfo.getActor() != null) { shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); } switch (replyMsg.getStatus()) { - //case ServerChangeStatus.TIMEOUT: case TIMEOUT: sender.tell(new akka.actor.Status.Failure(new RuntimeException( String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", leaderPath, shardName))), getSelf()); break; - //case ServerChangeStatus.NO_LEADER: case NO_LEADER: sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( "There is no shard leader available for shard %s", shardName))), getSelf()); @@ -922,14 +917,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) { String shardName = shardReplicaMsg.getShardName(); - boolean deleteStatus = false; // verify the local shard replica is available in the controller node if (!localShards.containsKey(shardName)) { - LOG.debug ("Local shard replica {} is not available in the controller node", shardName); - getSender().tell(new akka.actor.Status.Failure( - new IllegalArgumentException(String.format("Local shard %s not available", - shardName))), getSelf()); + String msg = String.format("Local shard %s does not", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); return; } // call RemoveShard for the shardName