2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import com.google.common.base.Preconditions;
11 import com.google.common.io.ByteSource;
12 import java.io.BufferedOutputStream;
13 import java.io.IOException;
14 import java.util.Arrays;
15 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
16 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
17 import org.opendaylight.yangtools.concepts.Identifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
22 * Maintains the state of an assembled message. This class is NOT thread-safe.
24 * @author Thomas Pantelis
26 public class AssembledMessageState implements AutoCloseable {
27 private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class);
29 private final int totalSlices;
30 private final BufferedOutputStream bufferedStream;
31 private final FileBackedOutputStream fileBackedStream;
32 private final Identifier identifier;
33 private final String logContext;
35 private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1;
36 private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
37 private boolean sealed = false;
38 private boolean closed = false;
39 private long assembledSize;
44 * @param identifier the identifier for this instance
45 * @param totalSlices the total number of slices to expect
46 * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming
47 * @param logContext the context for log messages
49 public AssembledMessageState(final Identifier identifier, final int totalSlices,
50 final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) {
51 this.identifier = identifier;
52 this.totalSlices = totalSlices;
53 this.logContext = logContext;
55 fileBackedStream = fileBackedStreamFactory.newInstance();
56 bufferedStream = new BufferedOutputStream(fileBackedStream);
60 * Returns the identifier of this instance.
62 * @return the identifier
64 public Identifier getIdentifier() {
69 * Adds a slice to the assembled stream.
71 * @param sliceIndex the index of the slice
72 * @param data the sliced data
73 * @param lastSliceHashCode the hash code of the last slice sent
74 * @return true if this is the last slice received, false otherwise
75 * @throws MessageSliceException
77 * <li>if the slice index is invalid</li>
78 * <li>if the last slice hash code is invalid</li>
79 * <li>if an error occurs writing the data to the stream</li>
81 * In addition, this instance is automatically closed and can no longer be used.
82 * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices)
83 * @throws AssemblerClosedException if this instance is already closed
85 public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode)
86 throws MessageSliceException {
87 if (LOG.isDebugEnabled()) {
88 LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, "
89 + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex,
90 lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived);
94 validateSlice(sliceIndex, lastSliceHashCode);
96 assembledSize += data.length;
97 lastSliceIndexReceived = sliceIndex;
98 lastSliceHashCodeReceived = Arrays.hashCode(data);
100 bufferedStream.write(data);
102 sealed = sliceIndex == totalSlices;
104 bufferedStream.close();
106 } catch (IOException e) {
108 throw new MessageSliceException(String.format("Error writing data for slice %d of message %s",
109 sliceIndex, identifier), e);
116 * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed.
118 * @return a ByteSource containing the assembled bytes
119 * @throws IOException if an error occurs obtaining the assembled bytes
120 * @throws IllegalStateException is this instance is not sealed
122 public ByteSource getAssembledBytes() throws IOException {
123 Preconditions.checkState(sealed, "Last slice not received yet");
124 return fileBackedStream.asByteSource();
127 private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException {
129 throw new AssemblerClosedException(identifier);
133 throw new AssemblerSealedException(String.format(
134 "Received slice index for message %s but all %d expected slices have already already received.",
135 identifier, totalSlices));
138 if (lastSliceIndexReceived + 1 != sliceIndex) {
140 throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s",
141 lastSliceIndexReceived + 1, sliceIndex, identifier), true);
144 if (lastSliceHashCode != lastSliceHashCodeReceived) {
146 throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not "
147 + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived,
148 lastSliceHashCode, identifier), true);
153 public void close() {
161 bufferedStream.close();
162 } catch (IOException e) {
163 LOG.debug("{}: Error closing output stream", logContext, e);
167 fileBackedStream.cleanup();