2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
18 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
19 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
21 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import com.google.common.collect.Lists;
28 import com.google.common.util.concurrent.ThreadFactoryBuilder;
29 import com.google.protobuf.ByteString;
30 import com.google.protobuf.InvalidProtocolBufferException;
33 * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
34 * and journal log entry batch are de-serialized and applied to their own write transaction
35 * instance in parallel on a thread pool for faster recovery time. However the transactions are
36 * committed to the data store in the order the corresponding snapshot or log batch are received
37 * to preserve data store integrity.
39 * @author Thomas Panetelis
41 class ShardRecoveryCoordinator {
43 private static final int TIME_OUT = 10;
45 private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
47 private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
48 private final SchemaContext schemaContext;
49 private final String shardName;
50 private final ExecutorService executor;
52 ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
53 this.schemaContext = schemaContext;
54 this.shardName = shardName;
56 executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
57 new ThreadFactoryBuilder().setDaemon(true)
58 .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
62 * Submits a batch of journal log entries.
64 * @param logEntries the serialized journal log entries
65 * @param resultingTx the write Tx to which to apply the entries
67 void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
68 LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
69 resultingTxList.add(resultingTx);
70 executor.execute(task);
76 * @param snapshot the serialized snapshot
77 * @param resultingTx the write Tx to which to apply the entries
79 void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
80 SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
81 resultingTxList.add(resultingTx);
82 executor.execute(task);
85 Collection<DOMStoreWriteTransaction> getTransactions() {
86 // Shutdown the executor and wait for task completion.
90 if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
91 return resultingTxList;
93 LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
95 } catch (InterruptedException e) {
96 Thread.currentThread().interrupt();
99 return Collections.emptyList();
102 private static abstract class ShardRecoveryTask implements Runnable {
104 final DOMStoreWriteTransaction resultingTx;
106 ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
107 this.resultingTx = resultingTx;
111 private class LogRecoveryTask extends ShardRecoveryTask {
113 private final List<Object> logEntries;
115 LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
117 this.logEntries = logEntries;
122 for(int i = 0; i < logEntries.size(); i++) {
123 MutableCompositeModification.fromSerializable(
124 logEntries.get(i), schemaContext).apply(resultingTx);
125 // Null out to GC quicker.
126 logEntries.set(i, null);
131 private class SnapshotRecoveryTask extends ShardRecoveryTask {
133 private final ByteString snapshot;
135 SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
137 this.snapshot = snapshot;
143 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
144 NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
145 YangInstanceIdentifier.builder().build(), serializedNode);
147 // delete everything first
148 resultingTx.delete(YangInstanceIdentifier.builder().build());
150 // Add everything from the remote node back
151 resultingTx.write(YangInstanceIdentifier.builder().build(), node);
152 } catch (InvalidProtocolBufferException e) {
153 LOG.error("Error deserializing snapshot", e);