From 62bc70796cebbf866ee41e0b9e98006461f19153 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Sat, 14 Nov 2015 07:41:00 -0500 Subject: [PATCH] Bug 2187: Don't close over internal state in ShardManager For AddShardReplica, we use the ask pattern for the FindPrimary and AddServer messages. However in the OnComplete callbacks we're closing over internal state which isn't safe since the callback will be notified outside of the actor's execution context which may result in concurrent mutation of internal state. Therefore I added internal messages that are sent to self in the callbacks. Change-Id: I1f6662a4e473749925046f127cad868e54b761a2 Signed-off-by: Tom Pantelis --- .../cluster/datastore/ShardManager.java | 71 ++++++++++++++++--- 1 file changed, 62 insertions(+), 9 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 7dcc52028c..814f117f31 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -212,6 +212,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if(message instanceof AddShardReplica){ onAddShardReplica((AddShardReplica)message); + } else if(message instanceof ForwardedAddServerReply) { + ForwardedAddServerReply msg = (ForwardedAddServerReply)message; + onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, + msg.removeShardOnFailure); + } else if(message instanceof ForwardedAddServerFailure) { + ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; + onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); + } else if(message instanceof ForwardedAddServerPrimaryShardFound) { + ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message; + addShard(msg.shardName, msg.primaryFound, getSender()); } else if(message instanceof RemoveShardReplica){ onRemoveShardReplica((RemoveShardReplica)message); } else if(message instanceof GetSnapshot) { @@ -224,7 +234,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else { unknownMessage(message); } - } private void onGetSnapshot() { @@ -840,7 +849,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onAddShardReplica (AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); - LOG.debug ("onAddShardReplica: {}", shardReplicaMsg); + LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration if (!(this.configuration.isShardConfigured(shardName))) { @@ -873,8 +882,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String.format("Failed to find leader for shard %s", shardName), failure)), getSelf()); } else { if(response instanceof RemotePrimaryShardFound) { - RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; - addShard (shardName, message, sender); + self().tell(new ForwardedAddServerPrimaryShardFound(shardName, + (RemotePrimaryShardFound)response), sender); } else if(response instanceof LocalPrimaryShardFound) { sendLocalReplicaAlreadyExistsReply(shardName, sender); } else { @@ -936,16 +945,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { futureObj.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, Object addServerResponse) { - shardReplicaOperationsInProgress.remove(shardName); if (failure != null) { LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); - onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed", - response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure); + String msg = String.format("AddServer request to leader %s for shard %s failed", + response.getPrimaryPath(), shardName); + self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender); } else { - AddServerReply reply = (AddServerReply)addServerResponse; - onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure); + self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse, + response.getPrimaryPath(), removeShardOnFailure), sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -953,6 +962,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender, boolean removeShardOnFailure) { + shardReplicaOperationsInProgress.remove(shardName); + if(removeShardOnFailure) { ShardInformation shardInfo = localShards.remove(shardName); if (shardInfo.getActor() != null) { @@ -967,6 +978,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender, String leaderPath, boolean removeShardOnFailure) { String shardName = shardInfo.getShardName(); + shardReplicaOperationsInProgress.remove(shardName); + LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath); if (replyMsg.getStatus() == ServerChangeStatus.OK) { @@ -1054,6 +1067,46 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private static class ForwardedAddServerPrimaryShardFound { + String shardName; + RemotePrimaryShardFound primaryFound; + + ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) { + this.shardName = shardName; + this.primaryFound = primaryFound; + } + } + + private static class ForwardedAddServerReply { + ShardInformation shardInfo; + AddServerReply addServerReply; + String leaderPath; + boolean removeShardOnFailure; + + ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath, + boolean removeShardOnFailure) { + this.shardInfo = shardInfo; + this.addServerReply = addServerReply; + this.leaderPath = leaderPath; + this.removeShardOnFailure = removeShardOnFailure; + } + } + + private static class ForwardedAddServerFailure { + String shardName; + String failureMessage; + Throwable failure; + boolean removeShardOnFailure; + + ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure, + boolean removeShardOnFailure) { + this.shardName = shardName; + this.failureMessage = failureMessage; + this.failure = failure; + this.removeShardOnFailure = removeShardOnFailure; + } + } + @VisibleForTesting protected static class ShardInformation { private final ShardIdentifier shardId; -- 2.36.6