import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
/**
* A Shard represents a portion of the logical data tree <br/>
*/
public class Shard extends RaftActor {
- private static final ConfigParams configParams = new ShardConfigParams();
-
public static final String DEFAULT_NAME = "default";
// The state of this Shard
private ActorRef createSnapshotTransaction;
+ /**
+ * Coordinates persistence recovery on startup.
+ */
+ private ShardRecoveryCoordinator recoveryCoordinator;
+ private List<Object> currentLogRecoveryBatch;
+
private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
- private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+ protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
- super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
+ super(name.toString(), mapPeerAddresses(peerAddresses),
+ Optional.of(datastoreContext.getShardRaftConfig()));
this.name = name;
this.datastoreContext = datastoreContext;
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Could not find cohort for modification : {}. Writing modification using a new transaction",
- modification);
- }
-
- DOMStoreWriteTransaction transaction =
- store.newWriteOnlyTransaction();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
- }
-
- modification.apply(transaction);
- try {
- syncCommitTransaction(transaction);
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("Failed to commit", e);
- return;
- }
- //we want to just apply the recovery commit and return
- shardMBean.incrementCommittedTransactionCount();
+ // If there's no cached cohort then we must be applying replicated state.
+ commitWithNewTransaction(serialized);
return;
}
-
- if(sender == null){
+ if(sender == null) {
LOG.error("Commit failed. Sender cannot be null");
return;
}
}
+ private void commitWithNewTransaction(Object modification) {
+ DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+ MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx);
+ try {
+ syncCommitTransaction(tx);
+ shardMBean.incrementCommittedTransactionCount();
+ } catch (InterruptedException | ExecutionException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error(e, "Failed to commit");
+ }
+ }
+
private void handleForwardedCommit(ForwardedCommitTransaction message) {
Object serializedModification =
message.getModification().toSerializable();
return config.isMetricCaptureEnabled();
}
- @Override protected void applyState(ActorRef clientActor, String identifier,
- Object data) {
+ @Override
+ protected
+ void startLogRecoveryBatch(int maxBatchSize) {
+ currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+ }
+ }
+
+ @Override
+ protected void appendRecoveredLogEntry(Payload data) {
+ if (data instanceof CompositeModificationPayload) {
+ currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+ } else {
+ LOG.error("Unknown state received {} during recovery", data);
+ }
+ }
+
+ @Override
+ protected void applyRecoverySnapshot(ByteString snapshot) {
+ if(recoveryCoordinator == null) {
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ }
+
+ recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+ }
+ }
+
+ @Override
+ protected void applyCurrentLogRecoveryBatch() {
+ if(recoveryCoordinator == null) {
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ }
+
+ recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+ currentLogRecoveryBatch.size());
+ }
+ }
+
+ @Override
+ protected void onRecoveryComplete() {
+ if(recoveryCoordinator != null) {
+ Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+ }
+
+ for(DOMStoreWriteTransaction tx: txList) {
+ try {
+ syncCommitTransaction(tx);
+ shardMBean.incrementCommittedTransactionCount();
+ } catch (InterruptedException | ExecutionException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error(e, "Failed to commit");
+ }
+ }
+ }
+
+ recoveryCoordinator = null;
+ currentLogRecoveryBatch = null;
+ updateJournalStats();
+ }
+
+ @Override
+ protected void applyState(ActorRef clientActor, String identifier, Object data) {
if (data instanceof CompositeModificationPayload) {
- Object modification =
- ((CompositeModificationPayload) data).getModification();
+ Object modification = ((CompositeModificationPayload) data).getModification();
if (modification != null) {
commit(clientActor, modification);
} else {
LOG.error(
"modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor.path().toString());
+ identifier, clientActor != null ? clientActor.path().toString() : null);
}
} else {
- LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader());
+ LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ data, data.getClass().getClassLoader(),
+ CompositeModificationPayload.class.getClassLoader());
}
- // Update stats
+ updateJournalStats();
+
+ }
+
+ private void updateJournalStats() {
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
shardMBean.setCommitIndex(getCommitIndex());
shardMBean.setLastApplied(getLastApplied());
-
}
- @Override protected void createSnapshot() {
+ @Override
+ protected void createSnapshot() {
if (createSnapshotTransaction == null) {
// Create a transaction. We are really going to treat the transaction as a worker
}
}
- @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+ @VisibleForTesting
+ @Override
+ protected void applySnapshot(ByteString snapshot) {
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
return this.name.toString();
}
-
- private static class ShardConfigParams extends DefaultConfigParamsImpl {
- public static final FiniteDuration HEART_BEAT_INTERVAL =
- new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
- @Override public FiniteDuration getHeartBeatInterval() {
- return HEART_BEAT_INTERVAL;
- }
- }
-
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
}
}
- @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+ @VisibleForTesting
+ NormalizedNode<?,?> readStore(YangInstanceIdentifier id)
+ throws ExecutionException, InterruptedException {
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
- transaction.read(YangInstanceIdentifier.builder().build());
+ transaction.read(id);
- NormalizedNode<?, ?> node = future.get().get();
+ Optional<NormalizedNode<?, ?>> optional = future.get();
+ NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
transaction.close();
return node;
}
- @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+ @VisibleForTesting
+ void writeToStore(YangInstanceIdentifier id, NormalizedNode<?,?> node)
throws ExecutionException, InterruptedException {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
syncCommitTransaction(transaction);
}
+ @VisibleForTesting
+ ShardStats getShardMBean() {
+ return shardMBean;
+ }
}