*/
package org.opendaylight.controller.cluster.messaging;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
public static final int DEFAULT_MAX_SLICING_TRIES = 3;
- private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
- private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
+ private final FileBackedOutputStreamFactory fileBackedStreamFactory;
private final int messageSliceSize;
private final int maxSlicingTries;
private final String logContext;
private final long id;
- private MessageSlicer(Builder builder) {
- this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
+ MessageSlicer(final Builder builder) {
+ this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
this.messageSliceSize = builder.messageSliceSize;
this.maxSlicingTries = builder.maxSlicingTries;
id = SLICER_ID_COUNTER.getAndIncrement();
this.logContext = builder.logContext + "_slicer-id-" + id;
- CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
- (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
+ CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
+ CacheBuilder.newBuilder().removalListener(this::stateRemoved);
if (builder.expireStateAfterInactivityDuration > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
builder.expireStateAfterInactivityUnit);
}
-
stateCache = cacheBuilder.build();
}
* @param message the message to check
* @return true if handled, false otherwise
*/
- public static boolean isHandledMessage(Object message) {
+ public static boolean isHandledMessage(final Object message) {
return message instanceof MessageSliceReply;
}
* options.
*
* @param options the SliceOptions
+ * @return true if the message was sliced, false otherwise
*/
- public void slice(SliceOptions options) {
+ public boolean slice(final SliceOptions options) {
final Identifier identifier = options.getIdentifier();
final Serializable message = options.getMessage();
final FileBackedOutputStream fileBackedStream;
if (message != null) {
LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
- Preconditions.checkNotNull(filedBackedStreamFactory,
+ requireNonNull(fileBackedStreamFactory,
"The FiledBackedStreamFactory must be set in order to call this slice method");
// Serialize the message to a FileBackedOutputStream.
- fileBackedStream = filedBackedStreamFactory.newInstance();
+ fileBackedStream = fileBackedStreamFactory.newInstance();
try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
out.writeObject(message);
} catch (IOException e) {
LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
fileBackedStream.cleanup();
options.getOnFailureCallback().accept(e);
- return;
+ return false;
}
} else {
fileBackedStream = options.getFileBackedStream();
}
- initializeSlicing(options, fileBackedStream);
+ return initializeSlicing(options, fileBackedStream);
}
- private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+ private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
final Identifier identifier = options.getIdentifier();
MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
SlicedMessageState<ActorRef> state = null;
LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
state.close();
sendTo(options, message, options.getReplyTo());
- return;
+ return false;
}
final MessageSlice firstSlice = getNextSliceMessage(state);
stateCache.put(messageSliceId, state);
sendTo(options, firstSlice, ActorRef.noSender());
+ return true;
} catch (IOException e) {
LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
if (state != null) {
}
options.getOnFailureCallback().accept(e);
+ return false;
}
}
- private void sendTo(SliceOptions options, Object message, ActorRef sender) {
+ private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
if (options.getSendToRef() != null) {
options.getSendToRef().tell(message, sender);
} else {
stateCache.invalidateAll();
}
- private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
+ /**
+ * Cancels all in-progress sliced message state that matches the given filter.
+ *
+ * @param filter filters by Identifier
+ */
+ public void cancelSlicing(final @NonNull Predicate<Identifier> filter) {
+ stateCache.asMap().keySet().removeIf(
+ messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier()));
+ }
+
+ private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
final byte[] firstSliceBytes = state.getNextSlice();
return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
stateCache.invalidate(identifier);
}
- private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
+ private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
final SlicedMessageState<ActorRef> state = notification.getValue();
state.close();
if (notification.wasEvicted()) {
}
@VisibleForTesting
- boolean hasState(Identifier forIdentifier) {
+ boolean hasState(final Identifier forIdentifier) {
boolean exists = stateCache.getIfPresent(forIdentifier) != null;
stateCache.cleanUp();
return exists;
}
public static class Builder {
- private FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private FileBackedOutputStreamFactory fileBackedStreamFactory;
private int messageSliceSize = -1;
private long expireStateAfterInactivityDuration = -1;
private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
* is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
* If Serializable messages aren't passed then the factory need not be set.
*
- * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+ * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
* @return this Builder
*/
- public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
- this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+ public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
+ this.fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory);
return this;
}
* @return this Builder
*/
public Builder messageSliceSize(final int newMessageSliceSize) {
- Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
+ checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
this.messageSliceSize = newMessageSliceSize;
return this;
}
* @return this Builder
*/
public Builder maxSlicingTries(final int newMaxSlicingTries) {
- Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
+ checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
this.maxSlicingTries = newMaxSlicingTries;
return this;
}
* @return this Builder
*/
public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
- Preconditions.checkArgument(duration > 0, "duration must be > 0");
+ checkArgument(duration > 0, "duration must be > 0");
this.expireStateAfterInactivityDuration = duration;
this.expireStateAfterInactivityUnit = unit;
return this;
* @return this Builder
*/
public Builder logContext(final String newLogContext) {
- this.logContext = Preconditions.checkNotNull(newLogContext);
+ this.logContext = requireNonNull(newLogContext);
return this;
}