import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
}
};
+ static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() {
+ @Override
+ public String toString() {
+ return "resumeNextPendingTransaction";
+ }
+ };
+
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
maybeError.get());
}
+ store.resetTransactionBatch();
+
if (message instanceof RequestEnvelope) {
final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
persistPayload(txId, AbortTransactionPayload.create(txId), true);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
+ } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+ store.resumeNextPendingTransaction();
} else {
super.handleNonRaftCommand(message);
}
private void handleConnectClient(final ConnectClientRequest message) {
try {
if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
throw new NotLeaderException(getSelf());
}
throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
throw new NotLeaderException(getSelf());
}
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
- private SchemaContext schemaContext;
+ private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private TipProducingDataTree dataTree;
private volatile boolean sealed;
return self();
}
- public T schemaContext(final SchemaContext newSchemaContext) {
+ public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
checkSealed();
- this.schemaContext = newSchemaContext;
+ this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
return self();
}
}
public SchemaContext getSchemaContext() {
- return schemaContext;
+ return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
}
public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
}
public Props props() {
Ticker ticker() {
return Ticker.systemTicker();
}
+
+ void scheduleNextPendingTransaction() {
+ self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender());
+ }
}