2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorRef;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalNotification;
18 import java.io.IOException;
19 import java.io.ObjectOutputStream;
20 import java.io.Serializable;
21 import java.util.Optional;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicLong;
24 import java.util.function.Predicate;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
27 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
28 import org.opendaylight.yangtools.concepts.Identifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
35 * @author Thomas Pantelis
36 * @see MessageAssembler
38 public class MessageSlicer implements AutoCloseable {
39 private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
40 private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
41 public static final int DEFAULT_MAX_SLICING_TRIES = 3;
43 private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
44 private final FileBackedOutputStreamFactory fileBackedStreamFactory;
45 private final int messageSliceSize;
46 private final int maxSlicingTries;
47 private final String logContext;
48 private final long id;
50 MessageSlicer(final Builder builder) {
51 fileBackedStreamFactory = builder.fileBackedStreamFactory;
52 messageSliceSize = builder.messageSliceSize;
53 maxSlicingTries = builder.maxSlicingTries;
55 id = SLICER_ID_COUNTER.getAndIncrement();
56 logContext = builder.logContext + "_slicer-id-" + id;
58 CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
59 CacheBuilder.newBuilder().removalListener(this::stateRemoved);
60 if (builder.expireStateAfterInactivityDuration > 0) {
61 cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
62 builder.expireStateAfterInactivityUnit);
64 stateCache = cacheBuilder.build();
73 * Returns a new Builder for creating MessageSlicer instances.
75 * @return a Builder instance
77 public static Builder builder() {
82 * Checks if the given message is handled by this class. If so, it should be forwarded to the
83 * {@link #handleMessage(Object)} method
85 * @param message the message to check
86 * @return true if handled, false otherwise
88 public static boolean isHandledMessage(final Object message) {
89 return message instanceof MessageSliceReply;
93 * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
96 * @param options the SliceOptions
97 * @return true if the message was sliced, false otherwise
99 public boolean slice(final SliceOptions options) {
100 final Identifier identifier = options.getIdentifier();
101 final Serializable message = options.getMessage();
102 final FileBackedOutputStream fileBackedStream;
103 if (message != null) {
104 LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
106 requireNonNull(fileBackedStreamFactory,
107 "The FiledBackedStreamFactory must be set in order to call this slice method");
109 // Serialize the message to a FileBackedOutputStream.
110 fileBackedStream = fileBackedStreamFactory.newInstance();
111 try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
112 out.writeObject(message);
113 } catch (IOException e) {
114 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
115 fileBackedStream.cleanup();
116 options.getOnFailureCallback().accept(e);
120 fileBackedStream = options.getFileBackedStream();
123 return initializeSlicing(options, fileBackedStream);
126 private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
127 final Identifier identifier = options.getIdentifier();
128 MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
129 SlicedMessageState<ActorRef> state = null;
131 state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
132 options.getReplyTo(), options.getOnFailureCallback(), logContext);
134 final Serializable message = options.getMessage();
135 if (state.getTotalSlices() == 1 && message != null) {
136 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
138 sendTo(options, message, options.getReplyTo());
142 final MessageSlice firstSlice = getNextSliceMessage(state);
144 LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
146 stateCache.put(messageSliceId, state);
147 sendTo(options, firstSlice, ActorRef.noSender());
149 } catch (IOException e) {
150 LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
154 fileBackedStream.cleanup();
157 options.getOnFailureCallback().accept(e);
162 private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
163 if (options.getSendToRef() != null) {
164 options.getSendToRef().tell(message, sender);
166 options.getSendToSelection().tell(message, sender);
171 * Invoked to handle messages pertaining to this class.
173 * @param message the message
174 * @return true if the message was handled, false otherwise
176 public boolean handleMessage(final Object message) {
177 if (message instanceof MessageSliceReply sliceReply) {
178 LOG.debug("{}: handleMessage: {}", logContext, sliceReply);
179 return onMessageSliceReply(sliceReply);
186 * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
189 public void checkExpiredSlicedMessageState() {
190 if (stateCache.size() > 0) {
191 stateCache.cleanUp();
196 * Closes and removes all in-progress sliced message state.
199 public void close() {
200 LOG.debug("{}: Closing", logContext);
201 stateCache.invalidateAll();
205 * Cancels all in-progress sliced message state that matches the given filter.
207 * @param filter filters by Identifier
209 public void cancelSlicing(final @NonNull Predicate<Identifier> filter) {
210 stateCache.asMap().keySet().removeIf(
211 messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier()));
214 private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
215 final byte[] firstSliceBytes = state.getNextSlice();
216 return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
217 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
220 private boolean onMessageSliceReply(final MessageSliceReply reply) {
221 final Identifier identifier = reply.getIdentifier();
222 if (!(identifier instanceof MessageSliceIdentifier sliceIdentifier) || sliceIdentifier.getSlicerId() != id) {
226 final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
228 LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
229 reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
233 synchronized (state) {
235 final Optional<MessageSliceException> failure = reply.getFailure();
236 if (failure.isPresent()) {
237 LOG.warn("{}: Received failed {}", logContext, reply);
238 processMessageSliceException(failure.orElseThrow(), state, reply.getSendTo());
242 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
243 LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
244 reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
245 reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
246 possiblyRetrySlicing(state, reply.getSendTo());
250 if (state.isLastSlice(reply.getSliceIndex())) {
251 LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
252 removeState(identifier);
254 final MessageSlice nextSlice = getNextSliceMessage(state);
255 LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
256 reply.getSendTo().tell(nextSlice, ActorRef.noSender());
258 } catch (IOException e) {
259 LOG.warn("{}: Error processing {}", logContext, reply, e);
267 private void processMessageSliceException(final MessageSliceException exception,
268 final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
269 if (exception.isRetriable()) {
270 possiblyRetrySlicing(state, sendTo);
272 fail(state, exception.getCause() != null ? exception.getCause() : exception);
276 private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
278 if (state.canRetry()) {
279 LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
281 sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
283 String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
284 state.getIdentifier());
286 fail(state, new RuntimeException(message));
290 private void removeState(final Identifier identifier) {
291 LOG.debug("{}: Removing state for {}", logContext, identifier);
292 stateCache.invalidate(identifier);
295 private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
296 final SlicedMessageState<ActorRef> state = notification.getValue();
298 if (notification.wasEvicted()) {
299 LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
300 state.getOnFailureCallback().accept(new RuntimeException(String.format(
301 "The slicing state for message identifier %s was expired due to inactivity from the assembling "
302 + "component on the other end", state.getIdentifier())));
304 LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
305 notification.getKey(), notification.getCause());
309 private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
310 removeState(state.getIdentifier());
311 state.getOnFailureCallback().accept(failure);
315 boolean hasState(final Identifier forIdentifier) {
316 boolean exists = stateCache.getIfPresent(forIdentifier) != null;
317 stateCache.cleanUp();
321 public static class Builder {
322 private FileBackedOutputStreamFactory fileBackedStreamFactory;
323 private int messageSliceSize = -1;
324 private long expireStateAfterInactivityDuration = -1;
325 private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
326 private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
327 private String logContext = "<no-context>";
330 * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
331 * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
332 * If Serializable messages aren't passed then the factory need not be set.
334 * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
335 * @return this Builder
337 public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
338 fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory);
343 * Sets the maximum size (in bytes) for a message slice.
345 * @param newMessageSliceSize the maximum size (in bytes)
346 * @return this Builder
348 public Builder messageSliceSize(final int newMessageSliceSize) {
349 checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
350 messageSliceSize = newMessageSliceSize;
355 * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
356 * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
358 * @param newMaxSlicingTries the maximum number of tries
359 * @return this Builder
361 public Builder maxSlicingTries(final int newMaxSlicingTries) {
362 checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
363 maxSlicingTries = newMaxSlicingTries;
368 * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
369 * failure callback is notified due to inactivity from the assembling component on the other end. By default,
370 * state is not purged due to inactivity.
372 * @param duration the length of time after which a state entry is purged
373 * @param unit the unit the duration is expressed in
374 * @return this Builder
376 public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
377 checkArgument(duration > 0, "duration must be > 0");
378 expireStateAfterInactivityDuration = duration;
379 expireStateAfterInactivityUnit = unit;
384 * Sets the context for log messages.
386 * @param newLogContext the log context
387 * @return this Builder
389 public Builder logContext(final String newLogContext) {
390 logContext = requireNonNull(newLogContext);
395 * Builds a new MessageSlicer instance.
397 * @return a new MessageSlicer
399 public MessageSlicer build() {
400 return new MessageSlicer(this);