import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
private final String logContext;
private final long id;
- private MessageSlicer(final Builder builder) {
+ MessageSlicer(final Builder builder) {
this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
this.messageSliceSize = builder.messageSliceSize;
this.maxSlicingTries = builder.maxSlicingTries;
id = SLICER_ID_COUNTER.getAndIncrement();
this.logContext = builder.logContext + "_slicer-id-" + id;
- CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
- (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
+ CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
+ CacheBuilder.newBuilder().removalListener(this::stateRemoved);
if (builder.expireStateAfterInactivityDuration > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
builder.expireStateAfterInactivityUnit);
}
-
stateCache = cacheBuilder.build();
}
* @param filter filters by Identifier
*/
public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
- final Iterator<MessageSliceIdentifier> iter = stateCache.asMap().keySet().iterator();
- while (iter.hasNext()) {
- if (filter.test(iter.next().getClientIdentifier())) {
- iter.remove();
- }
- }
+ stateCache.asMap().keySet().removeIf(
+ messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier()));
}
private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {