@Override
public RaftPolicy get() {
if(Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)){
+ LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
return DefaultRaftPolicy.INSTANCE;
}
try {
String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+ LOG.info("Trying to use custom RaftPolicy {}", className);
Class c = Class.forName(className);
RaftPolicy obj = (RaftPolicy)c.newInstance();
return obj;
public long getNewTerm() {
return newTerm;
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("SwitchBehavior{");
+ sb.append("newState=").append(newState);
+ sb.append(", newTerm=").append(newTerm);
+ sb.append('}');
+ return sb.toString();
+ }
}
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
} else if(message instanceof ShardNotInitializedTimeout) {
onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
} else if(message instanceof ShardLeaderStateChanged) {
- onLeaderStateChanged((ShardLeaderStateChanged)message);
+ onLeaderStateChanged((ShardLeaderStateChanged) message);
+ } else if(message instanceof SwitchShardBehavior){
+ onSwitchShardBehavior((SwitchShardBehavior) message);
} else {
unknownMessage(message);
}
}
}
+ private void onSwitchShardBehavior(SwitchShardBehavior message) {
+ ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+
+ ShardInformation shardInformation = localShards.get(identifier.getShardName());
+
+ if(shardInformation != null && shardInformation.getActor() != null) {
+ shardInformation.getActor().tell(
+ new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+ } else {
+ LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+ message.getShardName(), message.getNewState());
+ }
+ }
+
/**
* Notifies all the local shards of a change in the schema context
*
mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+
+ mBean.setShardManager(this);
}
/**
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import java.util.List;
-
+import org.opendaylight.controller.cluster.datastore.ShardManager;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
+ // The only states that you can switch to from outside. You cannot switch to Candidate/IsolatedLeader for example
+ private static final List<String> ACCEPTABLE_STATES
+ = Lists.newArrayList(RaftState.Leader.name(), RaftState.Follower.name());
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class);
+
private final List<String> localShards;
private boolean syncStatus = false;
+ private ShardManager shardManager;
+
public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
this.localShards = localShards;
return this.syncStatus;
}
+ @Override
+ public void switchAllLocalShardsState(String newState, long term) {
+ LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
+
+ for(String shardName : localShards){
+ switchShardState(shardName, newState, term);
+ }
+ }
+
+ @Override
+ public void switchShardState(String shardName, String newState, long term) {
+ LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardName, newState, term);
+
+ Preconditions.checkArgument(localShards.contains(shardName), shardName + " is not local");
+ Preconditions.checkArgument(ACCEPTABLE_STATES.contains(newState));
+
+ shardManager.getSelf().tell(new SwitchShardBehavior(shardName, newState, term), ActorRef.noSender());
+ }
+
public void setSyncStatus(boolean syncStatus){
this.syncStatus = syncStatus;
}
+
+ public void setShardManager(ShardManager shardManager){
+ this.shardManager = shardManager;
+ }
}
import java.util.List;
public interface ShardManagerInfoMBean {
+ /**
+ *
+ * @return a list of all the local shard names
+ */
List<String> getLocalShards();
+
+ /**
+ *
+ * @return true if all local shards are in sync with their corresponding leaders
+ */
boolean getSyncStatus();
+
+ /**
+ * Switch the Raft Behavior of all the local shards to the newBehavior
+ *
+ * @param newBehavior should be either Leader/Follower only
+ * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any modifications
+ * made to state will be written with this term. This term will then be used by the Raft replication
+ * implementation to decide which modifications should stay and which ones should be removed. Ideally
+ * the term provided when switching to a new Leader should always be higher than the previous term.
+ */
+ void switchAllLocalShardsState(String newBehavior, long term);
+
+ /**
+ * Switch the Raft Behavior of the shard specified by shardName to the newBehavior
+ *
+ * @param shardName a shard that is local to this shard manager
+ * @param newBehavior should be either Leader/Follower only
+ * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any modifications
+ * made to state will be written with this term. This term will then be used by the Raft replication
+ * implementation to decide which modifications should stay and which ones should be removed. Ideally
+ * the term provided when switching to a new Leader should always be higher than the previous term.
+ */
+ void switchShardState(String shardName, String newBehavior, long term);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class SwitchShardBehavior {
+ private final String shardName;
+ private final String newState;
+ private final long term;
+
+ public SwitchShardBehavior(String shardName, String newState, long term) {
+ this.shardName = shardName;
+ this.newState = newState;
+ this.term = term;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public String getNewState() {
+ return newState;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("SwitchShardBehavior{");
+ sb.append("shardName='").append(shardName).append('\'');
+ sb.append(", newState='").append(newState).append('\'');
+ sb.append(", term=").append(term);
+ sb.append('}');
+ return sb.toString();
+ }
+}
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
private static TestActorRef<MessageCollectorActor> mockShardActor;
+ private static String mockShardName;
+
private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
InMemoryJournal.clear();
if(mockShardActor == null) {
- String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
- mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
+ mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
+ mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
}
mockShardActor.underlyingActor().clear();
}
+ @Test
+ public void testOnReceiveSwitchShardBehavior() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
+
+ SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
+
+ assertEquals(RaftState.Leader, switchBehavior.getNewState());
+ assertEquals(1000, switchBehavior.getNewTerm());
+ }};
+ }
+
+
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);