X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=70bdb2050e8ce27d616a23cc7af25afb6b5f9162;hb=6a5a8670a47f8989998390b6bab6718c1a7857b5;hp=c39c80021c456b20565bb214da63ee52e515f29d;hpb=72a72ae745544e41603f851bf4f47087cfe521ba;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index c39c80021c..70bdb2050e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import static akka.pattern.Patterns.ask; -import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Cancellable; @@ -303,13 +302,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onWrappedShardResponse(WrappedShardResponse message) { if (message.getResponse() instanceof RemoveServerReply) { - onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse()); + onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(), + message.getLeaderPath()); } } - private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) { - shardReplicaOperationsInProgress.remove(shardName); - originalSender.tell(new Status.Success(null), self()); + private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, + String leaderPath) { + shardReplicaOperationsInProgress.remove(shardId); + + LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); + + if (replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(), + shardId.getShardName()); + originalSender.tell(new akka.actor.Status.Success(null), getSelf()); + } else { + LOG.warn ("{}: Leader failed to remove shard replica {} with status {}", + persistenceId(), shardId, replyMsg.getStatus()); + + Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), + leaderPath, shardId); + originalSender.tell(new akka.actor.Status.Failure(failure), getSelf()); + } } private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { @@ -356,7 +371,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); } else { // SUCCESS - self().tell(new WrappedShardResponse(shardName, response), sender); + self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -369,8 +384,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString()); return; } else if(shardInformation.getActor() != null) { - LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor()); - shardInformation.getActor().tell(PoisonPill.getInstance(), self()); + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor()); + shardInformation.getActor().tell(new Shutdown(), self()); } LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); persistShardList(); @@ -1299,7 +1314,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; - private ActorPath actorPath; private final Map initialPeerAddresses; private Optional localShardDataTree; private boolean leaderAvailable = false; @@ -1347,13 +1361,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return actor; } - ActorPath getActorPath() { - return actorPath; - } - void setActor(ActorRef actor) { this.actor = actor; - this.actorPath = actor.path(); } ShardIdentifier getShardId() { @@ -1612,7 +1621,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private CountDownLatch waitTillReadyCountdownLatch; private PrimaryShardInfoFutureCache primaryShardInfoCache; private DatastoreSnapshot restoreFromSnapshot; - private volatile boolean sealed; @SuppressWarnings("unchecked") @@ -1834,21 +1842,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * The WrappedShardResponse class wraps a response from a Shard. */ private static class WrappedShardResponse { - private final String shardName; + private final ShardIdentifier shardId; private final Object response; + private final String leaderPath; - private WrappedShardResponse(String shardName, Object response) { - this.shardName = shardName; + private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { + this.shardId = shardId; this.response = response; + this.leaderPath = leaderPath; } - String getShardName() { - return shardName; + ShardIdentifier getShardId() { + return shardId; } Object getResponse() { return response; } + + String getLeaderPath() { + return leaderPath; + } } }