Bug 2187: Don't close over internal state in ShardManager 18/29718/4
authorTom Pantelis <tpanteli@brocade.com>
Sat, 14 Nov 2015 12:41:00 +0000 (07:41 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 16 Nov 2015 19:01:38 +0000 (14:01 -0500)
For AddShardReplica, we use the ask pattern for the FindPrimary and
AddServer messages. However in the OnComplete callbacks we're closing
over internal state which isn't safe since the callback will be notified
outside of the actor's execution context which may result in concurrent
mutation of internal state. Therefore I added internal messages that are
sent to self in the callbacks.

Change-Id: I1f6662a4e473749925046f127cad868e54b761a2
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java

index 7dcc52028cc28f7b3211565478be382845b3b2f5..814f117f3155ddbe4fed0223331f2403ad2b42fb 100644 (file)
@@ -212,6 +212,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onCreateShard((CreateShard)message);
         } else if(message instanceof AddShardReplica){
             onAddShardReplica((AddShardReplica)message);
+        } else if(message instanceof ForwardedAddServerReply) {
+            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+                    msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerFailure) {
+            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+            ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+            addShard(msg.shardName, msg.primaryFound, getSender());
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
@@ -224,7 +234,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else {
             unknownMessage(message);
         }
-
     }
 
     private void onGetSnapshot() {
@@ -840,7 +849,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
-        LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
@@ -873,8 +882,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                         String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
                 } else {
                     if(response instanceof RemotePrimaryShardFound) {
-                        RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
-                        addShard (shardName, message, sender);
+                        self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+                                (RemotePrimaryShardFound)response), sender);
                     } else if(response instanceof LocalPrimaryShardFound) {
                         sendLocalReplicaAlreadyExistsReply(shardName, sender);
                     } else {
@@ -936,16 +945,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
-                shardReplicaOperationsInProgress.remove(shardName);
                 if (failure != null) {
                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
-                            response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
+                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName);
+                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
-                    AddServerReply reply = (AddServerReply)addServerResponse;
-                    onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
+                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+                            response.getPrimaryPath(), removeShardOnFailure), sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
@@ -953,6 +962,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
             boolean removeShardOnFailure) {
+        shardReplicaOperationsInProgress.remove(shardName);
+
         if(removeShardOnFailure) {
             ShardInformation shardInfo = localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
@@ -967,6 +978,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
             String leaderPath, boolean removeShardOnFailure) {
         String shardName = shardInfo.getShardName();
+        shardReplicaOperationsInProgress.remove(shardName);
+
         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
@@ -1054,6 +1067,46 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private static class ForwardedAddServerPrimaryShardFound {
+        String shardName;
+        RemotePrimaryShardFound primaryFound;
+
+        ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+            this.shardName = shardName;
+            this.primaryFound = primaryFound;
+        }
+    }
+
+    private static class ForwardedAddServerReply {
+        ShardInformation shardInfo;
+        AddServerReply addServerReply;
+        String leaderPath;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.shardInfo = shardInfo;
+            this.addServerReply = addServerReply;
+            this.leaderPath = leaderPath;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
+    private static class ForwardedAddServerFailure {
+        String shardName;
+        String failureMessage;
+        Throwable failure;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.shardName = shardName;
+            this.failureMessage = failureMessage;
+            this.failure = failure;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;