Merge "BUG 1839 - HTTP delete of non existing data"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.TimeUnit;
16
17 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
18 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
19 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
21 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import com.google.common.collect.Lists;
28 import com.google.common.util.concurrent.ThreadFactoryBuilder;
29 import com.google.protobuf.ByteString;
30 import com.google.protobuf.InvalidProtocolBufferException;
31
32 /**
33  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
34  * and journal log entry batch are de-serialized and applied to their own write transaction
35  * instance in parallel on a thread pool for faster recovery time. However the transactions are
36  * committed to the data store in the order the corresponding snapshot or log batch are received
37  * to preserve data store integrity.
38  *
39  * @author Thomas Panetelis
40  */
41 class ShardRecoveryCoordinator {
42
43     private static final int TIME_OUT = 10;
44
45     private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
46
47     private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
48     private final SchemaContext schemaContext;
49     private final String shardName;
50     private final ExecutorService executor;
51
52     ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
53         this.schemaContext = schemaContext;
54         this.shardName = shardName;
55
56         executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
57                 new ThreadFactoryBuilder().setDaemon(true)
58                         .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
59     }
60
61     /**
62      * Submits a batch of journal log entries.
63      *
64      * @param logEntries the serialized journal log entries
65      * @param resultingTx the write Tx to which to apply the entries
66      */
67     void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
68         LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
69         resultingTxList.add(resultingTx);
70         executor.execute(task);
71     }
72
73     /**
74      * Submits a snapshot.
75      *
76      * @param snapshot the serialized snapshot
77      * @param resultingTx the write Tx to which to apply the entries
78      */
79     void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
80         SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
81         resultingTxList.add(resultingTx);
82         executor.execute(task);
83     }
84
85     Collection<DOMStoreWriteTransaction> getTransactions() {
86         // Shutdown the executor and wait for task completion.
87         executor.shutdown();
88
89         try {
90             if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
91                 return resultingTxList;
92             } else {
93                 LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
94             }
95         } catch (InterruptedException e) {
96             Thread.currentThread().interrupt();
97         }
98
99         return Collections.emptyList();
100     }
101
102     private static abstract class ShardRecoveryTask implements Runnable {
103
104         final DOMStoreWriteTransaction resultingTx;
105
106         ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
107             this.resultingTx = resultingTx;
108         }
109     }
110
111     private class LogRecoveryTask extends ShardRecoveryTask {
112
113         private final List<Object> logEntries;
114
115         LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
116             super(resultingTx);
117             this.logEntries = logEntries;
118         }
119
120         @Override
121         public void run() {
122             for(int i = 0; i < logEntries.size(); i++) {
123                 MutableCompositeModification.fromSerializable(
124                         logEntries.get(i), schemaContext).apply(resultingTx);
125                 // Null out to GC quicker.
126                 logEntries.set(i, null);
127             }
128         }
129     }
130
131     private class SnapshotRecoveryTask extends ShardRecoveryTask {
132
133         private final ByteString snapshot;
134
135         SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
136             super(resultingTx);
137             this.snapshot = snapshot;
138         }
139
140         @Override
141         public void run() {
142             try {
143                 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
144                 NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
145                         serializedNode);
146
147                 // delete everything first
148                 resultingTx.delete(YangInstanceIdentifier.builder().build());
149
150                 // Add everything from the remote node back
151                 resultingTx.write(YangInstanceIdentifier.builder().build(), node);
152             } catch (InvalidProtocolBufferException e) {
153                 LOG.error("Error deserializing snapshot", e);
154             }
155         }
156     }
157 }