2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import java.io.IOException;
18 import java.io.ObjectOutputStream;
19 import java.io.Serializable;
20 import java.util.Optional;
21 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
23 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
24 import org.opendaylight.yangtools.concepts.Identifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
31 * @author Thomas Pantelis
32 * @see MessageAssembler
34 public class MessageSlicer implements AutoCloseable {
35 private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
36 public static final int DEFAULT_MAX_SLICING_TRIES = 3;
38 private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
39 private final FileBackedOutputStreamFactory filedBackedStreamFactory;
40 private final int messageSliceSize;
41 private final int maxSlicingTries;
42 private final String logContext;
44 private MessageSlicer(Builder builder) {
45 this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
46 this.messageSliceSize = builder.messageSliceSize;
47 this.maxSlicingTries = builder.maxSlicingTries;
48 this.logContext = builder.logContext;
50 CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
51 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
52 if (builder.expireStateAfterInactivityDuration > 0) {
53 cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
54 builder.expireStateAfterInactivityUnit);
57 stateCache = cacheBuilder.build();
61 * Returns a new Builder for creating MessageSlicer instances.
63 * @return a Builder instance
65 public static Builder builder() {
70 * Checks if the given message is handled by this class. If so, it should be forwarded to the
71 * {@link #handleMessage(Object)} method
73 * @param message the message to check
74 * @return true if handled, false otherwise
76 public static boolean isHandledMessage(Object message) {
77 return message instanceof MessageSliceReply;
81 * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
84 * @param options the SliceOptions
86 public void slice(SliceOptions options) {
87 final Identifier identifier = options.getIdentifier();
88 final Serializable message = options.getMessage();
89 final FileBackedOutputStream fileBackedStream;
90 if (message != null) {
91 LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
94 Preconditions.checkNotNull(filedBackedStreamFactory,
95 "The FiledBackedStreamFactory must be set in order to call this slice method");
97 // Serialize the message to a FileBackedOutputStream.
98 fileBackedStream = filedBackedStreamFactory.newInstance();
99 try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
100 out.writeObject(message);
101 } catch (IOException e) {
102 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
103 fileBackedStream.cleanup();
104 options.getOnFailureCallback().accept(e);
108 fileBackedStream = options.getFileBackedStream();
111 initializeSlicing(options, fileBackedStream);
114 private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
115 final Identifier identifier = options.getIdentifier();
116 MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
117 SlicedMessageState<ActorRef> state = null;
119 state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
120 options.getReplyTo(), options.getOnFailureCallback(), logContext);
122 final Serializable message = options.getMessage();
123 if (state.getTotalSlices() == 1 && message != null) {
124 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
126 sendTo(options, message, options.getReplyTo());
130 final MessageSlice firstSlice = getNextSliceMessage(state);
132 LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
134 stateCache.put(messageSliceId, state);
135 sendTo(options, firstSlice, ActorRef.noSender());
136 } catch (IOException e) {
137 LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
141 fileBackedStream.cleanup();
144 options.getOnFailureCallback().accept(e);
148 private void sendTo(SliceOptions options, Object message, ActorRef sender) {
149 if (options.getSendToRef() != null) {
150 options.getSendToRef().tell(message, sender);
152 options.getSendToSelection().tell(message, sender);
157 * Invoked to handle messages pertaining to this class.
159 * @param message the message
160 * @return true if the message was handled, false otherwise
162 public boolean handleMessage(final Object message) {
163 if (message instanceof MessageSliceReply) {
164 LOG.debug("{}: handleMessage: {}", logContext, message);
165 onMessageSliceReply((MessageSliceReply) message);
173 * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
176 public void checkExpiredSlicedMessageState() {
177 if (stateCache.size() > 0) {
178 stateCache.cleanUp();
183 * Closes and removes all in-progress sliced message state.
186 public void close() {
187 LOG.debug("{}: Closing", logContext);
188 stateCache.invalidateAll();
191 private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
192 final byte[] firstSliceBytes = state.getNextSlice();
193 return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
194 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
197 private void onMessageSliceReply(final MessageSliceReply reply) {
198 final Identifier identifier = reply.getIdentifier();
199 final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
201 LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
202 reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
206 synchronized (state) {
208 final Optional<MessageSliceException> failure = reply.getFailure();
209 if (failure.isPresent()) {
210 LOG.warn("{}: Received failed {}", logContext, reply);
211 processMessageSliceException(failure.get(), state, reply.getSendTo());
215 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
216 LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
217 reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
218 reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
219 possiblyRetrySlicing(state, reply.getSendTo());
223 if (state.isLastSlice(reply.getSliceIndex())) {
224 LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
225 removeState(identifier);
227 final MessageSlice nextSlice = getNextSliceMessage(state);
228 LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
229 reply.getSendTo().tell(nextSlice, ActorRef.noSender());
231 } catch (IOException e) {
232 LOG.warn("{}: Error processing {}", logContext, reply, e);
238 private void processMessageSliceException(final MessageSliceException exception,
239 final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
240 if (exception.isRetriable()) {
241 possiblyRetrySlicing(state, sendTo);
243 fail(state, exception.getCause() != null ? exception.getCause() : exception);
247 private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
249 if (state.canRetry()) {
250 LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
252 sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
254 String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
255 state.getIdentifier());
257 fail(state, new RuntimeException(message));
261 private void removeState(final Identifier identifier) {
262 LOG.debug("{}: Removing state for {}", logContext, identifier);
263 stateCache.invalidate(identifier);
266 private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
267 final SlicedMessageState<ActorRef> state = notification.getValue();
269 if (notification.wasEvicted()) {
270 LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
271 state.getOnFailureCallback().accept(new RuntimeException(String.format(
272 "The slicing state for message identifier %s was expired due to inactivity from the assembling "
273 + "component on the other end", state.getIdentifier())));
275 LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
276 notification.getKey(), notification.getCause());
280 private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
281 removeState(state.getIdentifier());
282 state.getOnFailureCallback().accept(failure);
286 boolean hasState(Identifier forIdentifier) {
287 boolean exists = stateCache.getIfPresent(forIdentifier) != null;
288 stateCache.cleanUp();
292 public static class Builder {
293 private FileBackedOutputStreamFactory filedBackedStreamFactory;
294 private int messageSliceSize = -1;
295 private long expireStateAfterInactivityDuration = -1;
296 private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
297 private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
298 private String logContext = "<no-context>";
301 * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
302 * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
303 * If Serializable messages aren't passed then the factory need not be set.
305 * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
306 * @return this Builder
308 public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
309 this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
314 * Sets the maximum size (in bytes) for a message slice.
316 * @param newMessageSliceSize the maximum size (in bytes)
317 * @return this Builder
319 public Builder messageSliceSize(final int newMessageSliceSize) {
320 Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
321 this.messageSliceSize = newMessageSliceSize;
326 * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
327 * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
329 * @param newMaxSlicingTries the maximum number of tries
330 * @return this Builder
332 public Builder maxSlicingTries(final int newMaxSlicingTries) {
333 Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
334 this.maxSlicingTries = newMaxSlicingTries;
339 * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
340 * failure callback is notified due to inactivity from the assembling component on the other end. By default,
341 * state is not purged due to inactivity.
343 * @param duration the length of time after which a state entry is purged
344 * @param unit the unit the duration is expressed in
345 * @return this Builder
347 public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
348 Preconditions.checkArgument(duration > 0, "duration must be > 0");
349 this.expireStateAfterInactivityDuration = duration;
350 this.expireStateAfterInactivityUnit = unit;
355 * Sets the context for log messages.
357 * @param newLogContext the log context
358 * @return this Builder
360 public Builder logContext(final String newLogContext) {
361 this.logContext = Preconditions.checkNotNull(newLogContext);
366 * Builds a new MessageSlicer instance.
368 * @return a new MessageSlicer
370 public MessageSlicer build() {
371 return new MessageSlicer(this);