BUG 2817 - Create reusable classes for doing an action on finding primary 37/29837/5
authorMoiz Raja <moraja@cisco.com>
Wed, 18 Nov 2015 02:55:23 +0000 (18:55 -0800)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 19 Nov 2015 02:09:30 +0000 (02:09 +0000)
For AddShardReplica we are finding the primary and when it is found we send
a ForwardedAddServer* message to self. I need to do something similar for
RemoveShardReplica - so I've extracted the design pattern that was used for
AddShardReplica into a set of reusable classes. To start with this I can
use this for RemoveShardReplica but I think it can be used for other such
messages in future.

Change-Id: Ib625403f6eab5b07bc126af9db3d4e6e566e2038
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java

index 616f56c466bbac02194460dca1d07b4a57b2569f..b5e1ca4f1a5b3a3793de1e63646f88d593bb627f 100644 (file)
@@ -46,6 +46,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
@@ -222,9 +223,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ForwardedAddServerFailure) {
             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
-        } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
-            ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
-            addShard(msg.shardName, msg.primaryFound, getSender());
+        } else if(message instanceof PrimaryShardFoundForContext) {
+            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
+            onPrimaryShardFoundContext(primaryShardFoundContext);
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
@@ -234,13 +235,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof SaveSnapshotSuccess) {
             LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
         } else if (message instanceof SaveSnapshotFailure) {
-            LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
-                persistenceId(), ((SaveSnapshotFailure)message).cause());
+            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
+                    persistenceId(), ((SaveSnapshotFailure) message).cause());
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
+        if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
+            addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
+        }
+    }
+
     private void onShardReplicaRemoved(ServerRemoved message) {
         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
@@ -865,7 +872,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return false;
     }
 
-    private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
+    private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
@@ -887,34 +894,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
-                getShardInitializationTimeout().duration().$times(2));
+        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+
+            }
 
-        final ActorRef sender = getSender();
-        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
-                if (failure != null) {
-                    LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
-                } else {
-                    if(response instanceof RemotePrimaryShardFound) {
-                        self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
-                                (RemotePrimaryShardFound)response), sender);
-                    } else if(response instanceof LocalPrimaryShardFound) {
-                        sendLocalReplicaAlreadyExistsReply(shardName, sender);
-                    } else {
-                        String msg = String.format("Failed to find leader for shard %s: received response: %s",
-                                shardName, response);
-                        LOG.debug ("{}: {}", persistenceId(), msg);
-                        sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
-                            new RuntimeException(msg)), getSelf());
-                    }
-                }
+            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
-        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+        });
     }
 
     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
@@ -1086,16 +1078,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private static class ForwardedAddServerPrimaryShardFound {
-        String shardName;
-        RemotePrimaryShardFound primaryFound;
-
-        ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
-            this.shardName = shardName;
-            this.primaryFound = primaryFound;
-        }
-    }
-
     private static class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;
@@ -1496,6 +1478,168 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return Props.create(ShardManager.class, this);
         }
     }
+
+    private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
+
+
+        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    handler.onFailure(failure);
+                } else {
+                    if(response instanceof RemotePrimaryShardFound) {
+                        handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+                    } else if(response instanceof LocalPrimaryShardFound) {
+                        handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+                    } else {
+                        handler.onUnknownResponse(response);
+                    }
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    /**
+     * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
+     * a remote or local find primary message is processed
+     */
+    private static interface FindPrimaryResponseHandler {
+        /**
+         * Invoked when a Failure message is received as a response
+         *
+         * @param failure
+         */
+        void onFailure(Throwable failure);
+
+        /**
+         * Invoked when a RemotePrimaryShardFound response is received
+         *
+         * @param response
+         */
+        void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
+
+        /**
+         * Invoked when a LocalPrimaryShardFound response is received
+         * @param response
+         */
+        void onLocalPrimaryFound(LocalPrimaryShardFound response);
+
+        /**
+         * Invoked when an unknown response is received. This is another type of failure.
+         *
+         * @param response
+         */
+        void onUnknownResponse(Object response);
+    }
+
+    /**
+     * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
+     * replica and sends a wrapped Failure response to some targetActor
+     */
+    private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+        private final ActorRef targetActor;
+        private final String shardName;
+        private final String persistenceId;
+        private final ActorRef shardManagerActor;
+
+        /**
+         * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
+         * @param shardName The name of the shard for which the primary replica had to be found
+         * @param persistenceId The persistenceId for the ShardManager
+         * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
+         */
+        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+            this.targetActor = Preconditions.checkNotNull(targetActor);
+            this.shardName = Preconditions.checkNotNull(shardName);
+            this.persistenceId = Preconditions.checkNotNull(persistenceId);
+            this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+        }
+
+        public ActorRef getTargetActor() {
+            return targetActor;
+        }
+
+        public String getShardName() {
+            return shardName;
+        }
+
+        public ActorRef getShardManagerActor() {
+            return shardManagerActor;
+        }
+
+        @Override
+        public void onFailure(Throwable failure) {
+            LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+            targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+                    String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
+        }
+
+        @Override
+        public void onUnknownResponse(Object response) {
+            String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                    shardName, response);
+            LOG.debug ("{}: {}", persistenceId, msg);
+            targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+                    new RuntimeException(msg)), shardManagerActor);
+        }
+    }
+
+
+    /**
+     * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
+     * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
+     * as a successful response to find primary.
+     */
+    private static class PrimaryShardFoundForContext {
+        private final String shardName;
+        private final Object contextMessage;
+        private final RemotePrimaryShardFound remotePrimaryShardFound;
+        private final LocalPrimaryShardFound localPrimaryShardFound;
+
+        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
+            this.shardName = Preconditions.checkNotNull(shardName);
+            this.contextMessage = Preconditions.checkNotNull(contextMessage);
+            Preconditions.checkNotNull(primaryFoundMessage);
+            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
+            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
+        }
+
+        @Nonnull
+        public String getPrimaryPath(){
+            if(remotePrimaryShardFound != null){
+                return remotePrimaryShardFound.getPrimaryPath();
+            }
+            return localPrimaryShardFound.getPrimaryPath();
+        }
+
+        @Nonnull
+        public Object getContextMessage() {
+            return contextMessage;
+        }
+
+        @Nullable
+        public RemotePrimaryShardFound getRemotePrimaryShardFound(){
+            return remotePrimaryShardFound;
+        }
+
+        @Nullable
+        public LocalPrimaryShardFound getLocalPrimaryShardFound(){
+            return localPrimaryShardFound;
+        }
+
+        boolean isPrimaryLocal(){
+            return (remotePrimaryShardFound == null);
+        }
+
+        @Nonnull
+        public String getShardName() {
+            return shardName;
+        }
+    }
 }