*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
* committed to the data store in the order the corresponding snapshot or log batch are received
* to preserve data store integrity.
*
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
*/
-class ShardRecoveryCoordinator {
-
- private static final int TIME_OUT = 10;
-
- private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
- private final SchemaContext schemaContext;
- private final String shardName;
- private final ExecutorService executor;
- private final Logger log;
- private final String name;
-
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
- String name) {
- this.schemaContext = schemaContext;
- this.shardName = shardName;
- this.log = log;
- this.name = name;
-
- executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
- }
-
- /**
- * Submits a batch of journal log entries.
- *
- * @param logEntries the serialized journal log entries
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
- }
+abstract class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
+ /**
+ * Coordinator variant that has no snapshot to restore from: {@link #getRestoreFromSnapshot()} returns null.
+ */
+ private static final class Simple extends ShardRecoveryCoordinator {
+ Simple(final ShardDataTree store, final String shardName, final Logger log) {
+ super(store, shardName, log);
+ }
- /**
- * Submits a snapshot.
- *
- * @param snapshotBytes the serialized snapshot
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
+ // Always null for this variant; no restore snapshot was supplied.
+ @Override
+ public Snapshot getRestoreFromSnapshot() {
+ return null;
+ }
}
- Collection<DOMStoreWriteTransaction> getTransactions() {
- // Shutdown the executor and wait for task completion.
- executor.shutdown();
+ /**
+ * Coordinator variant that carries a non-null snapshot, handed back by {@link #getRestoreFromSnapshot()}.
+ */
+ private static final class WithSnapshot extends ShardRecoveryCoordinator {
+ private final Snapshot restoreFromSnapshot;
- try {
- if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
- return resultingTxList;
- } else {
- log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ // The snapshot is mandatory here: a null argument is rejected by checkNotNull.
+ WithSnapshot(final ShardDataTree store, final String shardName, final Logger log, final Snapshot snapshot) {
+ super(store, shardName, log);
+ this.restoreFromSnapshot = Preconditions.checkNotNull(snapshot);
}
- return Collections.emptyList();
+ // Returns the snapshot instance given at construction time.
+ @Override
+ public Snapshot getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
}
- private static abstract class ShardRecoveryTask implements Runnable {
+ // Data tree that recovered payloads and snapshots are applied to.
+ private final ShardDataTree store;
+ // Shard name, used as the prefix of log and exception messages and in the failure dump file name.
+ private final String shardName;
+ private final Logger log;
- final DOMStoreWriteTransaction resultingTx;
+ // True between startLogRecoveryBatch() and applyCurrentLogRecoveryBatch(); guards appendRecoveredLogEntry().
+ private boolean open;
- ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
- this.resultingTx = resultingTx;
- }
+ /**
+ * Creates a coordinator; every argument is required and is rejected by checkNotNull when null.
+ *
+ * @param store the data tree to apply recovered state to
+ * @param shardName the shard name used in log and exception messages
+ * @param log the logger to report to
+ */
+ ShardRecoveryCoordinator(final ShardDataTree store, final String shardName, final Logger log) {
+ this.store = Preconditions.checkNotNull(store);
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.log = Preconditions.checkNotNull(log);
}
- private class LogRecoveryTask extends ShardRecoveryTask {
+ /**
+ * Creates a coordinator that has no snapshot to restore from (its getRestoreFromSnapshot() returns null).
+ *
+ * @param store the data tree to apply recovered state to
+ * @param shardName the shard name used in log and exception messages
+ * @param log the logger to report to
+ * @return a new coordinator instance
+ */
+ static ShardRecoveryCoordinator create(final ShardDataTree store, final String shardName, final Logger log) {
+ return new Simple(store, shardName, log);
+ }
- private final List<Object> logEntries;
+ /**
+ * Creates a coordinator whose getRestoreFromSnapshot() returns the given snapshot.
+ *
+ * @param store the data tree to apply recovered state to
+ * @param shardName the shard name used in log and exception messages
+ * @param log the logger to report to
+ * @param snapshot the snapshot to expose; must not be null
+ * @return a new coordinator instance
+ */
+ static ShardRecoveryCoordinator forSnapshot(final ShardDataTree store, final String shardName, final Logger log,
+ final Snapshot snapshot) {
+ return new WithSnapshot(store, shardName, log, snapshot);
+ }
- LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.logEntries = logEntries;
- }
+ /**
+ * Opens a log recovery batch so that appendRecoveredLogEntry() may be called.
+ *
+ * @param maxBatchSize the maximum batch size; only logged here, not otherwise used by this method
+ */
+ @Override
+ public void startLogRecoveryBatch(final int maxBatchSize) {
+ log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+ open = true;
+ }
- @Override
- public void run() {
- for(int i = 0; i < logEntries.size(); i++) {
- MutableCompositeModification.fromSerializable(
- logEntries.get(i)).apply(resultingTx);
- // Null out to GC quicker.
- logEntries.set(i, null);
- }
+ /**
+ * Applies a single recovered log entry to the data store right away, via store.applyRecoveryPayload().
+ * A batch must have been opened with startLogRecoveryBatch() first.
+ *
+ * @param payload the recovered log entry payload
+ * @throws IllegalStateException if no batch is open, or if the store fails to apply the payload
+ * (the original failure is kept as the cause)
+ */
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void appendRecoveredLogEntry(final Payload payload) {
+ // NOTE(review): the message says "startLogRecovery" but the method is startLogRecoveryBatch().
+ Preconditions.checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry");
+
+ try {
+ store.applyRecoveryPayload(payload);
+ } catch (Exception e) {
+ // Log with shard context, then rethrow wrapped so the failure is not swallowed.
+ log.error("{}: failed to apply payload {}", shardName, payload, e);
+ throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s",
+ shardName, payload), e);
}
}
- private class SnapshotRecoveryTask extends ShardRecoveryTask {
+ /**
+ * Closes the current log recovery batch. Entries are already applied to the data store one by one in
+ * appendRecoveredLogEntry(), so this method only verifies that a batch is open and marks it closed.
+ *
+ * @throws IllegalStateException if no batch is open
+ */
+ @Override
+ public void applyCurrentLogRecoveryBatch() {
+ Preconditions.checkState(open, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+ open = false;
+ }
- private final byte[] snapshotBytes;
+ /**
+ * Dumps a node as XML to a diagnostics file named failed-recovery-KIND-SHARDNAME.xml, located in the
+ * directory given by the "karaf.data" system property (the current directory when the property is unset).
+ *
+ * @param kind a label embedded in the file name
+ * @param node the node handed to NormalizedNodeXMLOutput.toFile()
+ * @return the target file
+ */
+ private File writeRoot(final String kind, final NormalizedNode<?, ?> node) {
+ final File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-" + kind + "-" + shardName + ".xml");
+ NormalizedNodeXMLOutput.toFile(file, node);
+ return file;
+ }
- SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.snapshotBytes = snapshotBytes;
+ /**
+ * Applies a recovered snapshot to the data store.
+ *
+ * <p>
+ * A state that is not a {@link ShardSnapshotState} (including null) is logged at debug level and skipped.
+ * If the store fails to apply the snapshot, the snapshot root node is dumped to a file for diagnostics
+ * and the failure is rethrown.
+ *
+ * @param snapshotState the recovered snapshot state
+ * @throws IllegalStateException if the data store fails to apply the snapshot (original failure is the cause)
+ */
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
+ if (!(snapshotState instanceof ShardSnapshotState)) {
+ log.debug("{}: applyRecoverySnapshot ignoring snapshot: {}", shardName, snapshotState);
+ // Nothing to apply: without this return the cast/dereference below would throw.
+ return;
}
- @Override
- public void run() {
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- // delete everything first
- resultingTx.delete(YangInstanceIdentifier.builder().build());
+ log.debug("{}: Applying recovered snapshot", shardName);
- // Add everything from the remote node back
- resultingTx.write(YangInstanceIdentifier.builder().build(), node);
+ ShardDataTreeSnapshot shardSnapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
+ try {
+ store.applyRecoverySnapshot(shardSnapshot);
+ } catch (Exception e) {
+ // Keep the offending data around for post-mortem analysis before failing recovery.
+ final File f = writeRoot("snapshot", shardSnapshot.getRootNode().orElse(null));
+ throw new IllegalStateException(String.format(
+ "%s: Failed to apply recovery snapshot %s. Node data was written to file %s",
+ shardName, shardSnapshot, f), e);
}
}
}