Slice front-end request messages
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import java.io.IOException;
18 import java.io.ObjectOutputStream;
19 import java.io.Serializable;
20 import java.util.Iterator;
21 import java.util.Optional;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicLong;
24 import java.util.function.Predicate;
25 import javax.annotation.Nonnull;
26 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
27 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
28 import org.opendaylight.yangtools.concepts.Identifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
34  *
35  * @author Thomas Pantelis
36  * @see MessageAssembler
37  */
38 public class MessageSlicer implements AutoCloseable {
39     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
40     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
41     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
42
43     private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
44     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
45     private final int messageSliceSize;
46     private final int maxSlicingTries;
47     private final String logContext;
48     private final long id;
49
50     private MessageSlicer(final Builder builder) {
51         this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
52         this.messageSliceSize = builder.messageSliceSize;
53         this.maxSlicingTries = builder.maxSlicingTries;
54
55         id = SLICER_ID_COUNTER.getAndIncrement();
56         this.logContext = builder.logContext + "_slicer-id-" + id;
57
58         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
59                 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
60         if (builder.expireStateAfterInactivityDuration > 0) {
61             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
62                     builder.expireStateAfterInactivityUnit);
63         }
64
65         stateCache = cacheBuilder.build();
66     }
67
68     @VisibleForTesting
69     long getId() {
70         return id;
71     }
72
73     /**
74      * Returns a new Builder for creating MessageSlicer instances.
75      *
76      * @return a Builder instance
77      */
78     public static Builder builder() {
79         return new Builder();
80     }
81
82     /**
83      * Checks if the given message is handled by this class. If so, it should be forwarded to the
84      * {@link #handleMessage(Object)} method
85      *
86      * @param message the message to check
87      * @return true if handled, false otherwise
88      */
89     public static boolean isHandledMessage(final Object message) {
90         return message instanceof MessageSliceReply;
91     }
92
93     /**
94      * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
95      * options.
96      *
97      * @param options the SliceOptions
98      * @return true if the message was sliced, false otherwise
99      */
100     public boolean slice(final SliceOptions options) {
101         final Identifier identifier = options.getIdentifier();
102         final Serializable message = options.getMessage();
103         final FileBackedOutputStream fileBackedStream;
104         if (message != null) {
105             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
106
107             Preconditions.checkNotNull(fileBackedStreamFactory,
108                     "The FiledBackedStreamFactory must be set in order to call this slice method");
109
110             // Serialize the message to a FileBackedOutputStream.
111             fileBackedStream = fileBackedStreamFactory.newInstance();
112             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
113                 out.writeObject(message);
114             } catch (IOException e) {
115                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
116                 fileBackedStream.cleanup();
117                 options.getOnFailureCallback().accept(e);
118                 return false;
119             }
120         } else {
121             fileBackedStream = options.getFileBackedStream();
122         }
123
124         return initializeSlicing(options, fileBackedStream);
125     }
126
127     private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
128         final Identifier identifier = options.getIdentifier();
129         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
130         SlicedMessageState<ActorRef> state = null;
131         try {
132             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
133                     options.getReplyTo(), options.getOnFailureCallback(), logContext);
134
135             final Serializable message = options.getMessage();
136             if (state.getTotalSlices() == 1 && message != null) {
137                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
138                 state.close();
139                 sendTo(options, message, options.getReplyTo());
140                 return false;
141             }
142
143             final MessageSlice firstSlice = getNextSliceMessage(state);
144
145             LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
146
147             stateCache.put(messageSliceId, state);
148             sendTo(options, firstSlice, ActorRef.noSender());
149             return true;
150         } catch (IOException e) {
151             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
152             if (state != null) {
153                 state.close();
154             } else {
155                 fileBackedStream.cleanup();
156             }
157
158             options.getOnFailureCallback().accept(e);
159             return false;
160         }
161     }
162
163     private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
164         if (options.getSendToRef() != null) {
165             options.getSendToRef().tell(message, sender);
166         } else {
167             options.getSendToSelection().tell(message, sender);
168         }
169     }
170
171     /**
172      * Invoked to handle messages pertaining to this class.
173      *
174      * @param message the message
175      * @return true if the message was handled, false otherwise
176      */
177     public boolean handleMessage(final Object message) {
178         if (message instanceof MessageSliceReply) {
179             LOG.debug("{}: handleMessage: {}", logContext, message);
180             return onMessageSliceReply((MessageSliceReply) message);
181         }
182
183         return false;
184     }
185
186     /**
187      * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
188      * on the other end.
189      */
190     public void checkExpiredSlicedMessageState() {
191         if (stateCache.size() > 0) {
192             stateCache.cleanUp();
193         }
194     }
195
196     /**
197      * Closes and removes all in-progress sliced message state.
198      */
199     @Override
200     public void close() {
201         LOG.debug("{}: Closing", logContext);
202         stateCache.invalidateAll();
203     }
204
205     /**
206      * Cancels all in-progress sliced message state that matches the given filter.
207      *
208      * @param filter filters by Identifier
209      */
210     public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
211         final Iterator<MessageSliceIdentifier> iter = stateCache.asMap().keySet().iterator();
212         while (iter.hasNext()) {
213             if (filter.test(iter.next().getClientIdentifier())) {
214                 iter.remove();
215             }
216         }
217     }
218
219     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
220         final byte[] firstSliceBytes = state.getNextSlice();
221         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
222                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
223     }
224
225     private boolean onMessageSliceReply(final MessageSliceReply reply) {
226         final Identifier identifier = reply.getIdentifier();
227         if (!(identifier instanceof MessageSliceIdentifier)
228                 || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
229             return false;
230         }
231
232         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
233         if (state == null) {
234             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
235             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
236             return true;
237         }
238
239         synchronized (state) {
240             try {
241                 final Optional<MessageSliceException> failure = reply.getFailure();
242                 if (failure.isPresent()) {
243                     LOG.warn("{}: Received failed {}", logContext, reply);
244                     processMessageSliceException(failure.get(), state, reply.getSendTo());
245                     return true;
246                 }
247
248                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
249                     LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
250                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
251                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
252                     possiblyRetrySlicing(state, reply.getSendTo());
253                     return true;
254                 }
255
256                 if (state.isLastSlice(reply.getSliceIndex())) {
257                     LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
258                     removeState(identifier);
259                 } else {
260                     final MessageSlice nextSlice = getNextSliceMessage(state);
261                     LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
262                     reply.getSendTo().tell(nextSlice, ActorRef.noSender());
263                 }
264             } catch (IOException e) {
265                 LOG.warn("{}: Error processing {}", logContext, reply, e);
266                 fail(state, e);
267             }
268         }
269
270         return true;
271     }
272
273     private void processMessageSliceException(final MessageSliceException exception,
274             final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
275         if (exception.isRetriable()) {
276             possiblyRetrySlicing(state, sendTo);
277         } else {
278             fail(state, exception.getCause() != null ? exception.getCause() : exception);
279         }
280     }
281
282     private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
283             throws IOException {
284         if (state.canRetry()) {
285             LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
286             state.reset();
287             sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
288         } else {
289             String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
290                     state.getIdentifier());
291             LOG.warn(message);
292             fail(state, new RuntimeException(message));
293         }
294     }
295
296     private void removeState(final Identifier identifier) {
297         LOG.debug("{}: Removing state for {}", logContext, identifier);
298         stateCache.invalidate(identifier);
299     }
300
301     private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
302         final SlicedMessageState<ActorRef> state = notification.getValue();
303         state.close();
304         if (notification.wasEvicted()) {
305             LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
306             state.getOnFailureCallback().accept(new RuntimeException(String.format(
307                     "The slicing state for message identifier %s was expired due to inactivity from the assembling "
308                      + "component on the other end", state.getIdentifier())));
309         } else {
310             LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
311                     notification.getKey(), notification.getCause());
312         }
313     }
314
315     private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
316         removeState(state.getIdentifier());
317         state.getOnFailureCallback().accept(failure);
318     }
319
320     @VisibleForTesting
321     boolean hasState(final Identifier forIdentifier) {
322         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
323         stateCache.cleanUp();
324         return exists;
325     }
326
327     public static class Builder {
328         private FileBackedOutputStreamFactory fileBackedStreamFactory;
329         private int messageSliceSize = -1;
330         private long expireStateAfterInactivityDuration = -1;
331         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
332         private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
333         private String logContext = "<no-context>";
334
335         /**
336          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
337          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
338          * If Serializable messages aren't passed then the factory need not be set.
339          *
340          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
341          * @return this Builder
342          */
343         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
344             this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
345             return this;
346         }
347
348         /**
349          * Sets the maximum size (in bytes) for a message slice.
350          *
351          * @param newMessageSliceSize the maximum size (in bytes)
352          * @return this Builder
353          */
354         public Builder messageSliceSize(final int newMessageSliceSize) {
355             Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
356             this.messageSliceSize = newMessageSliceSize;
357             return this;
358         }
359
360         /**
361          * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
362          * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
363          *
364          * @param newMaxSlicingTries the maximum number of tries
365          * @return this Builder
366          */
367         public Builder maxSlicingTries(final int newMaxSlicingTries) {
368             Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
369             this.maxSlicingTries = newMaxSlicingTries;
370             return this;
371         }
372
373         /**
374          * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
375          * failure callback is notified due to inactivity from the assembling component on the other end. By default,
376          * state is not purged due to inactivity.
377          *
378          * @param duration the length of time after which a state entry is purged
379          * @param unit the unit the duration is expressed in
380          * @return this Builder
381          */
382         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
383             Preconditions.checkArgument(duration > 0, "duration must be > 0");
384             this.expireStateAfterInactivityDuration = duration;
385             this.expireStateAfterInactivityUnit = unit;
386             return this;
387         }
388
389         /**
390          * Sets the context for log messages.
391          *
392          * @param newLogContext the log context
393          * @return this Builder
394          */
395         public Builder logContext(final String newLogContext) {
396             this.logContext = Preconditions.checkNotNull(newLogContext);
397             return this;
398         }
399
400         /**
401          * Builds a new MessageSlicer instance.
402          *
403          * @return a new MessageSlicer
404          */
405         public MessageSlicer build() {
406             return new MessageSlicer(this);
407         }
408     }
409 }