X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardRecoveryCoordinator.java;h=238b4e46dce041add47117503fcb68feb54e8e27;hp=94fb5841021ea994e019d946e3e106f7d95256c3;hb=cfc94248aa44307e7dc9aaefcb6748c478f93138;hpb=b06d2c5bbffa48b1e219ac92cf0be60528aff34a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 94fb584102..238b4e46dc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,16 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; - import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -24,11 +24,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - /** * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot * and journal log entry batch are de-serialized and applied to their own write transaction @@ -73,11 +68,11 @@ class ShardRecoveryCoordinator { /** * Submits a snapshot. * - * @param snapshot the serialized snapshot + * @param snapshotBytes the serialized snapshot * @param resultingTx the write Tx to which to apply the entries */ - void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { - SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx); + void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { + SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx); resultingTxList.add(resultingTx); executor.execute(task); } @@ -121,7 +116,7 @@ class ShardRecoveryCoordinator { public void run() { for(int i = 0; i < logEntries.size(); i++) { MutableCompositeModification.fromSerializable( - logEntries.get(i), schemaContext).apply(resultingTx); + logEntries.get(i)).apply(resultingTx); // Null out to GC quicker. logEntries.set(i, null); } @@ -130,28 +125,22 @@ class ShardRecoveryCoordinator { private class SnapshotRecoveryTask extends ShardRecoveryTask { - private final ByteString snapshot; + private final byte[] snapshotBytes; - SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { + SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { super(resultingTx); - this.snapshot = snapshot; + this.snapshotBytes = snapshotBytes; } @Override public void run() { - try { - NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); - NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext).decode( - serializedNode); - - // delete everything first - resultingTx.delete(YangInstanceIdentifier.builder().build()); - - // Add everything from the remote node back - resultingTx.write(YangInstanceIdentifier.builder().build(), node); - } catch (InvalidProtocolBufferException e) { - LOG.error("Error deserializing snapshot", e); - } + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + + // delete everything first + resultingTx.delete(YangInstanceIdentifier.builder().build()); + + // Add everything from the remote node back + resultingTx.write(YangInstanceIdentifier.builder().build(), node); } } }