Merge "Update lispflowmapping options in custom.properties"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import com.google.common.collect.Lists;
11 import java.io.IOException;
12 import java.util.List;
13 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
14 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
15 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
16 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
17 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
18 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
19 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
20 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
25 import org.slf4j.Logger;
26
27 /**
28  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
29  * and journal log entry batch are de-serialized and applied to their own write transaction
30  * instance in parallel on a thread pool for faster recovery time. However the transactions are
31  * committed to the data store in the order the corresponding snapshot or log batch are received
32  * to preserve data store integrity.
33  *
34  * @author Thomas Panetelis
35  */
36 class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
37
38     private final InMemoryDOMDataStore store;
39     private List<ModificationPayload> currentLogRecoveryBatch;
40     private final String shardName;
41     private final Logger log;
42
43     ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
44         this.store = store;
45         this.shardName = shardName;
46         this.log = log;
47     }
48
49     @Override
50     public void startLogRecoveryBatch(int maxBatchSize) {
51         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
52
53         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
54     }
55
56     @Override
57     public void appendRecoveredLogEntry(Payload payload) {
58         try {
59             if(payload instanceof ModificationPayload) {
60                 currentLogRecoveryBatch.add((ModificationPayload) payload);
61             } else if (payload instanceof CompositeModificationPayload) {
62                 currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
63                         ((CompositeModificationPayload) payload).getModification())));
64             } else if (payload instanceof CompositeModificationByteStringPayload) {
65                 currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
66                         ((CompositeModificationByteStringPayload) payload).getModification())));
67             } else {
68                 log.error("{}: Unknown payload {} received during recovery", shardName, payload);
69             }
70         } catch (IOException e) {
71             log.error("{}: Error extracting ModificationPayload", shardName, e);
72         }
73
74     }
75
76     private void commitTransaction(DOMStoreWriteTransaction transaction) {
77         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
78         try {
79             commitCohort.preCommit().get();
80             commitCohort.commit().get();
81         } catch (Exception e) {
82             log.error("{}: Failed to commit Tx on recovery", shardName, e);
83         }
84     }
85
86     /**
87      * Applies the current batched log entries to the data store.
88      */
89     @Override
90     public void applyCurrentLogRecoveryBatch() {
91         log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
92
93         DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
94         for(ModificationPayload payload: currentLogRecoveryBatch) {
95             try {
96                 MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
97             } catch (Exception e) {
98                 log.error("{}: Error extracting ModificationPayload", shardName, e);
99             }
100         }
101
102         commitTransaction(writeTx);
103
104         currentLogRecoveryBatch = null;
105     }
106
107     /**
108      * Applies a recovered snapshot to the data store.
109      *
110      * @param snapshotBytes the serialized snapshot
111      */
112     @Override
113     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
114         log.debug("{}: Applyng recovered sbapshot", shardName);
115
116         DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
117
118         NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
119
120         writeTx.write(YangInstanceIdentifier.builder().build(), node);
121
122         commitTransaction(writeTx);
123     }
124 }