import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.io.ByteSource;
import java.io.IOException;
* @author Thomas Pantelis
* @see MessageSlicer
*/
-public class MessageAssembler implements AutoCloseable {
+public final class MessageAssembler implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
private final Cache<Identifier, AssembledMessageState> stateCache;
- private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private final FileBackedOutputStreamFactory fileBackedStreamFactory;
private final BiConsumer<Object, ActorRef> assembledMessageCallback;
private final String logContext;
- private MessageAssembler(Builder builder) {
- this.filedBackedStreamFactory = Preconditions.checkNotNull(builder.filedBackedStreamFactory,
+ MessageAssembler(final Builder builder) {
+ this.fileBackedStreamFactory = Preconditions.checkNotNull(builder.fileBackedStreamFactory,
"FiledBackedStreamFactory cannot be null");
this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
"assembledMessageCallback cannot be null");
stateCache = CacheBuilder.newBuilder()
.expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
- .removalListener((RemovalListener<Identifier, AssembledMessageState>) notification ->
- stateRemoved(notification)).build();
+ .removalListener(this::stateRemoved).build();
}
/**
* @param message the message to check
* @return true if handled, false otherwise
*/
- public static boolean isHandledMessage(Object message) {
+ public static boolean isHandledMessage(final Object message) {
return message instanceof MessageSlice || message instanceof AbortSlicing;
}
if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) {
LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
return new AssembledMessageState(identifier, messageSlice.getTotalSlices(),
- filedBackedStreamFactory, logContext);
+ fileBackedStreamFactory, logContext);
}
LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
messageSlice.getSliceIndex()), true);
}
- private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
+ private void processMessageSliceForState(final MessageSlice messageSlice, final AssembledMessageState state,
final ActorRef sendTo) {
final Identifier identifier = messageSlice.getIdentifier();
final ActorRef replyTo = messageSlice.getReplyTo();
}
}
- private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
+ private static Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
try {
final ByteSource assembledBytes = state.getAssembledBytes();
try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
}
}
- private void onAbortSlicing(AbortSlicing message) {
+ private void onAbortSlicing(final AbortSlicing message) {
removeState(message.getIdentifier());
}
stateCache.invalidate(identifier);
}
- private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
+ private void stateRemoved(final RemovalNotification<Identifier, AssembledMessageState> notification) {
if (notification.wasEvicted()) {
LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
} else {
}
@VisibleForTesting
- boolean hasState(Identifier forIdentifier) {
+ boolean hasState(final Identifier forIdentifier) {
boolean exists = stateCache.getIfPresent(forIdentifier) != null;
stateCache.cleanUp();
return exists;
}
public static class Builder {
- private FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private FileBackedOutputStreamFactory fileBackedStreamFactory;
private BiConsumer<Object, ActorRef> assembledMessageCallback;
private long expireStateAfterInactivityDuration = 1;
private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
/**
* Sets the factory for creating FileBackedOutputStream instances used for streaming messages.
*
- * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+ * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
* @return this Builder
*/
- public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
- this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+ public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
+ this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
return this;
}