Reduce JSR305 proliferation
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageAssembler.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalNotification;
18 import com.google.common.io.ByteSource;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.BiConsumer;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
26 import org.opendaylight.yangtools.concepts.Identifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * This class re-assembles messages sliced into smaller chunks by {@link MessageSlicer}.
32  *
33  * @author Thomas Pantelis
34  * @see MessageSlicer
35  */
36 public final  class MessageAssembler implements AutoCloseable {
37     private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
38
39     private final Cache<Identifier, AssembledMessageState> stateCache;
40     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
41     private final BiConsumer<Object, ActorRef> assembledMessageCallback;
42     private final String logContext;
43
44     MessageAssembler(final Builder builder) {
45         this.fileBackedStreamFactory = requireNonNull(builder.fileBackedStreamFactory,
46                 "FiledBackedStreamFactory cannot be null");
47         this.assembledMessageCallback = requireNonNull(builder.assembledMessageCallback,
48                 "assembledMessageCallback cannot be null");
49         this.logContext = builder.logContext;
50
51         stateCache = CacheBuilder.newBuilder()
52                 .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
53                 .removalListener(this::stateRemoved).build();
54     }
55
56     /**
57      * Returns a new Builder for creating MessageAssembler instances.
58      *
59      * @return a Builder instance
60      */
61     public static Builder builder() {
62         return new Builder();
63     }
64
65     /**
66      * Checks if the given message is handled by this class. If so, it should be forwarded to the
67      * {@link #handleMessage(Object, ActorRef)} method
68      *
69      * @param message the message to check
70      * @return true if handled, false otherwise
71      */
72     public static boolean isHandledMessage(final Object message) {
73         return message instanceof MessageSlice || message instanceof AbortSlicing;
74     }
75
76     @Override
77     public void close() {
78         LOG.debug("{}: Closing", logContext);
79         stateCache.invalidateAll();
80     }
81
82     /**
83      * Checks for and removes assembled message state that has expired due to inactivity from the slicing component
84      * on the other end.
85      */
86     public void checkExpiredAssembledMessageState() {
87         if (stateCache.size() > 0) {
88             stateCache.cleanUp();
89         }
90     }
91
92     /**
93      * Invoked to handle message slices and other messages pertaining to this class.
94      *
95      * @param message the message
96      * @param sendTo the reference of the actor to which subsequent message slices should be sent
97      * @return true if the message was handled, false otherwise
98      */
99     public boolean handleMessage(final Object message, final @NonNull ActorRef sendTo) {
100         if (message instanceof MessageSlice) {
101             LOG.debug("{}: handleMessage: {}", logContext, message);
102             onMessageSlice((MessageSlice) message, sendTo);
103             return true;
104         } else if (message instanceof AbortSlicing) {
105             LOG.debug("{}: handleMessage: {}", logContext, message);
106             onAbortSlicing((AbortSlicing) message);
107             return true;
108         }
109
110         return false;
111     }
112
113     private void onMessageSlice(final MessageSlice messageSlice, final ActorRef sendTo) {
114         final Identifier identifier = messageSlice.getIdentifier();
115         try {
116             final AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice));
117             processMessageSliceForState(messageSlice, state, sendTo);
118         } catch (ExecutionException e) {
119             final MessageSliceException messageSliceEx;
120             final Throwable cause = e.getCause();
121             if (cause instanceof MessageSliceException) {
122                 messageSliceEx = (MessageSliceException) cause;
123             } else {
124                 messageSliceEx = new MessageSliceException(String.format(
125                         "Error creating state for identifier %s", identifier), cause);
126             }
127
128             messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, messageSliceEx, sendTo),
129                     ActorRef.noSender());
130         }
131     }
132
133     private AssembledMessageState createState(final MessageSlice messageSlice) throws MessageSliceException {
134         final Identifier identifier = messageSlice.getIdentifier();
135         if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) {
136             LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
137             return new AssembledMessageState(identifier, messageSlice.getTotalSlices(),
138                     fileBackedStreamFactory, logContext);
139         }
140
141         LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
142         throw new MessageSliceException(String.format(
143                 "No assembled state found for identifier %s and slice index %s", identifier,
144                 messageSlice.getSliceIndex()), true);
145     }
146
147     private void processMessageSliceForState(final MessageSlice messageSlice, final AssembledMessageState state,
148             final ActorRef sendTo) {
149         final Identifier identifier = messageSlice.getIdentifier();
150         final ActorRef replyTo = messageSlice.getReplyTo();
151         Object reAssembledMessage = null;
152         synchronized (state) {
153             final int sliceIndex = messageSlice.getSliceIndex();
154             try {
155                 final MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, sendTo);
156                 if (state.addSlice(sliceIndex, messageSlice.getData(), messageSlice.getLastSliceHashCode())) {
157                     LOG.debug("{}: Received last slice for {}", logContext, identifier);
158
159                     reAssembledMessage = reAssembleMessage(state);
160
161                     replyTo.tell(successReply, ActorRef.noSender());
162                     removeState(identifier);
163                 } else {
164                     LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier);
165                     replyTo.tell(successReply, ActorRef.noSender());
166                 }
167             } catch (MessageSliceException e) {
168                 LOG.warn("{}: Error processing {}", logContext, messageSlice, e);
169                 replyTo.tell(MessageSliceReply.failed(identifier, e, sendTo), ActorRef.noSender());
170                 removeState(identifier);
171             }
172         }
173
174         if (reAssembledMessage != null) {
175             LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage);
176             assembledMessageCallback.accept(reAssembledMessage, replyTo);
177         }
178     }
179
180     private static Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
181         try {
182             final ByteSource assembledBytes = state.getAssembledBytes();
183             try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
184                 return in.readObject();
185             }
186
187         } catch (IOException | ClassNotFoundException  e) {
188             throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s",
189                     state.getIdentifier()), e);
190         }
191     }
192
193     private void onAbortSlicing(final AbortSlicing message) {
194         removeState(message.getIdentifier());
195     }
196
197     private void removeState(final Identifier identifier) {
198         LOG.debug("{}: Removing state for {}", logContext, identifier);
199         stateCache.invalidate(identifier);
200     }
201
202     private void stateRemoved(final RemovalNotification<Identifier, AssembledMessageState> notification) {
203         if (notification.wasEvicted()) {
204             LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
205         } else {
206             LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext,
207                     notification.getKey(), notification.getCause());
208         }
209
210         notification.getValue().close();
211     }
212
213     @VisibleForTesting
214     boolean hasState(final Identifier forIdentifier) {
215         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
216         stateCache.cleanUp();
217         return exists;
218     }
219
220     public static class Builder {
221         private FileBackedOutputStreamFactory fileBackedStreamFactory;
222         private BiConsumer<Object, ActorRef> assembledMessageCallback;
223         private long expireStateAfterInactivityDuration = 1;
224         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
225         private String logContext = "<no-context>";
226
227         /**
228          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages.
229          *
230          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
231          * @return this Builder
232          */
233         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
234             this.fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory);
235             return this;
236         }
237
238         /**
239          * Sets the Consumer callback for assembled messages. The callback takes the assembled message and the
240          * original sender ActorRef as arguments.
241          *
242          * @param newAssembledMessageCallback the Consumer callback
243          * @return this Builder
244          */
245         public Builder assembledMessageCallback(final BiConsumer<Object, ActorRef> newAssembledMessageCallback) {
246             this.assembledMessageCallback = newAssembledMessageCallback;
247             return this;
248         }
249
250         /**
251          * Sets the duration and time unit whereby assembled message state is purged from the cache due to
252          * inactivity from the slicing component on the other end. By default, state is purged after 1 minute of
253          * inactivity.
254          *
255          * @param duration the length of time after which a state entry is purged
256          * @param unit the unit the duration is expressed in
257          * @return this Builder
258          */
259         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
260             checkArgument(duration > 0, "duration must be > 0");
261             this.expireStateAfterInactivityDuration = duration;
262             this.expireStateAfterInactivityUnit = unit;
263             return this;
264         }
265
266         /**
267          * Sets the context for log messages.
268          *
269          * @param newLogContext the log context
270          * @return this Builder
271          */
272         public Builder logContext(final String newLogContext) {
273             this.logContext = newLogContext;
274             return this;
275         }
276
277         /**
278          * Builds a new MessageAssembler instance.
279          *
280          * @return a new MessageAssembler
281          */
282         public MessageAssembler build() {
283             return new MessageAssembler(this);
284         }
285     }
286 }