X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=814f117f3155ddbe4fed0223331f2403ad2b42fb;hp=7dcc52028cc28f7b3211565478be382845b3b2f5;hb=62bc70796cebbf866ee41e0b9e98006461f19153;hpb=fbc126f0ee9f2a50cd6450378976d4ed32c2dce8 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 7dcc52028c..814f117f31 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -212,6 +212,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if(message instanceof AddShardReplica){ onAddShardReplica((AddShardReplica)message); + } else if(message instanceof ForwardedAddServerReply) { + ForwardedAddServerReply msg = (ForwardedAddServerReply)message; + onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, + msg.removeShardOnFailure); + } else if(message instanceof ForwardedAddServerFailure) { + ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; + onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); + } else if(message instanceof ForwardedAddServerPrimaryShardFound) { + ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message; + addShard(msg.shardName, msg.primaryFound, getSender()); } else if(message instanceof RemoveShardReplica){ onRemoveShardReplica((RemoveShardReplica)message); } else if(message instanceof GetSnapshot) { @@ -224,7 +234,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else { unknownMessage(message); } - } private void onGetSnapshot() { @@ -840,7 +849,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onAddShardReplica (AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); - LOG.debug ("onAddShardReplica: {}", shardReplicaMsg); + LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration if (!(this.configuration.isShardConfigured(shardName))) { @@ -873,8 +882,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String.format("Failed to find leader for shard %s", shardName), failure)), getSelf()); } else { if(response instanceof RemotePrimaryShardFound) { - RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; - addShard (shardName, message, sender); + self().tell(new ForwardedAddServerPrimaryShardFound(shardName, + (RemotePrimaryShardFound)response), sender); } else if(response instanceof LocalPrimaryShardFound) { sendLocalReplicaAlreadyExistsReply(shardName, sender); } else { @@ -936,16 +945,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { futureObj.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, Object addServerResponse) { - shardReplicaOperationsInProgress.remove(shardName); if (failure != null) { LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); - onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed", - response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure); + String msg = String.format("AddServer request to leader %s for shard %s failed", + response.getPrimaryPath(), shardName); + self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender); } else { - AddServerReply reply = (AddServerReply)addServerResponse; - onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure); + self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse, + response.getPrimaryPath(), removeShardOnFailure), sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -953,6 +962,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender, boolean removeShardOnFailure) { + shardReplicaOperationsInProgress.remove(shardName); + if(removeShardOnFailure) { ShardInformation shardInfo = localShards.remove(shardName); if (shardInfo.getActor() != null) { @@ -967,6 +978,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender, String leaderPath, boolean removeShardOnFailure) { String shardName = shardInfo.getShardName(); + shardReplicaOperationsInProgress.remove(shardName); + LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath); if (replyMsg.getStatus() == ServerChangeStatus.OK) { @@ -1054,6 +1067,46 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private static class ForwardedAddServerPrimaryShardFound { + String shardName; + RemotePrimaryShardFound primaryFound; + + ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) { + this.shardName = shardName; + this.primaryFound = primaryFound; + } + } + + private static class ForwardedAddServerReply { + ShardInformation shardInfo; + AddServerReply addServerReply; + String leaderPath; + boolean removeShardOnFailure; + + ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath, + boolean removeShardOnFailure) { + this.shardInfo = shardInfo; + this.addServerReply = addServerReply; + this.leaderPath = leaderPath; + this.removeShardOnFailure = removeShardOnFailure; + } + } + + private static class ForwardedAddServerFailure { + String shardName; + String failureMessage; + Throwable failure; + boolean removeShardOnFailure; + + ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure, + boolean removeShardOnFailure) { + this.shardName = shardName; + this.failureMessage = failureMessage; + this.failure = failure; + this.removeShardOnFailure = removeShardOnFailure; + } + } + @VisibleForTesting protected static class ShardInformation { private final ShardIdentifier shardId;