Bug 2194: Find primary shard on remote ShardManager 94/16194/12
authorTom Pantelis <tpanteli@brocade.com>
Wed, 25 Mar 2015 04:07:39 +0000 (00:07 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 25 Mar 2015 04:07:39 +0000 (00:07 -0400)
If there is no local shard, the ShardManager now forwards the FindPrimary
message to another ShardManager member.

I changed the FindPrimary and PrimaryFound messages to Serializable.
Previously they implemented SerializableMessage but they didn't have
equivalent protobuff messages so they couldn't be sent over the wire.
These are simple messages and we don't need protobuff.

I also obsoleted the ActorNotInitialized and PrimaryNotFound messages as
we also have equivalent exception classes (which are inhently
Serializable) so the former messages are redundant. This avoids
translation in ActorContext#findPrimaryShardAsync.

Change-Id: I5762ec1dafb9aa12ee8230efe245b154b0bb9b72
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index bc4c825351cc72148f5276fc28d5a94e2e64f79d..55a86ceeea966aab6d67f2b3ba1692d772e74640 100644 (file)
@@ -43,19 +43,19 @@ import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -96,6 +96,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
     // A data store could be of type config/operational
     private final String type;
 
+    private final String shardManagerIdentifierString;
+
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -122,6 +124,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
+        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@@ -158,8 +161,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void handleCommand(Object message) throws Exception {
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
-            findPrimary(FindPrimary.fromSerializable(message));
+        if (message  instanceof FindPrimary) {
+            findPrimary((FindPrimary)message);
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -203,13 +206,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
-                shardInfo.getShardId());
+                shardInfo.getShardName());
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
         if(!shardInfo.isShardInitialized()) {
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
         if(!shardInfo.isShardInitialized()) {
-            message.getSender().tell(new ActorNotInitialized(), getSelf());
+            LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
         } else {
         } else {
+            LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
         }
     }
             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
         }
     }
@@ -297,7 +302,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void markShardAsInitialized(String shardName) {
     }
 
     private void markShardAsInitialized(String shardName) {
-        LOG.debug("Initializing shard [{}]", shardName);
+        LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
 
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
@@ -367,6 +372,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
+                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
@@ -375,8 +382,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
 
             } else if (!shardInformation.isShardInitialized()) {
                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
 
             } else if (!shardInformation.isShardInitialized()) {
-                getSender().tell(new ActorNotInitialized(), getSelf());
+                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
+                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
             } else {
             } else {
+                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
@@ -392,13 +403,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "recovering and a leader is being elected. Try again later.", shardId));
     }
 
                 "recovering and a leader is being elected. Try again later.", shardId));
     }
 
+    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+        return new NotInitializedException(String.format(
+                "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.remove(message.member().roles().head());
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
         memberNameToAddress.remove(message.member().roles().head());
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
+        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
         memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
@@ -461,6 +485,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
 
     }
 
+    @VisibleForTesting
+    protected ClusterWrapper getCluster() {
+        return cluster;
+    }
+
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(Shard.props(info.getShardId(),
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(Shard.props(info.getShardId(),
@@ -469,6 +498,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findPrimary(FindPrimary message) {
     }
 
     private void findPrimary(FindPrimary message) {
+        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
         final String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
         final String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
@@ -477,10 +508,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
 
                     if(LOG.isDebugEnabled()) {
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
+                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                     }
 
                     return found;
                     }
 
                     return found;
@@ -490,38 +521,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
             return;
         }
 
-        List<String> members = configuration.getMembersFromShardName(shardName);
+        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
 
 
-        if(cluster.getCurrentMemberName() != null) {
-            members.remove(cluster.getCurrentMemberName());
-        }
+                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                        shardName, path);
 
 
-        /**
-         * FIXME: Instead of sending remote shard actor path back to sender,
-         * forward FindPrimary message to remote shard manager
-         */
-        // There is no way for us to figure out the primary (for now) so assume
-        // that one of the remote nodes is a primary
-        for(String memberName : members) {
-            Address address = memberNameToAddress.get(memberName);
-            if(address != null){
-                String path =
-                    getShardActorPath(shardName, memberName);
-                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                getContext().actorSelection(path).forward(message, getContext());
                 return;
             }
         }
                 return;
             }
         }
-        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+        getSender().tell(new PrimaryNotFoundException(
+                String.format("No primary shard found for %s.", shardName)), getSelf());
+    }
+
+    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+        return builder;
     }
 
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
     }
 
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            StringBuilder builder = new StringBuilder();
-            builder.append(address.toString())
-                .append("/user/")
-                .append(ShardManagerIdentifier.builder().type(type).build().toString())
-                .append("/")
+            StringBuilder builder = getShardManagerActorPathBuilder(address);
+            builder.append("/")
                 .append(getShardIdentifier(memberName, shardName));
             return builder.toString();
         }
                 .append(getShardIdentifier(memberName, shardName));
             return builder.toString();
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java
deleted file mode 100644 (file)
index 576010f..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import java.io.Serializable;
-
-public class ActorNotInitialized implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
index d51d6800a23b44f2c14ff932a1be5c21421d5c5d..2c18eaa86fff9c3d6449068b723906defc930427 100644 (file)
@@ -9,13 +9,14 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 
 /**
  * The FindPrimary message is used to locate the primary of any given shard
  *
  */
 
 /**
  * The FindPrimary message is used to locate the primary of any given shard
  *
  */
-public class FindPrimary implements SerializableMessage{
-    public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+public class FindPrimary implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     private final String shardName;
     private final boolean waitUntilReady;
 
     private final String shardName;
     private final boolean waitUntilReady;
@@ -36,15 +37,6 @@ public class FindPrimary implements SerializableMessage{
         return waitUntilReady;
     }
 
         return waitUntilReady;
     }
 
-    @Override
-    public Object toSerializable() {
-        return this;
-    }
-
-    public static FindPrimary fromSerializable(Object message){
-        return (FindPrimary) message;
-    }
-
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
index a5565020edc171e43d550f7ace0a8dc6a80739e3..4c154d43ae007268b153f2503ad949250031cb21 100644 (file)
@@ -8,56 +8,48 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.Serializable;
 
 
-public class PrimaryFound implements SerializableMessage {
-  public static final Class<PrimaryFound> SERIALIZABLE_CLASS = PrimaryFound.class;
-  private final String primaryPath;
+public class PrimaryFound implements Serializable {
+    private static final long serialVersionUID = 1L;
 
 
-  public PrimaryFound(final String primaryPath) {
-    this.primaryPath = primaryPath;
-  }
+    private final String primaryPath;
 
 
-  public String getPrimaryPath() {
-    return primaryPath;
-  }
-
-  @Override
-  public boolean equals(final Object o) {
-    if (this == o) {
-        return true;
+    public PrimaryFound(final String primaryPath) {
+        this.primaryPath = primaryPath;
     }
     }
-    if (o == null || getClass() != o.getClass()) {
-        return false;
-    }
-
-    PrimaryFound that = (PrimaryFound) o;
 
 
-    if (!primaryPath.equals(that.primaryPath)) {
-        return false;
+    public String getPrimaryPath() {
+        return primaryPath;
     }
 
     }
 
-    return true;
-  }
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
 
-  @Override
-  public int hashCode() {
-    return primaryPath.hashCode();
-  }
+        PrimaryFound that = (PrimaryFound) o;
 
 
-  @Override
-  public String toString() {
-    return "PrimaryFound{" +
-            "primaryPath='" + primaryPath + '\'' +
-            '}';
-  }
+        if (!primaryPath.equals(that.primaryPath)) {
+            return false;
+        }
 
 
+        return true;
+    }
 
 
-  @Override
-  public Object toSerializable() {
-    return  this;
-  }
+    @Override
+    public int hashCode() {
+        return primaryPath.hashCode();
+    }
 
 
-  public static PrimaryFound fromSerializable(final Object message){
-    return (PrimaryFound) message;
-  }
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+        return builder.toString();
+    }
 }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java
deleted file mode 100644 (file)
index b47c91b..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-
-public class PrimaryNotFound implements SerializableMessage {
-  public static final Class<PrimaryNotFound> SERIALIZABLE_CLASS = PrimaryNotFound.class;
-
-    private final String shardName;
-
-    public PrimaryNotFound(final String shardName){
-
-        Preconditions.checkNotNull(shardName, "shardName should not be null");
-
-        this.shardName = shardName;
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        PrimaryNotFound that = (PrimaryNotFound) o;
-
-        if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return shardName != null ? shardName.hashCode() : 0;
-    }
-
-  @Override
-  public Object toSerializable() {
-    return this;
-  }
-
-  public static PrimaryNotFound fromSerializable(final Object message){
-    return (PrimaryNotFound) message;
-  }
-}
index 6f9bb7fc9feb4ede3a00d06b07d689d615f51458..b6250fc1cccbfbde152a8af7e1e2f4b9b3c6f7cd 100644 (file)
@@ -41,13 +41,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -209,25 +207,22 @@ public class ActorContext {
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
+                new FindPrimary(shardName, true), shardInitializationTimeout);
 
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
             public ActorSelection checkedApply(Object response) throws Exception {
 
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
             public ActorSelection checkedApply(Object response) throws Exception {
-                if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
-                    PrimaryFound found = PrimaryFound.fromSerializable(response);
+                if(response instanceof PrimaryFound) {
+                    PrimaryFound found = (PrimaryFound)response;
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
                     return actorSelection;
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
                     return actorSelection;
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found primary shard %s but it's not initialized yet. " +
-                                          "Please try again later", shardName));
-                } else if(response instanceof PrimaryNotFound) {
-                    throw new PrimaryNotFoundException(
-                            String.format("No primary shard found for %S.", shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
+                } else if(response instanceof PrimaryNotFoundException) {
+                    throw (PrimaryNotFoundException)response;
                 } else if(response instanceof NoShardLeaderException) {
                     throw (NoShardLeaderException)response;
                 }
                 } else if(response instanceof NoShardLeaderException) {
                     throw (NoShardLeaderException)response;
                 }
@@ -274,10 +269,8 @@ public class ActorContext {
                     LocalShardFound found = (LocalShardFound)response;
                     LOG.debug("Local shard found {}", found.getPath());
                     return found.getPath();
                     LocalShardFound found = (LocalShardFound)response;
                     LOG.debug("Local shard found {}", found.getPath());
                     return found.getPath();
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found local shard for %s but it's not initialized yet.",
-                                    shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
                 } else if(response instanceof LocalShardNotFound) {
                     throw new LocalShardNotFoundException(
                             String.format("Local shard for %s does not exist.", shardName));
                 } else if(response instanceof LocalShardNotFound) {
                     throw new LocalShardNotFoundException(
                             String.format("Local shard for %s does not exist.", shardName));
index f6c8f07f6b18cd51b5b7ba72ae77a693a3885ec0..fac597003e7ff106b46b4b10a09ccbd3860279af 100644 (file)
@@ -7,10 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -28,7 +28,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -173,7 +173,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
 
             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
 
-            reply(new ActorNotInitialized());
+            reply(new NotInitializedException("not initialized"));
 
             new Within(duration("1 seconds")) {
                 @Override
 
             new Within(duration("1 seconds")) {
                 @Override
index ae7a4f96c53fec04dbadbb612c9bc0369952f654..95b1b78a198e511d15347637f2f1558b86d81e65 100644 (file)
@@ -9,16 +9,23 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import akka.actor.ActorRef;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
 import akka.actor.Props;
 import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.dispatch.Dispatchers;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,15 +42,16 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -75,6 +83,11 @@ public class ShardManagerTest extends AbstractActorTest {
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
 
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
 
+    private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+        String name = new ShardIdentifier(shardName, memberName,"config").toString();
+        return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+    }
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
@@ -100,21 +113,22 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
+        return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
+                new MockConfiguration());
+    }
+
+    private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
+            final ClusterWrapper clusterWrapper, final Configuration config) {
         Creator<ShardManager> creator = new Creator<ShardManager>() {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
         Creator<ShardManager> creator = new Creator<ShardManager>() {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
-                        datastoreContextBuilder.build(), ready) {
-                    @Override
-                    protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
-                        return mockShardActor;
-                    }
-                };
+                return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
+                        ready, name, shardActor);
             }
         };
 
             }
         };
 
-        return Props.create(new DelegatingShardManagerCreator(creator));
+        return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
     @Test
     }
 
     @Test
@@ -124,9 +138,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("non-existent", false), getRef());
 
 
-            expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
+            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
         }};
     }
 
         }};
     }
 
@@ -146,9 +160,9 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
                     RaftState.Leader.name())), mockShardActor);
 
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
                     RaftState.Leader.name())), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
@@ -170,9 +184,9 @@ public class ShardManagerTest extends AbstractActorTest {
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
 
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
         }};
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
         }};
@@ -183,9 +197,9 @@ public class ShardManagerTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
 
-            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
         }};
     }
 
         }};
     }
 
@@ -197,7 +211,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
         }};
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
         }};
@@ -215,15 +229,15 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new RoleChangeNotification(memberId,
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
 
             shardManager.tell(new RoleChangeNotification(memberId,
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
@@ -238,7 +252,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
             // delayed until we send ActorInitialized and RoleChangeNotification.
 
             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
             // delayed until we send ActorInitialized and RoleChangeNotification.
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
 
             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
@@ -254,7 +268,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
 
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
 
@@ -269,9 +283,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
 
-            expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("2 seconds"), NotInitializedException.class);
 
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
 
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
@@ -289,7 +303,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                     null, RaftState.Candidate.name()), mockShardActor);
 
             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                     null, RaftState.Candidate.name()), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
@@ -303,12 +317,78 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
     }
 
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+                newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                        new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                put("default", Arrays.asList("member-1", "member-2")).
+                put("astronauts", Arrays.asList("member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+                newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                        mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+            shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                    RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+            PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+
+            shardManager2.underlyingActor().verifyFindPrimary();
+
+            Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForMemberRemoved();
+
+            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -348,7 +428,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
 
             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
-            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
         }};
     }
 
         }};
     }
 
@@ -371,42 +451,6 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
         }};
     }
 
-    @Test
-    public void testOnReceiveMemberUp() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
-            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
-                    PrimaryFound.SERIALIZABLE_CLASS));
-            String path = found.getPrimaryPath();
-            assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
-        }};
-    }
-
-    @Test
-    public void testOnReceiveMemberDown() throws Exception {
-
-        new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
-            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
-
-            MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
-        }};
-    }
-
     @Test
     public void testOnRecoveryJournalIsCleaned() {
         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
     @Test
     public void testOnRecoveryJournalIsCleaned() {
         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
@@ -804,4 +848,69 @@ public class ShardManagerTest extends AbstractActorTest {
             return delegate.create();
         }
     }
             return delegate.create();
         }
     }
+
+    private static class ForwardingShardManager extends ShardManager {
+        private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
+        private CountDownLatch memberUpReceived = new CountDownLatch(1);
+        private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+        private final ActorRef shardActor;
+        private final String name;
+
+        protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
+                DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+                ActorRef shardActor) {
+            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            this.shardActor = shardActor;
+            this.name = name;
+        }
+
+        @Override
+        public void handleCommand(Object message) throws Exception {
+            try{
+                super.handleCommand(message);
+            } finally {
+                if(message instanceof FindPrimary) {
+                    findPrimaryMessageReceived.countDown();
+                } else if(message instanceof ClusterEvent.MemberUp) {
+                    String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberUpReceived.countDown();
+                    }
+                } else if(message instanceof ClusterEvent.MemberRemoved) {
+                    String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberRemovedReceived.countDown();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String persistenceId() {
+            return name;
+        }
+
+        @Override
+        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            return shardActor;
+        }
+
+        void waitForMemberUp() {
+            assertEquals("MemberUp received", true,
+                    Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
+            memberUpReceived = new CountDownLatch(1);
+        }
+
+        void waitForMemberRemoved() {
+            assertEquals("MemberRemoved received", true,
+                    Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
+            memberRemovedReceived = new CountDownLatch(1);
+        }
+
+        void verifyFindPrimary() {
+            assertEquals("FindPrimary received", true,
+                    Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
+            findPrimaryMessageReceived = new CountDownLatch(1);
+        }
+    }
 }
 }
index 2746bcf982906af7046c8c8e3cb930711929df43..6b4f6337785a753e68e1330f8296927aeb70b005 100644 (file)
@@ -37,13 +37,11 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -458,7 +456,7 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryNotFound("foobar"));
+                            return Futures.successful((Object) new PrimaryNotFoundException("not found"));
                         }
                     };
 
                         }
                     };
 
@@ -491,7 +489,7 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new ActorNotInitialized());
+                            return Futures.successful((Object) new NotInitializedException("not iniislized"));
                         }
                     };
 
                         }
                     };
 
@@ -518,8 +516,8 @@ public class ActorContextTest extends AbstractActorTest{
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
-            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);
index fe40aa0fd4571c65f431124d3b58704b9b62b9c6..810b270cfcee82aab53ca55f96a777e87bdc3141 100644 (file)
@@ -14,14 +14,22 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.ClusterEvent;
 import akka.cluster.MemberStatus;
 import akka.cluster.UniqueAddress;
 import akka.cluster.ClusterEvent;
 import akka.cluster.MemberStatus;
 import akka.cluster.UniqueAddress;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import scala.collection.JavaConversions;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
 
 public class MockClusterWrapper implements ClusterWrapper{
 
     private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
 
 public class MockClusterWrapper implements ClusterWrapper{
 
     private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+    private String currentMemberName = "member-1";
+
+    public MockClusterWrapper() {
+    }
+
+    public MockClusterWrapper(String currentMemberName) {
+        this.currentMemberName = currentMemberName;
+    }
 
     @Override
     public void subscribeToMemberEvents(ActorRef actorRef) {
 
     @Override
     public void subscribeToMemberEvents(ActorRef actorRef) {
@@ -29,7 +37,7 @@ public class MockClusterWrapper implements ClusterWrapper{
 
     @Override
     public String getCurrentMemberName() {
 
     @Override
     public String getCurrentMemberName() {
-        return "member-1";
+        return currentMemberName;
     }
 
     @Override
     }
 
     @Override
index 4ef7d65857b3c86a76d405eeca52c888f6f9fc55..0bc561f1bd053f49674f34ac396afd50e6ba70ae 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import com.google.common.base.Optional;
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -18,11 +20,23 @@ import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
 public class MockConfiguration implements Configuration{
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
 public class MockConfiguration implements Configuration{
-    @Override public List<String> getMemberShardNames(final String memberName) {
-        return Arrays.asList("default");
+    private Map<String, List<String>> shardMembers = ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).
+            /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build();
+
+    public MockConfiguration() {
+    }
+
+    public MockConfiguration(Map<String, List<String>> shardMembers) {
+        this.shardMembers = shardMembers;
     }
 
     }
 
-    @Override public Optional<String> getModuleNameFromNameSpace(
+    @Override
+    public List<String> getMemberShardNames(final String memberName) {
+        return new ArrayList<>(shardMembers.keySet());
+    }
+    @Override
+    public Optional<String> getModuleNameFromNameSpace(
         final String nameSpace) {
         return Optional.absent();
     }
         final String nameSpace) {
         return Optional.absent();
     }
@@ -44,7 +58,8 @@ public class MockConfiguration implements Configuration{
             return Arrays.asList("member-2", "member-3");
         }
 
             return Arrays.asList("member-2", "member-3");
         }
 
-        return Collections.emptyList();
+        List<String> members = shardMembers.get(shardName);
+        return members != null ? members : Collections.<String>emptyList();
     }
 
     @Override public Set<String> getAllShardNames() {
     }
 
     @Override public Set<String> getAllShardNames() {
index badec6f8313f8c8bbd7b7dfa3e859e1e49034277..03634627d643ab1042f00c31fa3bb2054b0a31a2 100644 (file)
@@ -34,3 +34,105 @@ bounded-mailbox {
   mailbox-capacity = 1000
   mailbox-push-timeout-time = 100ms
 }
   mailbox-capacity = 1000
   mailbox-push-timeout-time = 100ms
 }
+
+Member1 {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+    
+    loglevel = "DEBUG"
+    
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2558
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      
+      roles = [
+        "member-1"
+      ]
+    }
+  }
+}
+
+Member2 {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+  
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+  
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+    
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2559
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      
+      roles = [
+        "member-2"
+      ]
+    }
+  }
+}