Fix Javadoc
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
index 9457456205aae6e21338520a99f412c6bedef964..a916ea6dead760219d536e0061ac01c35c99a879 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
@@ -7,27 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.io.File;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
@@ -36,122 +28,99 @@ import com.google.protobuf.InvalidProtocolBufferException;
  * committed to the data store in the order the corresponding snapshot or log batch are received
  * to preserve data store integrity.
  *
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
  */
-class ShardRecoveryCoordinator {
-
-    private static final int TIME_OUT = 10;
-
-    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
-    private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
-    private final SchemaContext schemaContext;
+class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
+    private final ShardDataTree store;
     private final String shardName;
-    private final ExecutorService executor;
+    private final Logger log;
+    private final Snapshot restoreFromSnapshot;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
-        this.schemaContext = schemaContext;
-        this.shardName = shardName;
+    private boolean open;
 
-        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
-                new ThreadFactoryBuilder().setDaemon(true)
-                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
-    }
+    ShardRecoveryCoordinator(final ShardDataTree store, final Snapshot restoreFromSnapshot, final String shardName,
+            final Logger log) {
+        this.store = Preconditions.checkNotNull(store);
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.log = Preconditions.checkNotNull(log);
 
-    /**
-     * Submits a batch of journal log entries.
-     *
-     * @param logEntries the serialized journal log entries
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
+        this.restoreFromSnapshot = restoreFromSnapshot;
     }
 
-    /**
-     * Submits a snapshot.
-     *
-     * @param snapshot the serialized snapshot
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
-        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
+    @Override
+    public void startLogRecoveryBatch(final int maxBatchSize) {
+        log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+        open = true;
     }
 
-    Collection<DOMStoreWriteTransaction> getTransactions() {
-        // Shutdown the executor and wait for task completion.
-        executor.shutdown();
+    @Override
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void appendRecoveredLogEntry(final Payload payload) {
+        Preconditions.checkState(open, "call startLogRecoveryBatch before calling appendRecoveredLogEntry");
 
         try {
-            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
-                return resultingTxList;
-            } else {
-                LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+            store.applyRecoveryPayload(payload);
+        } catch (Exception e) {
+            log.error("{}: failed to apply payload {}", shardName, payload, e);
+            throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s",
+                shardName, payload), e);
         }
-
-        return Collections.emptyList();
     }
 
-    private static abstract class ShardRecoveryTask implements Runnable {
-
-        final DOMStoreWriteTransaction resultingTx;
-
-        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
-            this.resultingTx = resultingTx;
-        }
+    /**
+     * Applies the current batched log entries to the data store.
+     */
+    @Override
+    public void applyCurrentLogRecoveryBatch() {
+        Preconditions.checkState(open, "call startLogRecoveryBatch before calling applyCurrentLogRecoveryBatch");
+        open = false;
     }
 
-    private class LogRecoveryTask extends ShardRecoveryTask {
-
-        private final List<Object> logEntries;
-
-        LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.logEntries = logEntries;
-        }
-
-        @Override
-        public void run() {
-            for(int i = 0; i < logEntries.size(); i++) {
-                MutableCompositeModification.fromSerializable(
-                        logEntries.get(i)).apply(resultingTx);
-                // Null out to GC quicker.
-                logEntries.set(i, null);
-            }
-        }
+    private File writeRoot(final String kind, final NormalizedNode<?, ?> node) {
+        final File file = new File(System.getProperty("karaf.data", "."),
+            "failed-recovery-" + kind + "-" + shardName + ".xml");
+        NormalizedNodeXMLOutput.toFile(file, node);
+        return file;
     }
 
-    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+    /**
+     * Applies a recovered snapshot to the data store.
+     *
+     * @param snapshotState the snapshot state to apply
+     */
+    @Override
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
+        if (!(snapshotState instanceof ShardSnapshotState)) {
+            log.debug("{}: applyRecoverySnapshot ignoring snapshot: {}", snapshotState);
+        }
 
-        private final ByteString snapshot;
+        log.debug("{}: Applying recovered snapshot", shardName);
 
-        SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.snapshot = snapshot;
+        ShardDataTreeSnapshot shardSnapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
+        try {
+            store.applyRecoverySnapshot(shardSnapshot);
+        } catch (Exception e) {
+            final File f = writeRoot("snapshot", shardSnapshot.getRootNode().orElse(null));
+            throw new IllegalStateException(String.format(
+                    "%s: Failed to apply recovery snapshot %s. Node data was written to file %s",
+                    shardName, shardSnapshot, f), e);
         }
+    }
 
-        @Override
-        public void run() {
-            try {
-                NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
-                NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                        serializedNode);
-
-                // delete everything first
-                resultingTx.delete(YangInstanceIdentifier.builder().build());
+    @Override
+    public Snapshot getRestoreFromSnapshot() {
+        return restoreFromSnapshot;
+    }
 
-                // Add everything from the remote node back
-                resultingTx.write(YangInstanceIdentifier.builder().build(), node);
-            } catch (InvalidProtocolBufferException e) {
-                LOG.error("Error deserializing snapshot", e);
-            }
+    @Override
+    @Deprecated
+    public State deserializePreCarbonSnapshot(final byte[] from) {
+        try {
+            return new ShardSnapshotState(ShardDataTreeSnapshot.deserializePreCarbon(from));
+        } catch (IOException e) {
+            log.error("{}: failed to deserialize snapshot", shardName, e);
+            throw Throwables.propagate(e);
         }
     }
 }
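
For readers unfamiliar with the recovery flow, the sketch below models the calling pattern these cohort callbacks assume: a recovered snapshot (if any) is applied first, then the journal is replayed in bounded batches, each opened with startLogRecoveryBatch and closed with applyCurrentLogRecoveryBatch. The RecoveryCohort interface, the RecoveryFlowSketch class, and the replay driver are hypothetical simplifications for illustration only, not OpenDaylight APIs; in the real code the Raft actor's recovery support drives a ShardRecoveryCoordinator with the deserialized Payload and Snapshot.State objects shown in the diff above.

// Illustrative sketch only. RecoveryCohort and RecoveryFlowSketch are hypothetical
// simplifications of the callback sequence, not OpenDaylight classes.
import java.util.List;

public final class RecoveryFlowSketch {

    /** Trimmed-down, hypothetical view of the cohort callbacks shown in the diff above. */
    interface RecoveryCohort<P> {
        void startLogRecoveryBatch(int maxBatchSize);

        void appendRecoveredLogEntry(P payload);

        void applyCurrentLogRecoveryBatch();

        void applyRecoverySnapshot(Object snapshotState);
    }

    /** Applies a recovered snapshot (if any), then replays the journal in bounded batches. */
    static <P> void replay(final RecoveryCohort<P> cohort, final Object snapshotState,
            final List<P> journal, final int maxBatchSize) {
        if (snapshotState != null) {
            // The snapshot is applied before the journal so log entries land on top of it.
            cohort.applyRecoverySnapshot(snapshotState);
        }

        int index = 0;
        while (index < journal.size()) {
            cohort.startLogRecoveryBatch(maxBatchSize);

            final int end = Math.min(index + maxBatchSize, journal.size());
            for (; index < end; index++) {
                cohort.appendRecoveredLogEntry(journal.get(index));
            }

            // Close the current batch before opening the next one, preserving ordering.
            cohort.applyCurrentLogRecoveryBatch();
        }
    }

    public static void main(final String[] args) {
        final RecoveryCohort<String> printingCohort = new RecoveryCohort<String>() {
            @Override
            public void startLogRecoveryBatch(final int maxBatchSize) {
                System.out.println("start batch, max size " + maxBatchSize);
            }

            @Override
            public void appendRecoveredLogEntry(final String payload) {
                System.out.println("append " + payload);
            }

            @Override
            public void applyCurrentLogRecoveryBatch() {
                System.out.println("apply current batch");
            }

            @Override
            public void applyRecoverySnapshot(final Object snapshotState) {
                System.out.println("apply snapshot " + snapshotState);
            }
        };

        replay(printingCohort, "recovered-snapshot-state", List.of("entry-1", "entry-2", "entry-3"), 2);
    }
}

Running main prints the callback order for a three-entry journal split into batches of two, which mirrors the open/append/close state the coordinator tracks with its open flag.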