import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.io.ByteSource;
import java.io.IOException;
* @author Thomas Pantelis
* @see MessageSlicer
*/
-public class MessageAssembler implements AutoCloseable {
+public final class MessageAssembler implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
private final Cache<Identifier, AssembledMessageState> stateCache;
private final BiConsumer<Object, ActorRef> assembledMessageCallback;
private final String logContext;
- private MessageAssembler(Builder builder) {
+ MessageAssembler(final Builder builder) {
this.fileBackedStreamFactory = Preconditions.checkNotNull(builder.fileBackedStreamFactory,
"FiledBackedStreamFactory cannot be null");
this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
stateCache = CacheBuilder.newBuilder()
.expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
- .removalListener((RemovalListener<Identifier, AssembledMessageState>) notification ->
- stateRemoved(notification)).build();
+ .removalListener(this::stateRemoved).build();
}
/**
* @param message the message to check
* @return true if handled, false otherwise
*/
- public static boolean isHandledMessage(Object message) {
+ public static boolean isHandledMessage(final Object message) {
return message instanceof MessageSlice || message instanceof AbortSlicing;
}
messageSlice.getSliceIndex()), true);
}
- private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
+ private void processMessageSliceForState(final MessageSlice messageSlice, final AssembledMessageState state,
final ActorRef sendTo) {
final Identifier identifier = messageSlice.getIdentifier();
final ActorRef replyTo = messageSlice.getReplyTo();
}
}
- private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
+ private static Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
try {
final ByteSource assembledBytes = state.getAssembledBytes();
try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
}
}
- private void onAbortSlicing(AbortSlicing message) {
+ private void onAbortSlicing(final AbortSlicing message) {
removeState(message.getIdentifier());
}
stateCache.invalidate(identifier);
}
- private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
+ private void stateRemoved(final RemovalNotification<Identifier, AssembledMessageState> notification) {
if (notification.wasEvicted()) {
LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
} else {
}
@VisibleForTesting
- boolean hasState(Identifier forIdentifier) {
+ boolean hasState(final Identifier forIdentifier) {
boolean exists = stateCache.getIfPresent(forIdentifier) != null;
stateCache.cleanUp();
return exists;