Merge "Cleanup root pom "name"."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import com.google.common.collect.Lists;
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
19 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
21 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25
26 /**
27  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
28  * and journal log entry batch are de-serialized and applied to their own write transaction
29  * instance in parallel on a thread pool for faster recovery time. However the transactions are
30  * committed to the data store in the order the corresponding snapshot or log batch are received
31  * to preserve data store integrity.
32  *
33  * @author Thomas Panetelis
34  */
35 class ShardRecoveryCoordinator {
36
37     private static final int TIME_OUT = 10;
38
39     private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
40     private final SchemaContext schemaContext;
41     private final String shardName;
42     private final ExecutorService executor;
43     private final Logger log;
44     private final String name;
45
46     ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
47             String name) {
48         this.schemaContext = schemaContext;
49         this.shardName = shardName;
50         this.log = log;
51         this.name = name;
52
53         executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
54                 new ThreadFactoryBuilder().setDaemon(true)
55                         .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
56     }
57
58     /**
59      * Submits a batch of journal log entries.
60      *
61      * @param logEntries the serialized journal log entries
62      * @param resultingTx the write Tx to which to apply the entries
63      */
64     void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
65         LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
66         resultingTxList.add(resultingTx);
67         executor.execute(task);
68     }
69
70     /**
71      * Submits a snapshot.
72      *
73      * @param snapshotBytes the serialized snapshot
74      * @param resultingTx the write Tx to which to apply the entries
75      */
76     void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
77         SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
78         resultingTxList.add(resultingTx);
79         executor.execute(task);
80     }
81
82     Collection<DOMStoreWriteTransaction> getTransactions() {
83         // Shutdown the executor and wait for task completion.
84         executor.shutdown();
85
86         try {
87             if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
88                 return resultingTxList;
89             } else {
90                 log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
91             }
92         } catch (InterruptedException e) {
93             Thread.currentThread().interrupt();
94         }
95
96         return Collections.emptyList();
97     }
98
99     private static abstract class ShardRecoveryTask implements Runnable {
100
101         final DOMStoreWriteTransaction resultingTx;
102
103         ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
104             this.resultingTx = resultingTx;
105         }
106     }
107
108     private class LogRecoveryTask extends ShardRecoveryTask {
109
110         private final List<Object> logEntries;
111
112         LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
113             super(resultingTx);
114             this.logEntries = logEntries;
115         }
116
117         @Override
118         public void run() {
119             for(int i = 0; i < logEntries.size(); i++) {
120                 MutableCompositeModification.fromSerializable(
121                         logEntries.get(i)).apply(resultingTx);
122                 // Null out to GC quicker.
123                 logEntries.set(i, null);
124             }
125         }
126     }
127
128     private class SnapshotRecoveryTask extends ShardRecoveryTask {
129
130         private final byte[] snapshotBytes;
131
132         SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
133             super(resultingTx);
134             this.snapshotBytes = snapshotBytes;
135         }
136
137         @Override
138         public void run() {
139             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
140
141             // delete everything first
142             resultingTx.delete(YangInstanceIdentifier.builder().build());
143
144             // Add everything from the remote node back
145             resultingTx.write(YangInstanceIdentifier.builder().build(), node);
146         }
147     }
148 }