Bug 7449: Add message slicing/re-assembly classes
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / AssembledMessageState.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.io.ByteSource;
12 import java.io.BufferedOutputStream;
13 import java.io.IOException;
14 import java.util.Arrays;
15 import javax.annotation.concurrent.NotThreadSafe;
16 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
17 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
18 import org.opendaylight.yangtools.concepts.Identifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 /**
23  * Maintains the state of an assembled message.
24  *
25  * @author Thomas Pantelis
26  */
27 @NotThreadSafe
28 public class AssembledMessageState implements AutoCloseable {
29     private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class);
30
31     private final int totalSlices;
32     private final BufferedOutputStream bufferedStream;
33     private final FileBackedOutputStream fileBackedStream;
34     private final Identifier identifier;
35     private final String logContext;
36
37     private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1;
38     private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
39     private boolean sealed = false;
40     private boolean closed = false;
41     private long assembledSize;
42
43     /**
44      * Constructor.
45      *
46      * @param identifier the identifier for this instance
47      * @param totalSlices the total number of slices to expect
48      * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming
49      * @param logContext the context for log messages
50      */
51     public AssembledMessageState(final Identifier identifier, final int totalSlices,
52             final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) {
53         this.identifier = identifier;
54         this.totalSlices = totalSlices;
55         this.logContext = logContext;
56
57         fileBackedStream = fileBackedStreamFactory.newInstance();
58         bufferedStream = new BufferedOutputStream(fileBackedStream);
59     }
60
61     /**
62      * Returns the identifier of this instance.
63      *
64      * @return the identifier
65      */
66     public Identifier getIdentifier() {
67         return identifier;
68     }
69
70     /**
71      * Adds a slice to the assembled stream.
72      *
73      * @param sliceIndex the index of the slice
74      * @param data the sliced data
75      * @param lastSliceHashCode the hash code of the last slice sent
76      * @return true if this is the last slice received, false otherwise
77      * @throws MessageSliceException
78      *         <ul>
79      *         <li>if the slice index is invalid</li>
80      *         <li>if the last slice hash code is invalid</li>
81      *         <li>if an error occurs writing the data to the stream</li>
82      *         </ul>
83      *         In addition, this instance is automatically closed and can no longer be used.
84      * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices)
85      * @throws AssemblerClosedException if this instance is already closed
86      */
87     public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode)
88             throws MessageSliceException {
89         if (LOG.isDebugEnabled()) {
90             LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, "
91                     + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex,
92                     lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived);
93         }
94
95         try {
96             validateSlice(sliceIndex, lastSliceHashCode);
97
98             assembledSize += data.length;
99             lastSliceIndexReceived = sliceIndex;
100             lastSliceHashCodeReceived = Arrays.hashCode(data);
101
102             bufferedStream.write(data);
103
104             sealed = sliceIndex == totalSlices;
105             if (sealed) {
106                 bufferedStream.close();
107             }
108         } catch (IOException e) {
109             close();
110             throw new MessageSliceException(String.format("Error writing data for slice %d of message %s",
111                     sliceIndex, identifier), e);
112         }
113
114         return sealed;
115     }
116
117     /**
118      * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed.
119      *
120      * @return a ByteSource containing the assembled bytes
121      * @throws IOException if an error occurs obtaining the assembled bytes
122      * @throws IllegalStateException is this instance is not sealed
123      */
124     public ByteSource getAssembledBytes() throws IOException {
125         Preconditions.checkState(sealed, "Last slice not received yet");
126         return fileBackedStream.asByteSource();
127     }
128
129     private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException {
130         if (closed) {
131             throw new AssemblerClosedException(identifier);
132         }
133
134         if (sealed) {
135             throw new AssemblerSealedException(String.format(
136                     "Received slice index for message %s but all %d expected slices have already already received.",
137                     identifier, totalSlices));
138         }
139
140         if (lastSliceIndexReceived + 1 != sliceIndex) {
141             close();
142             throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s",
143                     lastSliceIndexReceived + 1, sliceIndex, identifier), true);
144         }
145
146         if (lastSliceHashCode != lastSliceHashCodeReceived) {
147             close();
148             throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not "
149                     + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived,
150                     lastSliceHashCode, identifier), true);
151         }
152     }
153
154     @Override
155     public void close() {
156         if (closed) {
157             return;
158         }
159
160         closed = true;
161         if (!sealed) {
162             try {
163                 bufferedStream.close();
164             } catch (IOException e) {
165                 LOG.debug("{}: Error closing output stream", logContext, e);
166             }
167         }
168
169         fileBackedStream.cleanup();
170     }
171 }