package org.opendaylight.controller.cluster.datastore.shardmanager;
import static akka.pattern.Patterns.ask;
+import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.persistence.SnapshotSelectionCriteria;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
// Stores a mapping between a shard name and it's corresponding information
// Shard names look like inventory, topology etc and are as specified in
// configuration
- private final Map<String, ShardInformation> localShards = new HashMap<>();
+ @VisibleForTesting
+ final Map<String, ShardInformation> localShards = new HashMap<>();
// The type of a ShardManager reflects the type of the datastore itself
// A data store could be of type config/operational
private final Configuration configuration;
- private final String shardDispatcherPath;
+ @VisibleForTesting
+ final String shardDispatcherPath;
private final ShardManagerInfo shardManagerMBean;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
- private final ShardPeerAddressResolver peerAddressResolver;
+ @VisibleForTesting
+ final ShardPeerAddressResolver peerAddressResolver;
private SchemaContext schemaContext;
private final String persistenceId;
private final AbstractDataStore dataStore;
- private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(final AbstractShardManagerCreator<?> builder) {
LOG.info("Stopping ShardManager {}", persistenceId());
shardManagerMBean.unregisterMBean();
-
- if (configListenerReg != null) {
- configListenerReg.close();
- configListenerReg = null;
- }
}
@Override
}
}
- private void onRegisterForShardAvailabilityChanges(RegisterForShardAvailabilityChanges message) {
+ private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
final Consumer<String> callback = message.getCallback();
}
}
- private void notifyShardAvailabilityCallbacks(ShardInformation shardInformation) {
+ private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
}
message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
+ message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf());
}
}
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
shardInformation.getShardName());
- getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
+ getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf());
}
return;
getSender().tell(messageSupplier.get(), getSelf());
}
- private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) {
- return new NoShardLeaderException(null, shardId.toString());
- }
-
private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
return new NotInitializedException(String.format(
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
- newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
- shardSnapshots.get(shardName)), peerAddressResolver));
+ localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
+ newShardDatastoreContext(shardName), shardSnapshots));
}
}
+ @VisibleForTesting
+ ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId,
+ Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext,
+ Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+ return new ShardInformation(shardName, shardId, peerAddresses,
+ datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+ peerAddressResolver);
+ }
+
/**
* Given the name of the shard find the addresses of all it's peers.
*
* @param shardName the shard name
*/
- private Map<String, String> getPeerAddresses(final String shardName) {
+ Map<String, String> getPeerAddresses(final String shardName) {
final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
return getPeerAddresses(shardName, members);
}
@Override
public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, Duration.create("1 minute"),
+ return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
(Function<Throwable, Directive>) t -> {
LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
return SupervisorStrategy.resume();
+ "Possible causes - there was a problem replicating the data or shard leadership changed "
+ "while replicating the shard data", leaderPath, shardId.getShardName()));
case NO_LEADER:
- return createNoShardLeaderException(shardId);
+ return new NoShardLeaderException(shardId);
case NOT_SUPPORTED:
return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
serverChange.getSimpleName(), shardId.getShardName()));
*/
protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
final String persistenceId, final ActorRef shardManagerActor) {
- this.targetActor = Preconditions.checkNotNull(targetActor);
- this.shardName = Preconditions.checkNotNull(shardName);
- this.persistenceId = Preconditions.checkNotNull(persistenceId);
- this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+ this.targetActor = requireNonNull(targetActor);
+ this.shardName = requireNonNull(shardName);
+ this.persistenceId = requireNonNull(persistenceId);
+ this.shardManagerActor = requireNonNull(shardManagerActor);
}
public ActorRef getTargetActor() {