Migrate InstallSnapshot/SnapshotTracker use of Optional
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / SnapshotTracker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.io.ByteSource;
13 import java.io.BufferedOutputStream;
14 import java.io.IOException;
15 import java.util.Arrays;
16 import java.util.OptionalInt;
17 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
18 import org.opendaylight.controller.cluster.raft.RaftActorContext;
19 import org.slf4j.Logger;
20
21 /**
22  * Helper class that maintains state for a snapshot that is being installed in chunks on a Follower.
23  */
24 class SnapshotTracker implements AutoCloseable {
25     private final Logger log;
26     private final int totalChunks;
27     private final String leaderId;
28     private final BufferedOutputStream bufferedStream;
29     private final FileBackedOutputStream fileBackedStream;
30     private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1;
31     private boolean sealed = false;
32     private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
33     private long count;
34
35     SnapshotTracker(final Logger log, final int totalChunks, final String leaderId, final RaftActorContext context) {
36         this.log = log;
37         this.totalChunks = totalChunks;
38         this.leaderId = requireNonNull(leaderId);
39         fileBackedStream = context.getFileBackedOutputStreamFactory().newInstance();
40         bufferedStream = new BufferedOutputStream(fileBackedStream);
41     }
42
43     /**
44      * Adds a chunk to the tracker.
45      *
46      * @param chunkIndex the index of the chunk
47      * @param chunk the chunk data
48      * @param lastChunkHashCode the optional hash code for the chunk
49      * @return true if this is the last chunk is received
50      * @throws InvalidChunkException if the chunk index is invalid or out of order
51      */
52     boolean addChunk(final int chunkIndex, final byte[] chunk, final OptionalInt maybeLastChunkHashCode)
53             throws InvalidChunkException, IOException {
54         log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
55                 chunkIndex, lastChunkIndex, count, this.lastChunkHashCode);
56
57         if (sealed) {
58             throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
59                     + " all chunks already received");
60         }
61
62         if (lastChunkIndex + 1 != chunkIndex) {
63             throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
64         }
65
66         if (maybeLastChunkHashCode.isPresent() && maybeLastChunkHashCode.getAsInt() != this.lastChunkHashCode) {
67             throw new InvalidChunkException("The hash code of the recorded last chunk does not match "
68                     + "the senders hash code, expected " + this.lastChunkHashCode + " was "
69                     + maybeLastChunkHashCode.getAsInt());
70         }
71
72         bufferedStream.write(chunk);
73
74         count += chunk.length;
75         sealed = chunkIndex == totalChunks;
76         lastChunkIndex = chunkIndex;
77         this.lastChunkHashCode = Arrays.hashCode(chunk);
78         return sealed;
79     }
80
81     ByteSource getSnapshotBytes() throws IOException {
82         if (!sealed) {
83             throw new IllegalStateException("lastChunk not received yet");
84         }
85
86         bufferedStream.close();
87         return fileBackedStream.asByteSource();
88     }
89
90     String getLeaderId() {
91         return leaderId;
92     }
93
94     @Override
95     public void close() {
96         fileBackedStream.cleanup();
97     }
98
99     public static class InvalidChunkException extends IOException {
100         private static final long serialVersionUID = 1L;
101
102         InvalidChunkException(final String message) {
103             super(message);
104         }
105     }
106 }