import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
- protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
- DataStoreVersions.CURRENT_VERSION);
+ protected Shard(AbstractBuilder<?, ?> builder) {
+ super(builder.getId().toString(), builder.getPeerAddresses(),
+ Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
- this.name = name.toString();
- this.datastoreContext = datastoreContext;
+ this.name = builder.getId().toString();
+ this.datastoreContext = builder.getDatastoreContext();
setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = new ShardDataTree(schemaContext);
+ store = new ShardDataTree(builder.getSchemaContext());
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
- public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
- }
-
private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
return datastoreContext;
}
- protected abstract static class AbstractShardCreator implements Creator<Shard> {
- private static final long serialVersionUID = 1L;
+ @VisibleForTesting
+ public ShardDataTree getDataStore() {
+ return store;
+ }
+
+ @VisibleForTesting
+ ShardStats getShardMBean() {
+ return shardMBean;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
- protected final ShardIdentifier name;
- protected final Map<String, String> peerAddresses;
- protected final DatastoreContext datastoreContext;
- protected final SchemaContext schemaContext;
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
+ private final Class<S> shardClass;
+ private ShardIdentifier id;
+ private Map<String, String> peerAddresses = Collections.emptyMap();
+ private DatastoreContext datastoreContext;
+ private SchemaContext schemaContext;
+ private volatile boolean sealed;
- protected AbstractShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- this.name = Preconditions.checkNotNull(name, "name should not be null");
- this.peerAddresses = Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- this.datastoreContext = Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ protected AbstractBuilder(Class<S> shardClass) {
+ this.shardClass = shardClass;
}
- }
- private static class ShardCreator extends AbstractShardCreator {
- private static final long serialVersionUID = 1L;
+ protected void checkSealed() {
+ Preconditions.checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
+ }
- ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name, peerAddresses, datastoreContext, schemaContext);
+ @SuppressWarnings("unchecked")
+ private T self() {
+ return (T) this;
}
- @Override
- public Shard create() throws Exception {
- return new Shard(name, peerAddresses, datastoreContext, schemaContext);
+ public T id(ShardIdentifier id) {
+ checkSealed();
+ this.id = id;
+ return self();
}
- }
- @VisibleForTesting
- public ShardDataTree getDataStore() {
- return store;
+ public T peerAddresses(Map<String, String> peerAddresses) {
+ checkSealed();
+ this.peerAddresses = peerAddresses;
+ return self();
+ }
+
+ public T datastoreContext(DatastoreContext datastoreContext) {
+ checkSealed();
+ this.datastoreContext = datastoreContext;
+ return self();
+ }
+
+ public T schemaContext(SchemaContext schemaContext) {
+ checkSealed();
+ this.schemaContext = schemaContext;
+ return self();
+ }
+
+ public ShardIdentifier getId() {
+ return id;
+ }
+
+ public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
+
+ protected void verify() {
+ Preconditions.checkNotNull(id, "id should not be null");
+ Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+ Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
+ Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ }
+
+ public Props props() {
+ sealed = true;
+ verify();
+ return Props.create(shardClass, this);
+ }
}
- @VisibleForTesting
- ShardStats getShardMBean() {
- return shardMBean;
+ public static class Builder extends AbstractBuilder<Builder, Shard> {
+ private Builder() {
+ super(Shard.class);
+ }
}
}