X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fjmx%2Fmbeans%2Fshardmanager%2FShardManagerInfo.java;h=94bf67557be278d1510a54457967c177fffe82d5;hb=4b2622df4d579e9b1be0b25603e002ce58db4463;hp=99c8daf87d30af3ce66bf3b5c42aa86133ec5575;hpb=3671ac415342e718f34c16d272647abd15b742c1;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java index 99c8daf87d..94bf67557b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java @@ -8,32 +8,132 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager; +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import java.util.List; - +import org.opendaylight.controller.cluster.datastore.ShardManager; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; +import akka.pattern.Patterns; +import akka.util.Timeout; +import java.util.concurrent.TimeUnit; +import scala.concurrent.Await; public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean { public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager"; + // The only states that you can switch to from outside. You cannot switch to Candidate/IsolatedLeader for example + private static final List ACCEPTABLE_STATES + = Lists.newArrayList(RaftState.Leader.name(), RaftState.Follower.name()); + + private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class); + + private final String memberName; private final List localShards; - public ShardManagerInfo(String name, String mxBeanType, List localShards) { + private boolean syncStatus = false; + + private ShardManager shardManager; + + public ShardManagerInfo(String memberName, String name, String mxBeanType, List localShards) { super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER); + this.memberName = memberName; this.localShards = localShards; } - public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType, + public static ShardManagerInfo createShardManagerMBean(String memberName, String name, String mxBeanType, List localShards){ - ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards); + ShardManagerInfo shardManagerInfo = new ShardManagerInfo(memberName, name, mxBeanType, localShards); shardManagerInfo.registerMBean(); return shardManagerInfo; } + public void addLocalShard(String shardName) { + localShards.add(shardName); + } + @Override public List getLocalShards() { return localShards; } + + @Override + public boolean getSyncStatus() { + return this.syncStatus; + } + + @Override + public String getMemberName() { + return memberName; + } + + @Override + public void switchAllLocalShardsState(String newState, long term) { + LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term); + + for(String shardName : localShards){ + switchShardState(shardName, newState, term); + } + } + + @Override + public void switchShardState(String shardName, String newState, long term) { + LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardName, newState, term); + + Preconditions.checkArgument(localShards.contains(shardName), shardName + " is not local"); + Preconditions.checkArgument(ACCEPTABLE_STATES.contains(newState)); + + shardManager.getSelf().tell(new SwitchShardBehavior(shardName, newState, term), ActorRef.noSender()); + } + + public void setSyncStatus(boolean syncStatus){ + this.syncStatus = syncStatus; + } + + public void setShardManager(ShardManager shardManager){ + this.shardManager = shardManager; + } + + @Override + public void setAddShardReplica (String shardName) { + LOG.info ("addShardReplica initiated for shard {}", shardName); + + // TODO addTimeout to be made configurable + Timeout addTimeOut = new Timeout(1, TimeUnit.MINUTES); + try { + Await.result(Patterns.ask(shardManager.getSelf(), + new AddShardReplica(shardName), addTimeOut), + addTimeOut.duration()); + } catch (Exception ex) { + LOG.debug ("Obtained an exception during addShardReplica", ex); + throw (new RuntimeException(ex.getMessage())); + } + return; + } + + @Override + public void setRemoveShardReplica (String shardName) { + LOG.info ("removeShardReplica initiated for shard {}", shardName); + + // TODO remTimeOut to be made configurable + Timeout remTimeOut = new Timeout(30, TimeUnit.SECONDS); + try { + Await.result(Patterns.ask(shardManager.getSelf(), + new RemoveShardReplica(shardName), remTimeOut), + remTimeOut.duration()); + } catch (Exception ex) { + LOG.debug ("Obtained an exception during removeShardReplica", ex); + throw (new RuntimeException(ex.getMessage())); + } + return; + } }