Bug 2187: Don't close over internal state in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 7dcc52028cc28f7b3211565478be382845b3b2f5..814f117f3155ddbe4fed0223331f2403ad2b42fb 100644 (file)
@@ -212,6 +212,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onCreateShard((CreateShard)message);
         } else if(message instanceof AddShardReplica){
             onAddShardReplica((AddShardReplica)message);
+        } else if(message instanceof ForwardedAddServerReply) {
+            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+                    msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerFailure) {
+            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+            ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+            addShard(msg.shardName, msg.primaryFound, getSender());
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
@@ -224,7 +234,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else {
             unknownMessage(message);
         }
-
     }
 
     private void onGetSnapshot() {
@@ -840,7 +849,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
-        LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
@@ -873,8 +882,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                         String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
                 } else {
                     if(response instanceof RemotePrimaryShardFound) {
-                        RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
-                        addShard (shardName, message, sender);
+                        self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+                                (RemotePrimaryShardFound)response), sender);
                     } else if(response instanceof LocalPrimaryShardFound) {
                         sendLocalReplicaAlreadyExistsReply(shardName, sender);
                     } else {
@@ -936,16 +945,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
-                shardReplicaOperationsInProgress.remove(shardName);
                 if (failure != null) {
                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
-                            response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
+                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName);
+                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
-                    AddServerReply reply = (AddServerReply)addServerResponse;
-                    onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
+                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+                            response.getPrimaryPath(), removeShardOnFailure), sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
@@ -953,6 +962,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
             boolean removeShardOnFailure) {
+        shardReplicaOperationsInProgress.remove(shardName);
+
         if(removeShardOnFailure) {
             ShardInformation shardInfo = localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
@@ -967,6 +978,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
             String leaderPath, boolean removeShardOnFailure) {
         String shardName = shardInfo.getShardName();
+        shardReplicaOperationsInProgress.remove(shardName);
+
         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
@@ -1054,6 +1067,46 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private static class ForwardedAddServerPrimaryShardFound {
+        String shardName;
+        RemotePrimaryShardFound primaryFound;
+
+        ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+            this.shardName = shardName;
+            this.primaryFound = primaryFound;
+        }
+    }
+
+    private static class ForwardedAddServerReply {
+        ShardInformation shardInfo;
+        AddServerReply addServerReply;
+        String leaderPath;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.shardInfo = shardInfo;
+            this.addServerReply = addServerReply;
+            this.leaderPath = leaderPath;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
+    private static class ForwardedAddServerFailure {
+        String shardName;
+        String failureMessage;
+        Throwable failure;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.shardName = shardName;
+            this.failureMessage = failureMessage;
+            this.failure = failure;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;