BUG 2185 : Add JMX API to change the state of a Shard 31/26331/1
authorMoiz Raja <moraja@cisco.com>
Fri, 24 Jul 2015 02:19:16 +0000 (19:19 -0700)
committerMoiz Raja <moraja@cisco.com>
Wed, 2 Sep 2015 12:48:01 +0000 (12:48 +0000)
Added two APIs to the ShardManager MBeans
- switchAllLocalShardsState
- switchShardState

Change-Id: I896e421f322f487b4f8eb321708e01cc93bbd48f
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 3a59e9f47466fbf150b6b14a44f4d6c775f79ee9)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SwitchShardBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index 35dee10ba45d4b3862c366787905fb0db10b4f12..6828c85e20525c8cf2ecc6f2304206a8046d71ea 100644 (file)
@@ -158,10 +158,12 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         @Override
         public RaftPolicy get() {
             if(Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)){
+                LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
                 return DefaultRaftPolicy.INSTANCE;
             }
             try {
                 String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+                LOG.info("Trying to use custom RaftPolicy {}", className);
                 Class c = Class.forName(className);
                 RaftPolicy obj = (RaftPolicy)c.newInstance();
                 return obj;
index b3c54615676fd77a7d65eedf6491b714f681611d..9596bb388189e6996a56328bed777b6fc8c51389 100644 (file)
@@ -26,4 +26,13 @@ public class SwitchBehavior {
     public long getNewTerm() {
         return newTerm;
     }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("SwitchBehavior{");
+        sb.append("newState=").append(newState);
+        sb.append(", newTerm=").append(newTerm);
+        sb.append('}');
+        return sb.toString();
+    }
 }
index a46bb3dae8e5887202d66ad0ddfda51e5774537b..e05dd7c09ba5923009274543038fe8e6970407a9 100644 (file)
@@ -54,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
@@ -61,6 +62,7 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -183,7 +185,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
         } else if(message instanceof ShardLeaderStateChanged) {
-            onLeaderStateChanged((ShardLeaderStateChanged)message);
+            onLeaderStateChanged((ShardLeaderStateChanged) message);
+        } else if(message instanceof SwitchShardBehavior){
+            onSwitchShardBehavior((SwitchShardBehavior) message);
         } else {
             unknownMessage(message);
         }
@@ -478,6 +482,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onSwitchShardBehavior(SwitchShardBehavior message) {
+        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+
+        ShardInformation shardInformation = localShards.get(identifier.getShardName());
+
+        if(shardInformation != null && shardInformation.getActor() != null) {
+            shardInformation.getActor().tell(
+                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+        } else {
+            LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+                    message.getShardName(), message.getNewState());
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
@@ -606,6 +624,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+
+        mBean.setShardManager(this);
     }
 
     /**
index 8adc8b24b27c1ad4d3eff65730cf9d27646fd3d0..aaac644b558b3e1913facbd01bc5a07536bf30ee 100644 (file)
@@ -8,18 +8,33 @@
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
 
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import java.util.List;
-
+import org.opendaylight.controller.cluster.datastore.ShardManager;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
 
     public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
 
+    // The only states that you can switch to from outside. You cannot switch to Candidate/IsolatedLeader for example
+    private static final List<String> ACCEPTABLE_STATES
+            = Lists.newArrayList(RaftState.Leader.name(), RaftState.Follower.name());
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class);
+
     private final List<String> localShards;
 
     private boolean syncStatus = false;
 
+    private ShardManager shardManager;
+
     public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
         super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
         this.localShards = localShards;
@@ -44,7 +59,30 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo
         return this.syncStatus;
     }
 
+    @Override
+    public void switchAllLocalShardsState(String newState, long term) {
+        LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
+
+        for(String shardName : localShards){
+            switchShardState(shardName, newState, term);
+        }
+    }
+
+    @Override
+    public void switchShardState(String shardName, String newState, long term) {
+        LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardName, newState, term);
+
+        Preconditions.checkArgument(localShards.contains(shardName), shardName + " is not local");
+        Preconditions.checkArgument(ACCEPTABLE_STATES.contains(newState));
+
+        shardManager.getSelf().tell(new SwitchShardBehavior(shardName, newState, term), ActorRef.noSender());
+    }
+
     public void setSyncStatus(boolean syncStatus){
         this.syncStatus = syncStatus;
     }
+
+    public void setShardManager(ShardManager shardManager){
+        this.shardManager = shardManager;
+    }
 }
index b64ba747827d3eb6c90d6d6b7ed043cf6986e1e1..da0331eb77cfd6383bf6b602933de1dc97d8da18 100644 (file)
@@ -11,6 +11,38 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
 import java.util.List;
 
 public interface ShardManagerInfoMBean {
+    /**
+     *
+     * @return a list of all the local shard names
+     */
     List<String> getLocalShards();
+
+    /**
+     *
+     * @return true if all local shards are in sync with their corresponding leaders
+     */
     boolean getSyncStatus();
+
+    /**
+     * Switch the Raft Behavior of all the local shards to the newBehavior
+     *
+     * @param newBehavior should be either Leader/Follower only
+     * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any modifications
+     *             made to state will be written with this term. This term will then be used by the Raft replication
+     *             implementation to decide which modifications should stay and which ones should be removed. Ideally
+     *             the term provided when switching to a new Leader should always be higher than the previous term.
+     */
+    void switchAllLocalShardsState(String newBehavior, long term);
+
+    /**
+     * Switch the Raft Behavior of the shard specified by shardName to the newBehavior
+     *
+     * @param shardName a shard that is local to this shard manager
+     * @param newBehavior should be either Leader/Follower only
+     * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any modifications
+     *             made to state will be written with this term. This term will then be used by the Raft replication
+     *             implementation to decide which modifications should stay and which ones should be removed. Ideally
+     *             the term provided when switching to a new Leader should always be higher than the previous term.
+     */
+    void switchShardState(String shardName, String newBehavior, long term);
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SwitchShardBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SwitchShardBehavior.java
new file mode 100644 (file)
index 0000000..f1c2a07
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class SwitchShardBehavior {
+    private final String shardName;
+    private final String newState;
+    private final long term;
+
+    public SwitchShardBehavior(String shardName, String newState, long term) {
+        this.shardName = shardName;
+        this.newState = newState;
+        this.term = term;
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+
+    public String getNewState() {
+        return newState;
+    }
+
+    public long getTerm() {
+        return term;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("SwitchShardBehavior{");
+        sb.append("shardName='").append(shardName).append('\'');
+        sb.append(", newState='").append(newState).append('\'');
+        sb.append(", term=").append(term);
+        sb.append('}');
+        return sb.toString();
+    }
+}
index 7edddeb722f706f28ca49fd0b1fc2163fb921283..b6ba73cd83c5735b68f29465abac72aa5e3e169a 100644 (file)
@@ -58,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -68,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -87,6 +89,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static TestActorRef<MessageCollectorActor> mockShardActor;
 
+    private static String mockShardName;
+
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
@@ -105,8 +109,8 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.clear();
 
         if(mockShardActor == null) {
-            String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
-            mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
+            mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
+            mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
         }
 
         mockShardActor.underlyingActor().clear();
@@ -907,6 +911,24 @@ public class ShardManagerTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testOnReceiveSwitchShardBehavior() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
+
+            SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
+
+            assertEquals(RaftState.Leader, switchBehavior.getNewState());
+            assertEquals(1000, switchBehavior.getNewTerm());
+        }};
+    }
+
+
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);