BUG 2676 : Use notification-dispatcher for DataChangeListener actors
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
index 8afdb4c2801d186e0627669bc38ac6da7bdd3df7..50528575e77123ce433bf0b4e6ab73f0077c39c5 100644 (file)
@@ -7,27 +7,21 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
@@ -42,16 +36,19 @@ class ShardRecoveryCoordinator {
 
     private static final int TIME_OUT = 10;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
     private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
     private final SchemaContext schemaContext;
     private final String shardName;
     private final ExecutorService executor;
+    private final Logger log;
+    private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
+            String name) {
         this.schemaContext = schemaContext;
         this.shardName = shardName;
+        this.log = log;
+        this.name = name;
 
         executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
                 new ThreadFactoryBuilder().setDaemon(true)
@@ -73,11 +70,11 @@ class ShardRecoveryCoordinator {
     /**
      * Submits a snapshot.
      *
-     * @param snapshot the serialized snapshot
+     * @param snapshotBytes the serialized snapshot
      * @param resultingTx the write Tx to which to apply the entries
      */
-    void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
-        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+    void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
+        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
         resultingTxList.add(resultingTx);
         executor.execute(task);
     }
@@ -90,7 +87,7 @@ class ShardRecoveryCoordinator {
             if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
                 return resultingTxList;
             } else {
-                LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+                log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -121,7 +118,7 @@ class ShardRecoveryCoordinator {
         public void run() {
             for(int i = 0; i < logEntries.size(); i++) {
                 MutableCompositeModification.fromSerializable(
-                        logEntries.get(i), schemaContext).apply(resultingTx);
+                        logEntries.get(i)).apply(resultingTx);
                 // Null out to GC quicker.
                 logEntries.set(i, null);
             }
@@ -130,28 +127,22 @@ class ShardRecoveryCoordinator {
 
     private class SnapshotRecoveryTask extends ShardRecoveryTask {
 
-        private final ByteString snapshot;
+        private final byte[] snapshotBytes;
 
-        SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+        SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
             super(resultingTx);
-            this.snapshot = snapshot;
+            this.snapshotBytes = snapshotBytes;
         }
 
         @Override
         public void run() {
-            try {
-                NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
-                NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                        YangInstanceIdentifier.builder().build(), serializedNode);
-
-                // delete everything first
-                resultingTx.delete(YangInstanceIdentifier.builder().build());
-
-                // Add everything from the remote node back
-                resultingTx.write(YangInstanceIdentifier.builder().build(), node);
-            } catch (InvalidProtocolBufferException e) {
-                LOG.error("Error deserializing snapshot", e);
-            }
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+            // delete everything first
+            resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+            // Add everything from the remote node back
+            resultingTx.write(YangInstanceIdentifier.builder().build(), node);
         }
     }
 }