package org.opendaylight.controller.cluster.datastore.shardmanager;
import static akka.pattern.Patterns.ask;
+import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.persistence.SnapshotSelectionCriteria;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
// Stores a mapping between a shard name and it's corresponding information
// Shard names look like inventory, topology etc and are as specified in
// configuration
- private final Map<String, ShardInformation> localShards = new HashMap<>();
+ @VisibleForTesting
+ final Map<String, ShardInformation> localShards = new HashMap<>();
// The type of a ShardManager reflects the type of the datastore itself
// A data store could be of type config/operational
private final Configuration configuration;
- private final String shardDispatcherPath;
+ @VisibleForTesting
+ final String shardDispatcherPath;
private final ShardManagerInfo shardManagerMBean;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
- private final ShardPeerAddressResolver peerAddressResolver;
+ @VisibleForTesting
+ final ShardPeerAddressResolver peerAddressResolver;
private SchemaContext schemaContext;
private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
+ private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
+
private final String persistenceId;
private final AbstractDataStore dataStore;
- private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(final AbstractShardManagerCreator<?> builder) {
LOG.info("Stopping ShardManager {}", persistenceId());
shardManagerMBean.unregisterMBean();
-
- if (configListenerReg != null) {
- configListenerReg.close();
- configListenerReg = null;
- }
}
@Override
onGetShardRole((GetShardRole) message);
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof RegisterForShardAvailabilityChanges) {
+ onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
} else if (message instanceof DeleteSnapshotsFailure) {
LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
((DeleteSnapshotsFailure) message).cause());
} else if (message instanceof DeleteSnapshotsSuccess) {
- LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message);
+ LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
} else if (message instanceof RegisterRoleChangeListenerReply) {
LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
} else if (message instanceof ClusterEvent.MemberEvent) {
}
}
+ private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
+ LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+
+ final Consumer<String> callback = message.getCallback();
+ shardAvailabilityCallbacks.add(callback);
+
+ getSender().tell(new Status.Success((Registration)
+ () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
+ }
+
private void onGetShardRole(final GetShardRole message) {
LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
configUpdateHandler.initListener(dataStore, datastoreType);
}
- private void onShutDown() {
+ void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
if (info.getActor() != null) {
}
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
final String primaryPath, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
shardReplicaOperationsInProgress.remove(shardName);
- String msg = String.format("RemoveServer request to leader %s for shard %s failed",
- primaryPath, shardName);
- LOG.debug("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+ shardName, failure);
// FAILURE
- sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
+ failure)), self());
} else {
// SUCCESS
self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
final String primaryPath, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
shardReplicaOperationsInProgress.remove(shardName);
- String msg = String.format("RemoveServer request to leader %s for shard %s failed",
- primaryPath, shardName);
-
- LOG.debug("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+ shardName, failure);
// FAILURE
- sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
+ failure)), self());
} else {
// SUCCESS
self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
+
+ notifyShardAvailabilityCallbacks(shardInformation);
}
checkReady();
}
}
+ private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
+ shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
+ }
+
private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
ShardInformation shardInfo = message.getShardInfo();
message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
+ message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf());
}
}
try {
shardId = ShardIdentifier.fromShardIdString(actorName);
} catch (IllegalArgumentException e) {
- LOG.debug("{}: ignoring actor {}", actorName, e);
+ LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e);
return;
}
}
LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
- shardInformation.getShardName());
+ shardInformation);
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
timeout, getSelf(),
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
shardInformation.getShardName());
- getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
+ getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf());
}
return;
getSender().tell(messageSupplier.get(), getSelf());
}
- private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) {
- return new NoShardLeaderException(null, shardId.toString());
- }
-
private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
return new NotInitializedException(String.format(
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
info.setLeaderAvailable(false);
primaryShardInfoCache.remove(info.getShardName());
+
+ notifyShardAvailabilityCallbacks(info);
}
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
- newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
- shardSnapshots.get(shardName)), peerAddressResolver));
+ localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
+ newShardDatastoreContext(shardName), shardSnapshots));
}
}
+ @VisibleForTesting
+ ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
+ final Map<String, String> peerAddresses,
+ final DatastoreContext datastoreContext,
+ final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+ return new ShardInformation(shardName, shardId, peerAddresses,
+ datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+ peerAddressResolver);
+ }
+
/**
* Given the name of the shard find the addresses of all it's peers.
*
* @param shardName the shard name
*/
- private Map<String, String> getPeerAddresses(final String shardName) {
+ Map<String, String> getPeerAddresses(final String shardName) {
final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
return getPeerAddresses(shardName, members);
}
@Override
public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, Duration.create("1 minute"),
+ return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
(Function<Throwable, Directive>) t -> {
LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
return SupervisorStrategy.resume();
private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
if (shardReplicaOperationsInProgress.contains(shardName)) {
- String msg = String.format("A shard replica operation for %s is already in progress", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
+ sender.tell(new Status.Failure(new IllegalStateException(
+ String.format("A shard replica operation for %s is already in progress", shardName))), getSelf());
return true;
}
// Create the localShard
if (schemaContext == null) {
- String msg = String.format(
- "No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
+ persistenceId(), shardName);
+ getSender().tell(new Status.Failure(new IllegalStateException(
+ "No SchemaContext is available in order to create a local shard instance for " + shardName)),
+ getSelf());
return;
}
// verify the shard with the specified name is present in the cluster configuration
if (!this.configuration.isShardConfigured(shardName)) {
- String msg = String.format("No module configuration exists for shard %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
+ getSender().tell(new Status.Failure(new IllegalArgumentException(
+ "No module configuration exists for shard " + shardName)), getSelf());
return;
}
// Create the localShard
if (schemaContext == null) {
- String msg = String.format(
- "No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
+ persistenceId(), shardName);
+ getSender().tell(new Status.Failure(new IllegalStateException(
+ "No SchemaContext is available in order to create a local shard instance for " + shardName)),
+ getSelf());
return;
}
});
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
- String msg = String.format("Local shard %s already exists", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
+ sender.tell(new Status.Failure(new AlreadyExistsException(
+ String.format("Local shard %s already exists", shardName))), getSelf());
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
final RemotePrimaryShardFound response, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
+ "Possible causes - there was a problem replicating the data or shard leadership changed "
+ "while replicating the shard data", leaderPath, shardId.getShardName()));
case NO_LEADER:
- return createNoShardLeaderException(shardId);
+ return new NoShardLeaderException(shardId);
case NOT_SUPPORTED:
return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
serverChange.getSimpleName(), shardId.getShardName()));
getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
sender);
} else if (response instanceof LocalShardNotFound) {
- String msg = String.format("Local shard %s does not exist", shardName);
- LOG.debug("{}: {}", persistenceId, msg);
- sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+ LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
+ sender.tell(new Status.Failure(new IllegalArgumentException(
+ String.format("Local shard %s does not exist", shardName))), self());
} else {
- String msg = String.format("Failed to find local shard %s: received response: %s",
- shardName, response);
- LOG.debug("{}: {}", persistenceId, msg);
- sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
- new RuntimeException(msg)), self());
+ LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
+ response);
+ sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
+ : new RuntimeException(
+ String.format("Failed to find local shard %s: received response: %s", shardName,
+ response))), self());
}
}
}
public void onComplete(final Throwable failure, final Object response) {
shardReplicaOperationsInProgress.remove(shardName);
if (failure != null) {
- String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
- shardActorRef.path());
- LOG.debug("{}: {}", persistenceId(), msg, failure);
- sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
+ shardActorRef.path(), failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path()), failure)), self());
} else {
LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
*/
protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
final String persistenceId, final ActorRef shardManagerActor) {
- this.targetActor = Preconditions.checkNotNull(targetActor);
- this.shardName = Preconditions.checkNotNull(shardName);
- this.persistenceId = Preconditions.checkNotNull(persistenceId);
- this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+ this.targetActor = requireNonNull(targetActor);
+ this.shardName = requireNonNull(shardName);
+ this.persistenceId = requireNonNull(persistenceId);
+ this.shardManagerActor = requireNonNull(shardManagerActor);
}
public ActorRef getTargetActor() {
@Override
public void onUnknownResponse(final Object response) {
- String msg = String.format("Failed to find leader for shard %s: received response: %s",
- shardName, response);
- LOG.debug("{}: {}", persistenceId, msg);
- targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
- new RuntimeException(msg)), shardManagerActor);
+ LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName,
+ response);
+ targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
+ : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s",
+ shardName, response))), shardManagerActor);
}
}