import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
- protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
- DataStoreVersions.CURRENT_VERSION);
- this.name = name.toString();
- this.datastoreContext = datastoreContext;
+ private ShardSnapshot restoreFromSnapshot;
+
+ private final ShardTransactionMessageRetrySupport messageRetrySupport;
+
+ protected Shard(AbstractBuilder<?, ?> builder) {
+ super(builder.getId().toString(), builder.getPeerAddresses(),
+ Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
+
+ this.name = builder.getId().toString();
+ this.datastoreContext = builder.getDatastoreContext();
+ this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = new ShardDataTree(schemaContext);
+ store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType());
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
commitCoordinator = new ShardCommitCoordinator(store,
datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
- datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
+ datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name);
setTransactionCommitTimeout();
snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
-
+ messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
}
private void setTransactionCommitTimeout() {
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
- public static Props props(final ShardIdentifier name,
- final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- Preconditions.checkNotNull(name, "name should not be null");
- Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
-
- return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
- }
-
private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
super.postStop();
+ messageRetrySupport.close();
+
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
} else if (BatchedModifications.class.isInstance(message)) {
handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
- commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
- getSender(), this);
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (message instanceof ReadyLocalTransaction) {
handleReadyLocalTransaction((ReadyLocalTransaction)message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
context().parent().tell(message, self());
} else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
sender().tell(getShardMBean(), self());
+ } else if(message instanceof GetShardDataTree) {
+ sender().tell(store.getDataTree(), self());
+ } else if(message instanceof ServerRemoved){
+ context().parent().forward(message, context());
+ } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+ messageRetrySupport.onTimerMessage(message);
} else {
super.onReceiveCommand(message);
}
leaderPayloadVersion);
}
- private void onDatastoreContext(DatastoreContext context) {
+ protected void onDatastoreContext(DatastoreContext context) {
datastoreContext = context;
commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
}
- void continueCommit(final CohortEntry cohortEntry) throws Exception {
+ void continueCommit(final CohortEntry cohortEntry) {
final DataTreeCandidate candidate = cohortEntry.getCandidate();
// If we do not have any followers and we are not using persistence
applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ DataTreeCandidatePayload.create(candidate));
}
}
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
- private void noLeaderError(String errMessage, Object message) {
- // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
- // it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
+ protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+ try {
+ commitCoordinator.handleBatchedModifications(batched, sender, this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
private void handleBatchedModifications(BatchedModifications batched) {
if(isLeader()) {
failIfIsolatedLeader(getSender());
- try {
- commitCoordinator.handleBatchedModifications(batched, getSender(), this);
- } catch (Exception e) {
- LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
- batched.getTransactionID(), e);
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
- }
+ handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
if(leader != null) {
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
+ messageRetrySupport.addMessageToRetry(batched, getSender(),
+ "Could not commit transaction " + batched.getTransactionID());
}
}
}
private boolean failIfIsolatedLeader(ActorRef sender) {
- if(getRaftState() == RaftState.IsolatedLeader) {
+ if(isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
"Shard %s was the leader but has lost contact with all of its followers. Either all" +
" other follower nodes are down or this node is isolated by a network partition.",
return false;
}
+ protected boolean isIsolatedLeader() {
+ return getRaftState() == RaftState.IsolatedLeader;
+ }
+
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
if (isLeader()) {
failIfIsolatedLeader(getSender());
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
- noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
+ messageRetrySupport.addMessageToRetry(message, getSender(),
+ "Could not commit transaction " + message.getTransactionID());
+ }
+ }
+ }
+
+ private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+ if (isLeader()) {
+ failIfIsolatedLeader(getSender());
+
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
+
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
+ forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+ readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
+ leader.forward(readyLocal, getContext());
+ } else {
+ messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
+ "Could not commit transaction " + forwardedReady.getTransactionID());
}
}
}
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
+ return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
+ restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
+ restoreFromSnapshot = null;
+
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
@Override
protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+
+ if(hasLeader()) {
+ messageRetrySupport.retryMessages();
+ }
}
@Override
return commitCoordinator;
}
+ public DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
- private static class ShardCreator implements Creator<Shard> {
+ @VisibleForTesting
+ public ShardDataTree getDataStore() {
+ return store;
+ }
- private static final long serialVersionUID = 1L;
+ @VisibleForTesting
+ ShardStats getShardMBean() {
+ return shardMBean;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
+ private final Class<S> shardClass;
+ private ShardIdentifier id;
+ private Map<String, String> peerAddresses = Collections.emptyMap();
+ private DatastoreContext datastoreContext;
+ private SchemaContext schemaContext;
+ private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
+ private volatile boolean sealed;
+
+ protected AbstractBuilder(Class<S> shardClass) {
+ this.shardClass = shardClass;
+ }
+
+ protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+ }
+
+ @SuppressWarnings("unchecked")
+ private T self() {
+ return (T) this;
+ }
- final ShardIdentifier name;
- final Map<String, String> peerAddresses;
- final DatastoreContext datastoreContext;
- final SchemaContext schemaContext;
+ public T id(ShardIdentifier id) {
+ checkSealed();
+ this.id = id;
+ return self();
+ }
- ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- this.name = name;
+ public T peerAddresses(Map<String, String> peerAddresses) {
+ checkSealed();
this.peerAddresses = peerAddresses;
+ return self();
+ }
+
+ public T datastoreContext(DatastoreContext datastoreContext) {
+ checkSealed();
this.datastoreContext = datastoreContext;
+ return self();
+ }
+
+ public T schemaContext(SchemaContext schemaContext) {
+ checkSealed();
this.schemaContext = schemaContext;
+ return self();
}
- @Override
- public Shard create() throws Exception {
- return new Shard(name, peerAddresses, datastoreContext, schemaContext);
+ public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+ checkSealed();
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ return self();
}
- }
- @VisibleForTesting
- public ShardDataTree getDataStore() {
- return store;
+ public ShardIdentifier getId() {
+ return id;
+ }
+
+ public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
+
+ public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
+
+ public TreeType getTreeType() {
+ switch (datastoreContext.getLogicalStoreType()) {
+ case CONFIGURATION:
+ return TreeType.CONFIGURATION;
+ case OPERATIONAL:
+ return TreeType.OPERATIONAL;
+ }
+
+ throw new IllegalStateException("Unhandled logical store type " + datastoreContext.getLogicalStoreType());
+ }
+
+ protected void verify() {
+ Preconditions.checkNotNull(id, "id should not be null");
+ Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+            Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
+ Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ }
+
+ public Props props() {
+ sealed = true;
+ verify();
+ return Props.create(shardClass, this);
+ }
}
- @VisibleForTesting
- ShardStats getShardMBean() {
- return shardMBean;
+ public static class Builder extends AbstractBuilder<Builder, Shard> {
+ private Builder() {
+ super(Shard.class);
+ }
}
}