2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import com.google.common.collect.Lists;
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
19 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
21 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
27 * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
28 * and journal log entry batch are de-serialized and applied to their own write transaction
29 * instance in parallel on a thread pool for faster recovery time. However the transactions are
30 * committed to the data store in the order the corresponding snapshot or log batch are received
31 * to preserve data store integrity.
33 * @author Thomas Panetelis
35 class ShardRecoveryCoordinator {
37 private static final int TIME_OUT = 10;
39 private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
40 private final SchemaContext schemaContext;
41 private final String shardName;
42 private final ExecutorService executor;
43 private final Logger log;
44 private final String name;
46 ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
48 this.schemaContext = schemaContext;
49 this.shardName = shardName;
53 executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
54 new ThreadFactoryBuilder().setDaemon(true)
55 .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
59 * Submits a batch of journal log entries.
61 * @param logEntries the serialized journal log entries
62 * @param resultingTx the write Tx to which to apply the entries
64 void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
65 LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
66 resultingTxList.add(resultingTx);
67 executor.execute(task);
73 * @param snapshotBytes the serialized snapshot
74 * @param resultingTx the write Tx to which to apply the entries
76 void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
77 SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
78 resultingTxList.add(resultingTx);
79 executor.execute(task);
82 Collection<DOMStoreWriteTransaction> getTransactions() {
83 // Shutdown the executor and wait for task completion.
87 if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
88 return resultingTxList;
90 log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
92 } catch (InterruptedException e) {
93 Thread.currentThread().interrupt();
96 return Collections.emptyList();
99 private static abstract class ShardRecoveryTask implements Runnable {
101 final DOMStoreWriteTransaction resultingTx;
103 ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
104 this.resultingTx = resultingTx;
108 private class LogRecoveryTask extends ShardRecoveryTask {
110 private final List<Object> logEntries;
112 LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
114 this.logEntries = logEntries;
119 for(int i = 0; i < logEntries.size(); i++) {
120 MutableCompositeModification.fromSerializable(
121 logEntries.get(i)).apply(resultingTx);
122 // Null out to GC quicker.
123 logEntries.set(i, null);
128 private class SnapshotRecoveryTask extends ShardRecoveryTask {
130 private final byte[] snapshotBytes;
132 SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
134 this.snapshotBytes = snapshotBytes;
139 NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
141 // delete everything first
142 resultingTx.delete(YangInstanceIdentifier.builder().build());
144 // Add everything from the remote node back
145 resultingTx.write(YangInstanceIdentifier.builder().build(), node);