X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardRecoveryCoordinator.java;h=50528575e77123ce433bf0b4e6ab73f0077c39c5;hp=9457456205aae6e21338520a99f412c6bedef964;hb=707e2aa73c7314180472539ed4137950d33f5776;hpb=0d4c11af06567b4692b8894bbe2cac16cb4db0ad diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 9457456205..50528575e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,27 +7,21 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; - import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; /** * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot @@ -42,16 +36,19 @@ class ShardRecoveryCoordinator { private static final int TIME_OUT = 10; - private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class); - private final List resultingTxList = Lists.newArrayList(); private final SchemaContext schemaContext; private final String shardName; private final ExecutorService executor; + private final Logger log; + private final String name; - ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) { + ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log, + String name) { this.schemaContext = schemaContext; this.shardName = shardName; + this.log = log; + this.name = name; executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setDaemon(true) @@ -73,11 +70,11 @@ class ShardRecoveryCoordinator { /** * Submits a snapshot. * - * @param snapshot the serialized snapshot + * @param snapshotBytes the serialized snapshot * @param resultingTx the write Tx to which to apply the entries */ - void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { - SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx); + void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { + SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx); resultingTxList.add(resultingTx); executor.execute(task); } @@ -90,7 +87,7 @@ class ShardRecoveryCoordinator { if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) { return resultingTxList; } else { - LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT); + log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -130,28 +127,22 @@ class ShardRecoveryCoordinator { private class SnapshotRecoveryTask extends ShardRecoveryTask { - private final ByteString snapshot; + private final byte[] snapshotBytes; - SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) { + SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { super(resultingTx); - this.snapshot = snapshot; + this.snapshotBytes = snapshotBytes; } @Override public void run() { - try { - NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); - NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext).decode( - serializedNode); - - // delete everything first - resultingTx.delete(YangInstanceIdentifier.builder().build()); - - // Add everything from the remote node back - resultingTx.write(YangInstanceIdentifier.builder().build(), node); - } catch (InvalidProtocolBufferException e) { - LOG.error("Error deserializing snapshot", e); - } + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + + // delete everything first + resultingTx.delete(YangInstanceIdentifier.builder().build()); + + // Add everything from the remote node back + resultingTx.write(YangInstanceIdentifier.builder().build(), node); } } }