*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
private static final int TIME_OUT = 10;
- private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
+ private final Logger log;
+ private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
+ String name) {
this.schemaContext = schemaContext;
this.shardName = shardName;
+ this.log = log;
+ this.name = name;
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true)
/**
* Submits a snapshot.
*
- * @param snapshot the serialized snapshot
+ * @param snapshotBytes the serialized snapshot
* @param resultingTx the write Tx to which to apply the entries
*/
- void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
- SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+ void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
+ SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
resultingTxList.add(resultingTx);
executor.execute(task);
}
if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
return resultingTxList;
} else {
- LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+ log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
public void run() {
for(int i = 0; i < logEntries.size(); i++) {
MutableCompositeModification.fromSerializable(
- logEntries.get(i), schemaContext).apply(resultingTx);
+ logEntries.get(i)).apply(resultingTx);
// Null out to GC quicker.
logEntries.set(i, null);
}
private class SnapshotRecoveryTask extends ShardRecoveryTask {
- private final ByteString snapshot;
+ private final byte[] snapshotBytes;
- SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+ SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
super(resultingTx);
- this.snapshot = snapshot;
+ this.snapshotBytes = snapshotBytes;
}
@Override
public void run() {
- try {
- NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
- NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
- YangInstanceIdentifier.builder().build(), serializedNode);
-
- // delete everything first
- resultingTx.delete(YangInstanceIdentifier.builder().build());
-
- // Add everything from the remote node back
- resultingTx.write(YangInstanceIdentifier.builder().build(), node);
- } catch (InvalidProtocolBufferException e) {
- LOG.error("Error deserializing snapshot", e);
- }
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+ // delete everything first
+ resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+ // Add everything from the remote node back
+ resultingTx.write(YangInstanceIdentifier.builder().build(), node);
}
}
}