From: Robert Varga Date: Wed, 23 Mar 2016 15:54:03 +0000 (+0100) Subject: Make ShardManagerInfo a proper view of ShardManager X-Git-Tag: release/boron~268 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e7ce18361e9d6f2126525934f73438e9841e39bc Make ShardManagerInfo a proper view of ShardManager This reduces method visibility, removing unnecessary methods. It also fixes thread safety issues around local shard lists by repurposing SwitchShardBehavior and introducing a query message. Change-Id: I35b05c067448c0be94411e5599e6faee182d2ced Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/GetLocalShardIds.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/GetLocalShardIds.java new file mode 100644 index 0000000000..3ff457ab06 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/GetLocalShardIds.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.shardmanager; + +final class GetLocalShardIds { + static final GetLocalShardIds INSTANCE = new GetLocalShardIds(); + + private GetLocalShardIds() { + // Prevent instantiation + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index e314e1c894..4c9fcc1dba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -174,12 +174,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - List localShardActorNames = new ArrayList<>(); - mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(), - "shard-manager-" + this.type, - datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), - localShardActorNames); - mBean.setShardManager(this); + mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type, + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); + mBean.registerMBean(); } @Override @@ -250,6 +247,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { persistenceId(), ((SaveSnapshotFailure) message).cause()); } else if(message instanceof Shutdown) { onShutDown(); + } else if (message instanceof GetLocalShardIds) { + onGetLocalShardIds(); } else { unknownMessage(message); } @@ -312,14 +311,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (replyMsg.getStatus() == ServerChangeStatus.OK) { LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(), shardId.getShardName()); - originalSender.tell(new akka.actor.Status.Success(null), getSelf()); + originalSender.tell(new Status.Success(null), getSelf()); } else { LOG.warn ("{}: Leader failed to remove shard replica {} with status {}", persistenceId(), shardId, replyMsg.getStatus()); - Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), - leaderPath, shardId); - originalSender.tell(new akka.actor.Status.Failure(failure), getSelf()); + Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId); + originalSender.tell(new Status.Failure(failure), getSelf()); } } @@ -402,7 +400,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } if(notInitialized != null) { - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format( + getSender().tell(new Status.Failure(new IllegalStateException(String.format( "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf()); return; } @@ -429,14 +427,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String shardName = createShard.getModuleShardConfig().getShardName(); if(localShards.containsKey(shardName)) { LOG.debug("{}: Shard {} already exists", persistenceId(), shardName); - reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName)); + reply = new Status.Success(String.format("Shard with name %s already exists", shardName)); } else { doCreateShard(createShard); - reply = new akka.actor.Status.Success(null); + reply = new Status.Success(null); } } catch (Exception e) { LOG.error("{}: onCreateShard failed", persistenceId(), e); - reply = new akka.actor.Status.Failure(e); + reply = new Status.Failure(e); } if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { @@ -489,8 +487,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.setActiveMember(isActiveMember); localShards.put(info.getShardName(), info); - mBean.addLocalShard(shardId.toString()); - if(schemaContext != null) { info.setActor(newShardActor(schemaContext, info)); } @@ -843,17 +839,44 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onSwitchShardBehavior(SwitchShardBehavior message) { - ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build(); + private void onGetLocalShardIds() { + final List response = new ArrayList<>(localShards.size()); + + for (ShardInformation info : localShards.values()) { + response.add(info.getShardId().toString()); + } + + getSender().tell(new Status.Success(response), getSelf()); + } + + private void onSwitchShardBehavior(final SwitchShardBehavior message) { + final ShardIdentifier identifier = message.getShardId(); - ShardInformation shardInformation = localShards.get(identifier.getShardName()); + if (identifier != null) { + final ShardInformation info = localShards.get(identifier.getShardName()); + if (info == null) { + getSender().tell(new Status.Failure( + new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf()); + return; + } - if(shardInformation != null && shardInformation.getActor() != null) { - shardInformation.getActor().tell( - new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf()); + switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm())); } else { + for (ShardInformation info : localShards.values()) { + switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm())); + } + } + + getSender().tell(new Status.Success(null), getSelf()); + } + + private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) { + final ActorRef actor = info.getActor(); + if (actor != null) { + actor.tell(switchBehavior, getSelf()); + } else { LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", - message.getShardName(), message.getNewState()); + info.shardName, switchBehavior.getNewState()); } } @@ -984,7 +1007,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( shardSnapshots.get(shardName)), peerAddressResolver)); - mBean.addLocalShard(shardId.toString()); } } @@ -1038,7 +1060,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (shardReplicaOperationsInProgress.contains(shardName)) { String msg = String.format("A shard replica operation for %s is already in progress", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); return true; } @@ -1054,7 +1076,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (!(this.configuration.isShardConfigured(shardName))) { String msg = String.format("No module configuration exists for shard %s", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf()); return; } @@ -1063,7 +1085,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String msg = String.format( "No SchemaContext is available in order to create a local shard instance for %s", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); return; } @@ -1084,7 +1106,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) { String msg = String.format("Local shard %s already exists", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf()); + sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); } private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { @@ -1154,7 +1176,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - sender.tell(new akka.actor.Status.Failure(message == null ? failure : + sender.tell(new Status.Failure(message == null ? failure : new RuntimeException(message, failure)), getSelf()); } @@ -1173,8 +1195,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo.setActiveMember(true); persistShardList(); - mBean.addLocalShard(shardInfo.getShardId().toString()); - sender.tell(new akka.actor.Status.Success(null), getSelf()); + sender.tell(new Status.Success(null), getSelf()); } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) { sendLocalReplicaAlreadyExistsReply(shardName, sender); } else { @@ -1679,7 +1700,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onFailure(Throwable failure) { LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure); - targetActor.tell(new akka.actor.Status.Failure(new RuntimeException( + targetActor.tell(new Status.Failure(new RuntimeException( String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor); } @@ -1688,7 +1709,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String msg = String.format("Failed to find leader for shard %s: received response: %s", shardName, response); LOG.debug ("{}: {}", persistenceId, msg); - targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response : + targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : new RuntimeException(msg)), shardManagerActor); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerInfo.java index ef9dceb11e..61f9c1ca71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerInfo.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerInfo.java @@ -9,59 +9,56 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; import akka.actor.ActorRef; +import akka.pattern.Patterns; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import java.util.Collection; +import com.google.common.base.Throwables; import java.util.List; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; final class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean { public static final String JMX_CATEGORY_SHARD_MANAGER = "ShardManager"; - // The only states that you can switch to from outside. You cannot switch to Candidate/IsolatedLeader for example - private static final Collection ACCEPTABLE_STATES - = ImmutableList.of(RaftState.Leader.name(), RaftState.Follower.name()); - private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class); + private static final long ASK_TIMEOUT_MILLIS = 5000; + private final ActorRef shardManager; private final String memberName; - private final List localShards; - private boolean syncStatus = false; + private volatile boolean syncStatus = false; - private ShardManager shardManager; - private ShardManagerInfo(String memberName, String name, String mxBeanType, List localShards) { + ShardManagerInfo(final ActorRef shardManager, final String memberName, final String name, + final String mxBeanType) { super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER); - this.memberName = memberName; - this.localShards = localShards; - } - - static ShardManagerInfo createShardManagerMBean(String memberName, String name, String mxBeanType, - List localShards){ - ShardManagerInfo shardManagerInfo = new ShardManagerInfo(memberName, name, mxBeanType, localShards); - - shardManagerInfo.registerMBean(); - - return shardManagerInfo; - } - - public void addLocalShard(String shardName) { - localShards.add(shardName); + this.shardManager = Preconditions.checkNotNull(shardManager); + this.memberName = Preconditions.checkNotNull(memberName); } + @SuppressWarnings("unchecked") @Override public List getLocalShards() { - return localShards; + try { + return (List) Await.result( + Patterns.ask(shardManager, GetLocalShardIds.INSTANCE, ASK_TIMEOUT_MILLIS), Duration.Inf()); + } catch (Exception e) { + throw Throwables.propagate(e); + } } @Override public boolean getSyncStatus() { - return this.syncStatus; + return syncStatus; + } + + void setSyncStatus(boolean syncStatus) { + this.syncStatus = syncStatus; } @Override @@ -69,31 +66,39 @@ final class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoM return memberName; } - @Override - public void switchAllLocalShardsState(String newState, long term) { - LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term); - - for(String shardName : localShards){ - switchShardState(shardName, newState, term); + private void requestSwitchShardState(final ShardIdentifier shardId, final String newState, final long term) { + // Validates strings argument + final RaftState state = RaftState.valueOf(newState); + + // Leader and Follower are the only states to which we can switch externally + switch (state) { + case Follower: + case Leader: + try { + Await.result(Patterns.ask(shardManager, new SwitchShardBehavior(shardId, state, term), + ASK_TIMEOUT_MILLIS), Duration.Inf()); + } catch (Exception e) { + throw Throwables.propagate(e); + } + break; + case Candidate: + case IsolatedLeader: + default: + throw new IllegalArgumentException("Illegal target state " + state); } } @Override - public void switchShardState(String shardName, String newState, long term) { - LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardName, newState, term); - - Preconditions.checkArgument(localShards.contains(shardName), shardName + " is not local"); - Preconditions.checkArgument(ACCEPTABLE_STATES.contains(newState)); - - shardManager.getSelf().tell(new SwitchShardBehavior(shardName, RaftState.valueOf(newState), term), - ActorRef.noSender()); - } - - public void setSyncStatus(boolean syncStatus){ - this.syncStatus = syncStatus; + public void switchAllLocalShardsState(String newState, long term) { + LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term); + requestSwitchShardState(null, newState, term); } - public void setShardManager(ShardManager shardManager){ - this.shardManager = shardManager; + @Override + public void switchShardState(String shardId, String newState, long term) { + final ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString( + Preconditions.checkNotNull(shardId, "Shard id may not be null")).build(); + LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardId, newState, term); + requestSwitchShardState(identifier, newState, term); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/SwitchShardBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/SwitchShardBehavior.java index d797440425..e3c222d85c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/SwitchShardBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/SwitchShardBehavior.java @@ -9,21 +9,23 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; import com.google.common.base.Preconditions; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.raft.RaftState; final class SwitchShardBehavior { - private final String shardName; + private final ShardIdentifier shardId; private final RaftState newState; private final long term; - SwitchShardBehavior(String shardName, RaftState newState, long term) { - this.shardName = Preconditions.checkNotNull(shardName); + SwitchShardBehavior(final ShardIdentifier shardId, final RaftState newState, final long term) { this.newState = Preconditions.checkNotNull(newState); + this.shardId = shardId; this.term = term; } - String getShardName() { - return shardName; + @Nullable ShardIdentifier getShardId() { + return shardId; } RaftState getNewState() { @@ -37,7 +39,7 @@ final class SwitchShardBehavior { @Override public String toString() { final StringBuilder sb = new StringBuilder("SwitchShardBehavior{"); - sb.append("shardName='").append(shardName).append('\''); + sb.append("shardId='").append(shardId).append('\''); sb.append(", newState='").append(newState).append('\''); sb.append(", term=").append(term); sb.append('}'); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index 8883a5e10b..4a49e8841c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -144,7 +144,7 @@ public class ShardManagerTest extends AbstractActorTest { private static TestActorRef mockShardActor; - private static String mockShardName; + private static ShardIdentifier mockShardName; private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) @@ -162,8 +162,9 @@ public class ShardManagerTest extends AbstractActorTest { InMemorySnapshotStore.clear(); if(mockShardActor == null) { - mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); - mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName); + mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config"); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), + mockShardName.toString()); } mockShardActor.underlyingActor().clear(); @@ -987,7 +988,7 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar"))); InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID); - TestShardManager shardManager = newTestShardManager(); + newTestShardManager(); InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);