Make ShardManagerInfo a proper view of ShardManager 55/36655/6
authorRobert Varga <rovarga@cisco.com>
Wed, 23 Mar 2016 15:54:03 +0000 (16:54 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 30 Mar 2016 14:12:38 +0000 (14:12 +0000)
This reduces method visibility, removing unnecessary methods. It also fixes
thread safety issues around local shard lists by repurposing SwitchShardBehavior
and introducing a query message.

Change-Id: I35b05c067448c0be94411e5599e6faee182d2ced
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/GetLocalShardIds.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/SwitchShardBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/GetLocalShardIds.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/GetLocalShardIds.java
new file mode 100644 (file)
index 0000000..3ff457a
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.shardmanager;
+
+final class GetLocalShardIds {
+    static final GetLocalShardIds INSTANCE = new GetLocalShardIds();
+
+    private GetLocalShardIds() {
+        // Prevent instantiation
+    }
+}
index e314e1c894242d802bb242d034de86a403de52b0..4c9fcc1dbab70e27800a7528d8d49bed80997f49 100644 (file)
@@ -174,12 +174,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        List<String> localShardActorNames = new ArrayList<>();
-        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
-                "shard-manager-" + this.type,
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
-                localShardActorNames);
-        mBean.setShardManager(this);
+        mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+        mBean.registerMBean();
     }
 
     @Override
@@ -250,6 +247,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
         } else if(message instanceof Shutdown) {
             onShutDown();
+        } else if (message instanceof GetLocalShardIds) {
+            onGetLocalShardIds();
         } else {
             unknownMessage(message);
         }
@@ -312,14 +311,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
                     shardId.getShardName());
-            originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+            originalSender.tell(new Status.Success(null), getSelf());
         } else {
             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
                     persistenceId(), shardId, replyMsg.getStatus());
 
-            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
-                    leaderPath, shardId);
-            originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+            originalSender.tell(new Status.Failure(failure), getSelf());
         }
     }
 
@@ -402,7 +400,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         if(notInitialized != null) {
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+            getSender().tell(new Status.Failure(new IllegalStateException(String.format(
                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
             return;
         }
@@ -429,14 +427,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             String shardName = createShard.getModuleShardConfig().getShardName();
             if(localShards.containsKey(shardName)) {
                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
-                reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+                reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
                 doCreateShard(createShard);
-                reply = new akka.actor.Status.Success(null);
+                reply = new Status.Success(null);
             }
         } catch (Exception e) {
             LOG.error("{}: onCreateShard failed", persistenceId(), e);
-            reply = new akka.actor.Status.Failure(e);
+            reply = new Status.Failure(e);
         }
 
         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
@@ -489,8 +487,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         info.setActiveMember(isActiveMember);
         localShards.put(info.getShardName(), info);
 
-        mBean.addLocalShard(shardId.toString());
-
         if(schemaContext != null) {
             info.setActor(newShardActor(schemaContext, info));
         }
@@ -843,17 +839,44 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onSwitchShardBehavior(SwitchShardBehavior message) {
-        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+    private void onGetLocalShardIds() {
+        final List<String> response = new ArrayList<>(localShards.size());
+
+        for (ShardInformation info : localShards.values()) {
+            response.add(info.getShardId().toString());
+        }
+
+        getSender().tell(new Status.Success(response), getSelf());
+    }
+
+    private void onSwitchShardBehavior(final SwitchShardBehavior message) {
+        final ShardIdentifier identifier = message.getShardId();
 
-        ShardInformation shardInformation = localShards.get(identifier.getShardName());
+        if (identifier != null) {
+            final ShardInformation info = localShards.get(identifier.getShardName());
+            if (info == null) {
+                getSender().tell(new Status.Failure(
+                    new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
+                return;
+            }
 
-        if(shardInformation != null && shardInformation.getActor() != null) {
-            shardInformation.getActor().tell(
-                    new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
+            switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
         } else {
+            for (ShardInformation info : localShards.values()) {
+                switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+            }
+        }
+
+        getSender().tell(new Status.Success(null), getSelf());
+    }
+
+    private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
+        final ActorRef actor = info.getActor();
+        if (actor != null) {
+            actor.tell(switchBehavior, getSelf());
+          } else {
             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
-                    message.getShardName(), message.getNewState());
+                info.shardName, switchBehavior.getNewState());
         }
     }
 
@@ -984,7 +1007,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
                         shardSnapshots.get(shardName)), peerAddressResolver));
-            mBean.addLocalShard(shardId.toString());
         }
     }
 
@@ -1038,7 +1060,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (shardReplicaOperationsInProgress.contains(shardName)) {
             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return true;
         }
 
@@ -1054,7 +1076,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (!(this.configuration.isShardConfigured(shardName))) {
             String msg = String.format("No module configuration exists for shard %s", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
 
@@ -1063,7 +1085,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             String msg = String.format(
                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
             return;
         }
 
@@ -1084,7 +1106,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
         String msg = String.format("Local shard %s already exists", shardName);
         LOG.debug ("{}: {}", persistenceId(), msg);
-        sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+        sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
     }
 
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
@@ -1154,7 +1176,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
 
-        sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+        sender.tell(new Status.Failure(message == null ? failure :
             new RuntimeException(message, failure)), getSelf());
     }
 
@@ -1173,8 +1195,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo.setActiveMember(true);
             persistShardList();
 
-            mBean.addLocalShard(shardInfo.getShardId().toString());
-            sender.tell(new akka.actor.Status.Success(null), getSelf());
+            sender.tell(new Status.Success(null), getSelf());
         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
             sendLocalReplicaAlreadyExistsReply(shardName, sender);
         } else {
@@ -1679,7 +1700,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         @Override
         public void onFailure(Throwable failure) {
             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
-            targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+            targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
 
@@ -1688,7 +1709,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             String msg = String.format("Failed to find leader for shard %s: received response: %s",
                     shardName, response);
             LOG.debug ("{}: {}", persistenceId, msg);
-            targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+            targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
                     new RuntimeException(msg)), shardManagerActor);
         }
     }
index ef9dceb11e50691857d107415347b3fe67fdab27..61f9c1ca719909f4951a140c1c25189949bd8c05 100644 (file)
@@ -9,59 +9,56 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import akka.actor.ActorRef;
+import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
+import com.google.common.base.Throwables;
 import java.util.List;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 final class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
 
     public static final String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
 
-    // The only states that you can switch to from outside. You cannot switch to Candidate/IsolatedLeader for example
-    private static final Collection<String> ACCEPTABLE_STATES
-            = ImmutableList.of(RaftState.Leader.name(), RaftState.Follower.name());
-
     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class);
+    private static final long ASK_TIMEOUT_MILLIS = 5000;
 
+    private final ActorRef shardManager;
     private final String memberName;
-    private final List<String> localShards;
 
-    private boolean syncStatus = false;
+    private volatile boolean syncStatus = false;
 
-    private ShardManager shardManager;
 
-    private ShardManagerInfo(String memberName, String name, String mxBeanType, List<String> localShards) {
+    ShardManagerInfo(final ActorRef shardManager, final String memberName, final String name,
+        final String mxBeanType) {
         super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
-        this.memberName = memberName;
-        this.localShards = localShards;
-    }
-
-    static ShardManagerInfo createShardManagerMBean(String memberName, String name, String mxBeanType,
-            List<String> localShards){
-        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(memberName, name, mxBeanType, localShards);
-
-        shardManagerInfo.registerMBean();
-
-        return shardManagerInfo;
-    }
-
-    public void addLocalShard(String shardName) {
-        localShards.add(shardName);
+        this.shardManager = Preconditions.checkNotNull(shardManager);
+        this.memberName = Preconditions.checkNotNull(memberName);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public List<String> getLocalShards() {
-        return localShards;
+        try {
+            return (List<String>) Await.result(
+                Patterns.ask(shardManager, GetLocalShardIds.INSTANCE, ASK_TIMEOUT_MILLIS), Duration.Inf());
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
     }
 
     @Override
     public boolean getSyncStatus() {
-        return this.syncStatus;
+        return syncStatus;
+    }
+
+    void setSyncStatus(boolean syncStatus) {
+        this.syncStatus = syncStatus;
     }
 
     @Override
@@ -69,31 +66,39 @@ final class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoM
         return memberName;
     }
 
-    @Override
-    public void switchAllLocalShardsState(String newState, long term) {
-        LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
-
-        for(String shardName : localShards){
-            switchShardState(shardName, newState, term);
+    private void requestSwitchShardState(final ShardIdentifier shardId, final String newState, final long term) {
+        // Validates strings argument
+        final RaftState state = RaftState.valueOf(newState);
+
+        // Leader and Follower are the only states to which we can switch externally
+        switch (state) {
+            case Follower:
+            case Leader:
+                try {
+                    Await.result(Patterns.ask(shardManager, new SwitchShardBehavior(shardId, state, term),
+                        ASK_TIMEOUT_MILLIS), Duration.Inf());
+                } catch (Exception e) {
+                    throw Throwables.propagate(e);
+                }
+                break;
+            case Candidate:
+            case IsolatedLeader:
+            default:
+                throw new IllegalArgumentException("Illegal target state " + state);
         }
     }
 
     @Override
-    public void switchShardState(String shardName, String newState, long term) {
-        LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardName, newState, term);
-
-        Preconditions.checkArgument(localShards.contains(shardName), shardName + " is not local");
-        Preconditions.checkArgument(ACCEPTABLE_STATES.contains(newState));
-
-        shardManager.getSelf().tell(new SwitchShardBehavior(shardName, RaftState.valueOf(newState), term),
-            ActorRef.noSender());
-    }
-
-    public void setSyncStatus(boolean syncStatus){
-        this.syncStatus = syncStatus;
+    public void switchAllLocalShardsState(String newState, long term) {
+        LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
+        requestSwitchShardState(null, newState, term);
     }
 
-    public void setShardManager(ShardManager shardManager){
-        this.shardManager = shardManager;
+    @Override
+    public void switchShardState(String shardId, String newState, long term) {
+        final ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(
+                Preconditions.checkNotNull(shardId, "Shard id may not be null")).build();
+        LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardId, newState, term);
+        requestSwitchShardState(identifier, newState, term);
     }
 }
index d797440425eaa39ab38623ad669b95c0b8e83128..e3c222d85cb8f31f310befbb32bc6d9874cabb4a 100644 (file)
@@ -9,21 +9,23 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.raft.RaftState;
 
 final class SwitchShardBehavior {
-    private final String shardName;
+    private final ShardIdentifier shardId;
     private final RaftState newState;
     private final long term;
 
-    SwitchShardBehavior(String shardName, RaftState newState, long term) {
-        this.shardName = Preconditions.checkNotNull(shardName);
+    SwitchShardBehavior(final ShardIdentifier shardId, final RaftState newState, final long term) {
         this.newState = Preconditions.checkNotNull(newState);
+        this.shardId = shardId;
         this.term = term;
     }
 
-    String getShardName() {
-        return shardName;
+    @Nullable ShardIdentifier getShardId() {
+        return shardId;
     }
 
     RaftState getNewState() {
@@ -37,7 +39,7 @@ final class SwitchShardBehavior {
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder("SwitchShardBehavior{");
-        sb.append("shardName='").append(shardName).append('\'');
+        sb.append("shardId='").append(shardId).append('\'');
         sb.append(", newState='").append(newState).append('\'');
         sb.append(", term=").append(term);
         sb.append('}');
index 8883a5e10b18b018e8ff7a48b91baedeb8f7fafb..4a49e8841cb60b35f3e9e06350d0821f72bd62fd 100644 (file)
@@ -144,7 +144,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static TestActorRef<MessageCollectorActor> mockShardActor;
 
-    private static String mockShardName;
+    private static ShardIdentifier mockShardName;
 
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
             dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
@@ -162,8 +162,9 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
 
         if(mockShardActor == null) {
-            mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
-            mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
+            mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config");
+            mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
+                    mockShardName.toString());
         }
 
         mockShardActor.underlyingActor().clear();
@@ -987,7 +988,7 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
 
-        TestShardManager shardManager = newTestShardManager();
+        newTestShardManager();
 
         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);