Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / SnapshotTracker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.io.ByteSource;
13 import java.io.BufferedOutputStream;
14 import java.io.IOException;
15 import java.util.Arrays;
16 import java.util.OptionalInt;
17 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
18 import org.opendaylight.controller.cluster.raft.RaftActorContext;
19 import org.slf4j.Logger;
20
21 /**
22  * Helper class that maintains state for a snapshot that is being installed in chunks on a Follower.
23  */
24 class SnapshotTracker implements AutoCloseable {
25     private final Logger log;
26     private final int totalChunks;
27     private final String leaderId;
28     private final BufferedOutputStream bufferedStream;
29     private final FileBackedOutputStream fileBackedStream;
30     private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1;
31     private boolean sealed = false;
32     private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
33     private long count;
34
35     SnapshotTracker(final Logger log, final int totalChunks, final String leaderId, final RaftActorContext context) {
36         this.log = log;
37         this.totalChunks = totalChunks;
38         this.leaderId = requireNonNull(leaderId);
39         fileBackedStream = context.getFileBackedOutputStreamFactory().newInstance();
40         bufferedStream = new BufferedOutputStream(fileBackedStream);
41     }
42
43     /**
44      * Adds a chunk to the tracker.
45      *
46      * @param chunkIndex the index of the chunk
47      * @param chunk the chunk data
48      * @param lastChunkHashCode the optional hash code for the chunk
49      * @return true if this is the last chunk is received
50      * @throws InvalidChunkException if the chunk index is invalid or out of order
51      * @throws IOException if there is a problem writing to the stream
52      */
53     boolean addChunk(final int chunkIndex, final byte[] chunk, final OptionalInt maybeLastChunkHashCode)
54             throws IOException {
55         log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
56                 chunkIndex, lastChunkIndex, count, lastChunkHashCode);
57
58         if (sealed) {
59             throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
60                     + " all chunks already received");
61         }
62
63         if (lastChunkIndex + 1 != chunkIndex) {
64             throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
65         }
66
67         if (maybeLastChunkHashCode.isPresent() && maybeLastChunkHashCode.orElseThrow() != lastChunkHashCode) {
68             throw new InvalidChunkException("The hash code of the recorded last chunk does not match "
69                     + "the senders hash code, expected " + lastChunkHashCode + " was "
70                     + maybeLastChunkHashCode.orElseThrow());
71         }
72
73         bufferedStream.write(chunk);
74
75         count += chunk.length;
76         sealed = chunkIndex == totalChunks;
77         lastChunkIndex = chunkIndex;
78         lastChunkHashCode = Arrays.hashCode(chunk);
79         return sealed;
80     }
81
82     ByteSource getSnapshotBytes() throws IOException {
83         if (!sealed) {
84             throw new IllegalStateException("lastChunk not received yet");
85         }
86
87         bufferedStream.close();
88         return fileBackedStream.asByteSource();
89     }
90
91     String getLeaderId() {
92         return leaderId;
93     }
94
95     @Override
96     public void close() {
97         fileBackedStream.cleanup();
98     }
99
100     public static class InvalidChunkException extends IOException {
101         private static final long serialVersionUID = 1L;
102
103         InvalidChunkException(final String message) {
104             super(message);
105         }
106     }
107 }