Bug 7449: Add message slicing/re-assembly classes
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import java.io.IOException;
18 import java.io.ObjectOutputStream;
19 import java.io.Serializable;
20 import java.util.Optional;
21 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
23 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
24 import org.opendaylight.yangtools.concepts.Identifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
30  *
31  * @author Thomas Pantelis
32  * @see MessageAssembler
33  */
34 public class MessageSlicer implements AutoCloseable {
35     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
36     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
37
38     private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
39     private final FileBackedOutputStreamFactory filedBackedStreamFactory;
40     private final int messageSliceSize;
41     private final int maxSlicingTries;
42     private final String logContext;
43
44     private MessageSlicer(Builder builder) {
45         this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
46         this.messageSliceSize = builder.messageSliceSize;
47         this.maxSlicingTries = builder.maxSlicingTries;
48         this.logContext = builder.logContext;
49
50         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
51                 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
52         if (builder.expireStateAfterInactivityDuration > 0) {
53             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
54                     builder.expireStateAfterInactivityUnit);
55         }
56
57         stateCache = cacheBuilder.build();
58     }
59
60     /**
61      * Returns a new Builder for creating MessageSlicer instances.
62      *
63      * @return a Builder instance
64      */
65     public static Builder builder() {
66         return new Builder();
67     }
68
69     /**
70      * Checks if the given message is handled by this class. If so, it should be forwarded to the
71      * {@link #handleMessage(Object)} method
72      *
73      * @param message the message to check
74      * @return true if handled, false otherwise
75      */
76     public static boolean isHandledMessage(Object message) {
77         return message instanceof MessageSliceReply;
78     }
79
80     /**
81      * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
82      * options.
83      *
84      * @param options the SliceOptions
85      */
86     public void slice(SliceOptions options) {
87         final Identifier identifier = options.getIdentifier();
88         final Serializable message = options.getMessage();
89         final FileBackedOutputStream fileBackedStream;
90         if (message != null) {
91             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
92
93
94             Preconditions.checkNotNull(filedBackedStreamFactory,
95                     "The FiledBackedStreamFactory must be set in order to call this slice method");
96
97             // Serialize the message to a FileBackedOutputStream.
98             fileBackedStream = filedBackedStreamFactory.newInstance();
99             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
100                 out.writeObject(message);
101             } catch (IOException e) {
102                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
103                 fileBackedStream.cleanup();
104                 options.getOnFailureCallback().accept(e);
105                 return;
106             }
107         } else {
108             fileBackedStream = options.getFileBackedStream();
109         }
110
111         initializeSlicing(options, fileBackedStream);
112     }
113
114     private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
115         final Identifier identifier = options.getIdentifier();
116         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
117         SlicedMessageState<ActorRef> state = null;
118         try {
119             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
120                     options.getReplyTo(), options.getOnFailureCallback(), logContext);
121
122             final Serializable message = options.getMessage();
123             if (state.getTotalSlices() == 1 && message != null) {
124                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
125                 state.close();
126                 sendTo(options, message, options.getReplyTo());
127                 return;
128             }
129
130             final MessageSlice firstSlice = getNextSliceMessage(state);
131
132             LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
133
134             stateCache.put(messageSliceId, state);
135             sendTo(options, firstSlice, ActorRef.noSender());
136         } catch (IOException e) {
137             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
138             if (state != null) {
139                 state.close();
140             } else {
141                 fileBackedStream.cleanup();
142             }
143
144             options.getOnFailureCallback().accept(e);
145         }
146     }
147
148     private void sendTo(SliceOptions options, Object message, ActorRef sender) {
149         if (options.getSendToRef() != null) {
150             options.getSendToRef().tell(message, sender);
151         } else {
152             options.getSendToSelection().tell(message, sender);
153         }
154     }
155
156     /**
157      * Invoked to handle messages pertaining to this class.
158      *
159      * @param message the message
160      * @return true if the message was handled, false otherwise
161      */
162     public boolean handleMessage(final Object message) {
163         if (message instanceof MessageSliceReply) {
164             LOG.debug("{}: handleMessage: {}", logContext, message);
165             onMessageSliceReply((MessageSliceReply) message);
166             return true;
167         }
168
169         return false;
170     }
171
172     /**
173      * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
174      * on the other end.
175      */
176     public void checkExpiredSlicedMessageState() {
177         if (stateCache.size() > 0) {
178             stateCache.cleanUp();
179         }
180     }
181
182     /**
183      * Closes and removes all in-progress sliced message state.
184      */
185     @Override
186     public void close() {
187         LOG.debug("{}: Closing", logContext);
188         stateCache.invalidateAll();
189     }
190
191     private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
192         final byte[] firstSliceBytes = state.getNextSlice();
193         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
194                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
195     }
196
197     private void onMessageSliceReply(final MessageSliceReply reply) {
198         final Identifier identifier = reply.getIdentifier();
199         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
200         if (state == null) {
201             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
202             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
203             return;
204         }
205
206         synchronized (state) {
207             try {
208                 final Optional<MessageSliceException> failure = reply.getFailure();
209                 if (failure.isPresent()) {
210                     LOG.warn("{}: Received failed {}", logContext, reply);
211                     processMessageSliceException(failure.get(), state, reply.getSendTo());
212                     return;
213                 }
214
215                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
216                     LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
217                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
218                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
219                     possiblyRetrySlicing(state, reply.getSendTo());
220                     return;
221                 }
222
223                 if (state.isLastSlice(reply.getSliceIndex())) {
224                     LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
225                     removeState(identifier);
226                 } else {
227                     final MessageSlice nextSlice = getNextSliceMessage(state);
228                     LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
229                     reply.getSendTo().tell(nextSlice, ActorRef.noSender());
230                 }
231             } catch (IOException e) {
232                 LOG.warn("{}: Error processing {}", logContext, reply, e);
233                 fail(state, e);
234             }
235         }
236     }
237
238     private void processMessageSliceException(final MessageSliceException exception,
239             final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
240         if (exception.isRetriable()) {
241             possiblyRetrySlicing(state, sendTo);
242         } else {
243             fail(state, exception.getCause() != null ? exception.getCause() : exception);
244         }
245     }
246
247     private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
248             throws IOException {
249         if (state.canRetry()) {
250             LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
251             state.reset();
252             sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
253         } else {
254             String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
255                     state.getIdentifier());
256             LOG.warn(message);
257             fail(state, new RuntimeException(message));
258         }
259     }
260
261     private void removeState(final Identifier identifier) {
262         LOG.debug("{}: Removing state for {}", logContext, identifier);
263         stateCache.invalidate(identifier);
264     }
265
266     private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
267         final SlicedMessageState<ActorRef> state = notification.getValue();
268         state.close();
269         if (notification.wasEvicted()) {
270             LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
271             state.getOnFailureCallback().accept(new RuntimeException(String.format(
272                     "The slicing state for message identifier %s was expired due to inactivity from the assembling "
273                      + "component on the other end", state.getIdentifier())));
274         } else {
275             LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
276                     notification.getKey(), notification.getCause());
277         }
278     }
279
280     private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
281         removeState(state.getIdentifier());
282         state.getOnFailureCallback().accept(failure);
283     }
284
285     @VisibleForTesting
286     boolean hasState(Identifier forIdentifier) {
287         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
288         stateCache.cleanUp();
289         return exists;
290     }
291
292     public static class Builder {
293         private FileBackedOutputStreamFactory filedBackedStreamFactory;
294         private int messageSliceSize = -1;
295         private long expireStateAfterInactivityDuration = -1;
296         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
297         private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
298         private String logContext = "<no-context>";
299
300         /**
301          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
302          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
303          * If Serializable messages aren't passed then the factory need not be set.
304          *
305          * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
306          * @return this Builder
307          */
308         public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
309             this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
310             return this;
311         }
312
313         /**
314          * Sets the maximum size (in bytes) for a message slice.
315          *
316          * @param newMessageSliceSize the maximum size (in bytes)
317          * @return this Builder
318          */
319         public Builder messageSliceSize(final int newMessageSliceSize) {
320             Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
321             this.messageSliceSize = newMessageSliceSize;
322             return this;
323         }
324
325         /**
326          * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
327          * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
328          *
329          * @param newMaxSlicingTries the maximum number of tries
330          * @return this Builder
331          */
332         public Builder maxSlicingTries(final int newMaxSlicingTries) {
333             Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
334             this.maxSlicingTries = newMaxSlicingTries;
335             return this;
336         }
337
338         /**
339          * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
340          * failure callback is notified due to inactivity from the assembling component on the other end. By default,
341          * state is not purged due to inactivity.
342          *
343          * @param duration the length of time after which a state entry is purged
344          * @param unit the unit the duration is expressed in
345          * @return this Builder
346          */
347         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
348             Preconditions.checkArgument(duration > 0, "duration must be > 0");
349             this.expireStateAfterInactivityDuration = duration;
350             this.expireStateAfterInactivityUnit = unit;
351             return this;
352         }
353
354         /**
355          * Sets the context for log messages.
356          *
357          * @param newLogContext the log context
358          * @return this Builder
359          */
360         public Builder logContext(final String newLogContext) {
361             this.logContext = Preconditions.checkNotNull(newLogContext);
362             return this;
363         }
364
365         /**
366          * Builds a new MessageSlicer instance.
367          *
368          * @return a new MessageSlicer
369          */
370         public MessageSlicer build() {
371             return new MessageSlicer(this);
372         }
373     }
374 }