2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import com.google.common.base.Preconditions;
11 import com.google.common.io.ByteSource;
12 import java.io.BufferedOutputStream;
13 import java.io.IOException;
14 import java.util.Arrays;
15 import javax.annotation.concurrent.NotThreadSafe;
16 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
17 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
18 import org.opendaylight.yangtools.concepts.Identifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
23 * Maintains the state of an assembled message.
25 * @author Thomas Pantelis
28 public class AssembledMessageState implements AutoCloseable {
29 private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class);
31 private final int totalSlices;
32 private final BufferedOutputStream bufferedStream;
33 private final FileBackedOutputStream fileBackedStream;
34 private final Identifier identifier;
35 private final String logContext;
37 private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1;
38 private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
39 private boolean sealed = false;
40 private boolean closed = false;
41 private long assembledSize;
46 * @param identifier the identifier for this instance
47 * @param totalSlices the total number of slices to expect
48 * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming
49 * @param logContext the context for log messages
51 public AssembledMessageState(final Identifier identifier, final int totalSlices,
52 final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) {
53 this.identifier = identifier;
54 this.totalSlices = totalSlices;
55 this.logContext = logContext;
57 fileBackedStream = fileBackedStreamFactory.newInstance();
58 bufferedStream = new BufferedOutputStream(fileBackedStream);
62 * Returns the identifier of this instance.
64 * @return the identifier
66 public Identifier getIdentifier() {
71 * Adds a slice to the assembled stream.
73 * @param sliceIndex the index of the slice
74 * @param data the sliced data
75 * @param lastSliceHashCode the hash code of the last slice sent
76 * @return true if this is the last slice received, false otherwise
77 * @throws MessageSliceException
79 * <li>if the slice index is invalid</li>
80 * <li>if the last slice hash code is invalid</li>
81 * <li>if an error occurs writing the data to the stream</li>
83 * In addition, this instance is automatically closed and can no longer be used.
84 * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices)
85 * @throws AssemblerClosedException if this instance is already closed
87 public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode)
88 throws MessageSliceException {
89 if (LOG.isDebugEnabled()) {
90 LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, "
91 + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex,
92 lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived);
96 validateSlice(sliceIndex, lastSliceHashCode);
98 assembledSize += data.length;
99 lastSliceIndexReceived = sliceIndex;
100 lastSliceHashCodeReceived = Arrays.hashCode(data);
102 bufferedStream.write(data);
104 sealed = sliceIndex == totalSlices;
106 bufferedStream.close();
108 } catch (IOException e) {
110 throw new MessageSliceException(String.format("Error writing data for slice %d of message %s",
111 sliceIndex, identifier), e);
118 * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed.
120 * @return a ByteSource containing the assembled bytes
121 * @throws IOException if an error occurs obtaining the assembled bytes
122 * @throws IllegalStateException is this instance is not sealed
124 public ByteSource getAssembledBytes() throws IOException {
125 Preconditions.checkState(sealed, "Last slice not received yet");
126 return fileBackedStream.asByteSource();
129 private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException {
131 throw new AssemblerClosedException(identifier);
135 throw new AssemblerSealedException(String.format(
136 "Received slice index for message %s but all %d expected slices have already already received.",
137 identifier, totalSlices));
140 if (lastSliceIndexReceived + 1 != sliceIndex) {
142 throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s",
143 lastSliceIndexReceived + 1, sliceIndex, identifier), true);
146 if (lastSliceHashCode != lastSliceHashCodeReceived) {
148 throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not "
149 + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived,
150 lastSliceHashCode, identifier), true);
155 public void close() {
163 bufferedStream.close();
164 } catch (IOException e) {
165 LOG.debug("{}: Error closing output stream", logContext, e);
169 fileBackedStream.cleanup();