Bug 2187: Address comments in https://git.opendaylight.org/gerrit/#/c/28596/ 81/28981/3
authorTom Pantelis <tpanteli@brocade.com>
Thu, 29 Oct 2015 17:55:00 +0000 (13:55 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 2 Nov 2015 13:04:45 +0000 (13:04 +0000)
Addressed minor comments in https://git.opendaylight.org/gerrit/#/c/28596/.

Unified the response messages and debug messages.

Added persistenceId() format param to the debug messages for additional
context.

Change-Id: Ic1a4e852126425cf7ae67ee5b9ea301b06a3f9a8
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index a878f6decbf0cbe51a8c34176fc2a58e522d74c0..724c8d2c03dc787549f511f60ac3e289ea763978 100644 (file)
@@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -63,11 +64,10 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
-import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -761,72 +761,68 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .build());
     }
 
+    private void checkLocalShardExists(final String shardName, final ActorRef sender) {
+        if (localShards.containsKey(shardName)) {
+            String msg = String.format("Local shard %s already exists", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+        }
+    }
+
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
         // verify the local shard replica is already available in the controller node
-        LOG.debug ("received AddShardReplica for shard {}", shardName);
-        if (localShards.containsKey(shardName)) {
-            LOG.debug ("Local shard {} already available in the controller node", shardName);
-            getSender().tell(new akka.actor.Status.Failure(
-                   new IllegalArgumentException(String.format("Local shard %s already exists",
-                                                 shardName))), getSelf());
-            return;
-        }
+        LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+
+        checkLocalShardExists(shardName, getSender());
+
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
-            LOG.debug ("No module configuration exists for shard {}", shardName);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(
-                        String.format("No module configuration exists for shard %s",
-                                       shardName))), getSelf());
+            String msg = String.format("No module configuration exists for shard %s", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
 
         // Create the localShard
         if (schemaContext == null) {
-            LOG.debug ("schemaContext is not updated to create localShardActor");
-            getSender().tell(new akka.actor.Status.Failure(
-               new IllegalStateException(String.format(
-                  "schemaContext not available to create localShardActor for %s",
-                   shardName))), getSelf());
+            String msg = String.format(
+                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
             return;
         }
 
         Map<String, String> peerAddresses = getPeerAddresses(shardName);
         if (peerAddresses.isEmpty()) {
-            LOG.debug ("Shard peers not available for replicating shard data from leader");
-            getSender().tell(new akka.actor.Status.Failure(
-                new IllegalStateException(String.format(
-                    "Cannot add replica for shard %s because no peer is available",
-                    shardName))), getSelf());
+            String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
             return;
         }
 
-        Timeout findPrimaryTimeout = new Timeout(datastoreContext
-                       .getShardInitializationTimeout().duration().$times(2));
+        Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
 
         final ActorRef sender = getSender();
-        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true),
-                                       findPrimaryTimeout);
+        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
-                    LOG.debug ("Failed to receive response for FindPrimary of shard {}",
-                                shardName, failure);
+                    LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
                         String.format("Failed to find leader for shard %s", shardName), failure)),
                         getSelf());
                 } else {
                     if (!(response instanceof RemotePrimaryShardFound)) {
-                        LOG.debug ("Shard leader not available for creating local shard replica {}",
-                            shardName);
-                        sender.tell(new akka.actor.Status.Failure(
-                            new IllegalStateException(String.format(
-                                "Invalid response type, %s, received from FindPrimary for shard %s",
-                                response.getClass().getName(), shardName))), getSelf());
+                        String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug ("{}: {}", persistenceId(), msg);
+                        sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
                         return;
                     }
+
                     RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
                     addShard (shardName, message, sender);
                 }
@@ -834,12 +830,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void addShard(final String shardName, final RemotePrimaryShardFound response,
-                          final ActorRef sender) {
-        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
-                                                     shardName);
-        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName,
-            cluster.getCurrentMemberName());
+    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+        checkLocalShardExists(shardName, sender);
+
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
                           getPeerAddresses(shardName), getInitShardDataStoreContext(),
                           new DefaultShardPropsCreator(), peerAddressResolver);
@@ -847,8 +842,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
-        LOG.debug ("sending AddServer message to peer {} for shard {}",
-            response.getPrimaryPath(), shardId);
+        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+                response.getPrimaryPath(), shardId);
 
         Timeout addServerTimeout = new Timeout(datastoreContext
                        .getShardLeaderElectionTimeout().duration().$times(4));
@@ -859,13 +854,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
                 if (failure != null) {
-                    LOG.debug ("AddServer request to {} for {} failed",
-                        response.getPrimaryPath(), shardName, failure);
+                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                            response.getPrimaryPath(), shardName, failure);
+
                     // Remove the shard
                     localShards.remove(shardName);
                     if (shardInfo.getActor() != null) {
                         shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
                     }
+
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
                         String.format("AddServer request to leader %s for shard %s failed",
                             response.getPrimaryPath(), shardName), failure)), getSelf());
@@ -881,33 +878,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onAddServerReply (String shardName, ShardInformation shardInfo,
                                    AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+        LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
-            LOG.debug ("Leader shard successfully added the replica shard {}",
-                        shardName);
+            LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(datastoreContext, getSelf());
-            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
-                                                         shardName);
-            mBean.addLocalShard(shardId.toString());
+
+            mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
         } else {
-            LOG.warn ("Cannot add shard replica {} status {}",
-                      shardName, replyMsg.getStatus());
-            LOG.debug ("removing the local shard replica for shard {}",
-                       shardName);
+            LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+                    persistenceId(), shardName, replyMsg.getStatus());
+
             //remove the local replica created
             localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
             }
             switch (replyMsg.getStatus()) {
-                //case ServerChangeStatus.TIMEOUT:
                 case TIMEOUT:
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
                         String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
                             leaderPath, shardName))), getSelf());
                     break;
-                //case ServerChangeStatus.NO_LEADER:
                 case NO_LEADER:
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
                         "There is no shard leader available for shard %s", shardName))), getSelf());
@@ -922,14 +917,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
         String shardName = shardReplicaMsg.getShardName();
-        boolean deleteStatus = false;
 
         // verify the local shard replica is available in the controller node
         if (!localShards.containsKey(shardName)) {
-            LOG.debug ("Local shard replica {} is not available in the controller node", shardName);
-            getSender().tell(new akka.actor.Status.Failure(
-                   new IllegalArgumentException(String.format("Local shard %s not available",
-                                                shardName))), getSelf());
+            String msg = String.format("Local shard %s does not", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
         // call RemoveShard for the shardName
index 4a6754bbb3b2cb665bc328d0938007365ced577c..bd7e7d65c663f7195b4d112836799280bf558057 100644 (file)
@@ -1334,7 +1334,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static class MockRespondActor extends MessageCollectorActor {
 
-        private Object responseMsg;
+        private volatile Object responseMsg;
 
         public void updateResponse(Object response) {
             responseMsg = response;