import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
private DatastoreContext datastoreContext;
- private DataPersistenceProvider dataPersistenceProvider;
-
private SchemaContext schemaContext;
private int createSnapshotTransactionCounter;
* Coordinates persistence recovery on startup.
*/
private ShardRecoveryCoordinator recoveryCoordinator;
- private List<Object> currentLogRecoveryBatch;
private final DOMTransactionFactory transactionFactory;
this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.dataPersistenceProvider = (datastoreContext.isPersistent())
- ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
.getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+ setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+ recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
private void setTransactionCommitTimeout() {
setTransactionCommitTimeout();
- if(datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
- dataPersistenceProvider = new PersistentDataProvider();
- } else if(!datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof PersistentDataProvider) {
- dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+ setPersistence(true);
+ } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+ setPersistence(false);
}
updateConfigParams(datastoreContext.getShardRaftConfig());
@Override
protected
void startLogRecoveryBatch(final int maxBatchSize) {
- currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
- }
+ recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
}
@Override
protected void appendRecoveredLogEntry(final Payload data) {
- if(data instanceof ModificationPayload) {
- try {
- currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
- }
- } else if (data instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
- } else if (data instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
- } else {
- LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
- }
+ recoveryCoordinator.appendRecoveredLogPayload(data);
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted recovery sbapshot", persistenceId());
- }
+ recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
}
@Override
protected void applyCurrentLogRecoveryBatch() {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
- currentLogRecoveryBatch.size());
- }
+ recoveryCoordinator.applyCurrentLogRecoveryBatch();
}
@Override
protected void onRecoveryComplete() {
- if(recoveryCoordinator != null) {
- Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
- }
-
- for(DOMStoreWriteTransaction tx: txList) {
- try {
- syncCommitTransaction(tx);
- shardMBean.incrementCommittedTransactionCount();
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to commit", persistenceId(), e);
- }
- }
- }
-
recoveryCoordinator = null;
- currentLogRecoveryBatch = null;
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
}
@Override
- protected DataPersistenceProvider persistence() {
- return dataPersistenceProvider;
- }
-
- @Override public String persistenceId() {
+ public String persistenceId() {
return this.name;
}
- @VisibleForTesting
- DataPersistenceProvider getDataPersistenceProvider() {
- return dataPersistenceProvider;
- }
-
@VisibleForTesting
ShardCommitCoordinator getCommitCoordinator() {
return commitCoordinator;