import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
- protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
- DataStoreVersions.CURRENT_VERSION);
- this.name = name.toString();
- this.datastoreContext = datastoreContext;
+ private ShardSnapshot restoreFromSnapshot;
+
+
+
+ protected Shard(AbstractBuilder<?, ?> builder) {
+ super(builder.getId().toString(), builder.getPeerAddresses(),
+ Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
+
+ this.name = builder.getId().toString();
+ this.datastoreContext = builder.getDatastoreContext();
+ this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = new ShardDataTree(schemaContext);
+ // FIXME: BUG-1014: pass down the proper TreeType
+ store = new ShardDataTree(builder.getSchemaContext());
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
commitCoordinator = new ShardCommitCoordinator(store,
datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
- datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
+ datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name);
setTransactionCommitTimeout();
snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+
}
private void setTransactionCommitTimeout() {
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
- public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
- }
-
private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
context().parent().tell(message, self());
} else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
sender().tell(getShardMBean(), self());
+ } else if(message instanceof GetShardDataTree) {
+ sender().tell(store.getDataTree(), self());
+ } else if(message instanceof ServerRemoved){
+ context().parent().forward(message, context());
} else {
super.onReceiveCommand(message);
}
return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
}
- void continueCommit(final CohortEntry cohortEntry) throws Exception {
+ void continueCommit(final CohortEntry cohortEntry) {
final DataTreeCandidate candidate = cohortEntry.getCandidate();
// If we do not have any followers and we are not using persistence
applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ DataTreeCandidatePayload.create(candidate));
}
}
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
+ return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
+ restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
+ restoreFromSnapshot = null;
+
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
return commitCoordinator;
}
- protected DatastoreContext getDatastoreContext() {
+ public DatastoreContext getDatastoreContext() {
return datastoreContext;
}
- protected abstract static class AbstractShardCreator implements Creator<Shard> {
- private static final long serialVersionUID = 1L;
+ @VisibleForTesting
+ public ShardDataTree getDataStore() {
+ return store;
+ }
- protected final ShardIdentifier name;
- protected final Map<String, String> peerAddresses;
- protected final DatastoreContext datastoreContext;
- protected final SchemaContext schemaContext;
+ @VisibleForTesting
+ ShardStats getShardMBean() {
+ return shardMBean;
+ }
- protected AbstractShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- this.name = Preconditions.checkNotNull(name, "name should not be null");
- this.peerAddresses = Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- this.datastoreContext = Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
- }
+ public static Builder builder() {
+ return new Builder();
}
- private static class ShardCreator extends AbstractShardCreator {
- private static final long serialVersionUID = 1L;
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
+ private final Class<S> shardClass;
+ private ShardIdentifier id;
+ private Map<String, String> peerAddresses = Collections.emptyMap();
+ private DatastoreContext datastoreContext;
+ private SchemaContext schemaContext;
+ private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
+ private volatile boolean sealed;
- ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name, peerAddresses, datastoreContext, schemaContext);
+ protected AbstractBuilder(Class<S> shardClass) {
+ this.shardClass = shardClass;
}
- @Override
- public Shard create() throws Exception {
- return new Shard(name, peerAddresses, datastoreContext, schemaContext);
+ protected void checkSealed() {
+ Preconditions.checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
}
- }
- @VisibleForTesting
- public ShardDataTree getDataStore() {
- return store;
+ @SuppressWarnings("unchecked")
+ private T self() {
+ return (T) this;
+ }
+
+ public T id(ShardIdentifier id) {
+ checkSealed();
+ this.id = id;
+ return self();
+ }
+
+ public T peerAddresses(Map<String, String> peerAddresses) {
+ checkSealed();
+ this.peerAddresses = peerAddresses;
+ return self();
+ }
+
+ public T datastoreContext(DatastoreContext datastoreContext) {
+ checkSealed();
+ this.datastoreContext = datastoreContext;
+ return self();
+ }
+
+ public T schemaContext(SchemaContext schemaContext) {
+ checkSealed();
+ this.schemaContext = schemaContext;
+ return self();
+ }
+
+ public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+ checkSealed();
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ return self();
+ }
+
+ public ShardIdentifier getId() {
+ return id;
+ }
+
+ public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
+
+ public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
+
+ protected void verify() {
+ Preconditions.checkNotNull(id, "id should not be null");
+ Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+ Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
+ Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ }
+
+ public Props props() {
+ sealed = true;
+ verify();
+ return Props.create(shardClass, this);
+ }
}
- @VisibleForTesting
- ShardStats getShardMBean() {
- return shardMBean;
+ public static class Builder extends AbstractBuilder<Builder, Shard> {
+ private Builder() {
+ super(Shard.class);
+ }
}
}