Move Lithium-specific logic from AbstractNormalizedNodeDataOutput
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / AssembledMessageState.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.io.ByteSource;
12 import java.io.BufferedOutputStream;
13 import java.io.IOException;
14 import java.util.Arrays;
15 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
16 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
17 import org.opendaylight.yangtools.concepts.Identifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 /**
22  * Maintains the state of an assembled message. This class is NOT thread-safe.
23  *
24  * @author Thomas Pantelis
25  */
26 public class AssembledMessageState implements AutoCloseable {
27     private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class);
28
29     private final int totalSlices;
30     private final BufferedOutputStream bufferedStream;
31     private final FileBackedOutputStream fileBackedStream;
32     private final Identifier identifier;
33     private final String logContext;
34
35     private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1;
36     private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
37     private boolean sealed = false;
38     private boolean closed = false;
39     private long assembledSize;
40
41     /**
42      * Constructor.
43      *
44      * @param identifier the identifier for this instance
45      * @param totalSlices the total number of slices to expect
46      * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming
47      * @param logContext the context for log messages
48      */
49     public AssembledMessageState(final Identifier identifier, final int totalSlices,
50             final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) {
51         this.identifier = identifier;
52         this.totalSlices = totalSlices;
53         this.logContext = logContext;
54
55         fileBackedStream = fileBackedStreamFactory.newInstance();
56         bufferedStream = new BufferedOutputStream(fileBackedStream);
57     }
58
59     /**
60      * Returns the identifier of this instance.
61      *
62      * @return the identifier
63      */
64     public Identifier getIdentifier() {
65         return identifier;
66     }
67
68     /**
69      * Adds a slice to the assembled stream.
70      *
71      * @param sliceIndex the index of the slice
72      * @param data the sliced data
73      * @param lastSliceHashCode the hash code of the last slice sent
74      * @return true if this is the last slice received, false otherwise
75      * @throws MessageSliceException
76      *         <ul>
77      *         <li>if the slice index is invalid</li>
78      *         <li>if the last slice hash code is invalid</li>
79      *         <li>if an error occurs writing the data to the stream</li>
80      *         </ul>
81      *         In addition, this instance is automatically closed and can no longer be used.
82      * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices)
83      * @throws AssemblerClosedException if this instance is already closed
84      */
85     public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode)
86             throws MessageSliceException {
87         if (LOG.isDebugEnabled()) {
88             LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, "
89                     + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex,
90                     lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived);
91         }
92
93         try {
94             validateSlice(sliceIndex, lastSliceHashCode);
95
96             assembledSize += data.length;
97             lastSliceIndexReceived = sliceIndex;
98             lastSliceHashCodeReceived = Arrays.hashCode(data);
99
100             bufferedStream.write(data);
101
102             sealed = sliceIndex == totalSlices;
103             if (sealed) {
104                 bufferedStream.close();
105             }
106         } catch (IOException e) {
107             close();
108             throw new MessageSliceException(String.format("Error writing data for slice %d of message %s",
109                     sliceIndex, identifier), e);
110         }
111
112         return sealed;
113     }
114
115     /**
116      * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed.
117      *
118      * @return a ByteSource containing the assembled bytes
119      * @throws IOException if an error occurs obtaining the assembled bytes
120      * @throws IllegalStateException is this instance is not sealed
121      */
122     public ByteSource getAssembledBytes() throws IOException {
123         Preconditions.checkState(sealed, "Last slice not received yet");
124         return fileBackedStream.asByteSource();
125     }
126
127     private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException {
128         if (closed) {
129             throw new AssemblerClosedException(identifier);
130         }
131
132         if (sealed) {
133             throw new AssemblerSealedException(String.format(
134                     "Received slice index for message %s but all %d expected slices have already already received.",
135                     identifier, totalSlices));
136         }
137
138         if (lastSliceIndexReceived + 1 != sliceIndex) {
139             close();
140             throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s",
141                     lastSliceIndexReceived + 1, sliceIndex, identifier), true);
142         }
143
144         if (lastSliceHashCode != lastSliceHashCodeReceived) {
145             close();
146             throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not "
147                     + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived,
148                     lastSliceHashCode, identifier), true);
149         }
150     }
151
152     @Override
153     public void close() {
154         if (closed) {
155             return;
156         }
157
158         closed = true;
159         if (!sealed) {
160             try {
161                 bufferedStream.close();
162             } catch (IOException e) {
163                 LOG.debug("{}: Error closing output stream", logContext, e);
164             }
165         }
166
167         fileBackedStream.cleanup();
168     }
169 }