import akka.cluster.ClusterEvent;
import akka.japi.Creator;
import akka.japi.Function;
+
import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
private ShardManagerInfoMBean mBean;
+ private final DatastoreContext datastoreContext;
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
*/
- private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext) {
this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+ this.datastoreContext = datastoreContext;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
public static Props props(final String type,
final ClusterWrapper cluster,
- final Configuration configuration) {
+ final Configuration configuration,
+ final DatastoreContext datastoreContext) {
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
- return Props.create(new Creator<ShardManager>() {
-
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration);
- }
- });
+ return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
}
-
@Override
public void handleReceive(Object message) throws Exception {
if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardId, peerAddresses),
- shardId.toString());
+ .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
+ withMailbox(ActorContext.MAILBOX), shardId.toString());
+
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
@Override
public SupervisorStrategy supervisorStrategy() {
+
return new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
+ StringBuilder sb = new StringBuilder();
+ for(StackTraceElement element : t.getStackTrace()) {
+ sb.append("\n\tat ")
+ .append(element.toString());
+ }
+ LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
return SupervisorStrategy.resume();
}
}
}
}
}
+
+ private static class ShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+
+ final String type;
+ final ClusterWrapper cluster;
+ final Configuration configuration;
+ final DatastoreContext datastoreContext;
+
+ ShardManagerCreator(String type, ClusterWrapper cluster,
+ Configuration configuration, DatastoreContext datastoreContext) {
+ this.type = type;
+ this.cluster = cluster;
+ this.configuration = configuration;
+ this.datastoreContext = datastoreContext;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(type, cluster, configuration, datastoreContext);
+ }
+ }
}