Improved thread error handling.
[packetcable.git] / packetcable-driver / src / main / java / org / umu / cops / prpep / COPSPepConnection.java
1 /*
2  * Copyright (c) 2004 University of Murcia.  All rights reserved.
3  * --------------------------------------------------------------
4  * For more information, please see <http://www.umu.euro6ix.org/>.
5  */
6
7 package org.umu.cops.prpep;
8
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.umu.cops.COPSConnection;
12 import org.umu.cops.stack.*;
13 import org.umu.cops.stack.COPSDecision.Command;
14 import org.umu.cops.stack.COPSDecision.DecisionFlag;
15
16 import javax.annotation.concurrent.ThreadSafe;
17 import java.io.IOException;
18 import java.net.Socket;
19 import java.util.Date;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23
24 /**
25  * COPSPepConnection represents a PEP-PDP Connection Manager.
26  * Responsible for processing messages received from PDP.
27  */
28 @ThreadSafe
29 public class COPSPepConnection extends COPSConnection {
30
31     private final static Logger logger = LoggerFactory.getLogger(COPSPepConnection.class);
32
33     /** Time to wait responses (milliseconds), default is 10 seconds */
34     protected transient int _responseTime;
35
36     /** COPS Client-type */
37     protected final short _clientType;
38
39     /**
40      Maps a COPS Client Handle to a Request State Manager
41      */
42     protected final Map<COPSHandle, COPSPepReqStateMan> _managerMap;
43
44     /**
45      * Creates a new PEP connection
46      * @param clientType    PEP's client-type
47      * @param sock          Socket connected to PDP
48      */
49     public COPSPepConnection(final short clientType, final Socket sock) {
50         super(sock, (short)0, (short)0);
51         _clientType = clientType;
52         _responseTime = 10000;
53         _managerMap = new ConcurrentHashMap<>();
54     }
55
56     /**
57      * Message-processing loop
58      */
59     public void run () {
60         Date lastSendKa = new Date();
61         Date lastSendAcc = new Date();
62         Date lastRecKa = new Date();
63             while (!_sock.isClosed()) {
64                 try {
65                     if (_sock.getInputStream().available() != 0) {
66                         processMessage(_sock);
67                         lastRecKa = new Date();
68                     }
69
70                     // Keep Alive
71                     if (_kaTimer > 0) {
72                         // Timeout at PDP
73                         int _startTime = (int) (lastRecKa.getTime());
74                         int cTime = (int) (new Date().getTime());
75
76                         if ((cTime - _startTime) > _kaTimer*1000) {
77                             _sock.close();
78                             // Notify all Request State Managers
79                             notifyNoKAAllReqStateMan();
80                         }
81
82                         // Send to PEP
83                         _startTime = (int) (lastSendKa.getTime());
84                         cTime = (int) (new Date().getTime());
85
86                         if ((cTime - _startTime) > ((_kaTimer*3/4) * 1000)) {
87                             final COPSKAMsg msg = new COPSKAMsg(null);
88                             COPSTransceiver.sendMsg(msg, _sock);
89                             lastSendKa = new Date();
90                         }
91                     }
92
93                     // Accounting
94                     if (_acctTimer > 0) {
95                         int _startTime = (int) (lastSendAcc.getTime());
96                         int cTime = (int) (new Date().getTime());
97
98                         if ((cTime - _startTime) > ((_acctTimer*3/4)*1000)) {
99                             // Notify all Request State Managers
100                             notifyAcctAllReqStateMan();
101                             lastSendAcc = new Date();
102                         }
103                     }
104
105                     try {
106                         Thread.sleep(500);
107                     } catch (InterruptedException e) {
108                         logger.error("Closing connection");
109                         break;
110                     } catch (Exception e) {
111                         logger.error("Unexpected exception while sleeping. Continue processing messages", e);
112                     }
113                 } catch (Exception e) {
114                     logger.error("Unexpected error while processing socket messages. Continue processing", e);
115                 } catch (Throwable e) {
116                     logger.error("Unexpected fatal error while processing COPS messages. Stopping thread", e);
117                     break;
118                 }
119             }
120
121         // connection closed by server
122         // COPSDebug.out(getClass().getName(),"Connection closed by server");
123         try {
124             _sock.close();
125         } catch (IOException e) {
126             logger.error("Error closing socket", e);
127         }
128
129         // Notify all Request State Managers
130         try {
131             notifyCloseAllReqStateMan();
132         } catch (COPSException e) {
133             logger.error("Error closing state managers");
134         }
135     }
136
137     /**
138      * Gets a COPS message from the socket and processes it
139      * @param conn  Socket connected to the PDP
140      * @throws COPSException
141      * @throws IOException
142      */
143     protected void processMessage(final Socket conn) throws COPSException, IOException {
144         final COPSMsg msg = COPSTransceiver.receiveMsg(conn);
145
146         switch (msg.getHeader().getOpCode()) {
147             case CC:
148                 handleClientCloseMsg(conn, (COPSClientCloseMsg)msg);
149                 break;
150             case DEC:
151                 handleDecisionMsg((COPSDecisionMsg)msg);
152                 break;
153             case SSQ:
154                 handleSyncStateReqMsg((COPSSyncStateMsg)msg);
155                 break;
156             case KA:
157                 handleKeepAliveMsg((COPSKAMsg)msg);
158                 break;
159             default:
160                 throw new COPSPepException("Message not expected (" + msg.getHeader().getOpCode() + ").");
161         }
162     }
163
164     /**
165      * Handle Keep Alive Message
166      * @param    cMsg                a  COPSKAMsg
167      */
168     private void handleKeepAliveMsg(final COPSKAMsg cMsg) {
169         logger.info("Get KAlive Msg");
170         try {
171             // Support
172             if (cMsg.getIntegrity() != null) {
173                 logger.warn("Unsupported objects (Integrity)");
174             }
175
176             // should we do anything else?? ....
177
178         } catch (Exception unae) {
179             logger.error("Unexpected exception while writing COPS data", unae);
180         }
181     }
182
183     /**
184      * Method handleDecisionMsg
185      * @param    dMsg                 a  COPSDecisionMsg
186      */
187     protected void handleDecisionMsg(final COPSDecisionMsg dMsg) throws COPSException {
188         final COPSHandle handle = dMsg.getClientHandle();
189         final Map<COPSContext, Set<COPSDecision>> decisions = dMsg.getDecisions();
190
191         for (final Set<COPSDecision> copsDecisions: decisions.values()) {
192             for (final COPSDecision decision : copsDecisions) {
193                 // Get the associated manager
194                 final COPSPepReqStateMan manager = _managerMap.get(handle);
195                 if (manager == null) {
196                     logger.warn("Unable to find state manager with key - " + handle);
197                     return;
198                 }
199
200                 // Check message type
201                 // TODO FIXME - Use of manager object could result in a NPE
202                 if (decision.getFlag().equals(DecisionFlag.REQSTATE)) {
203                     if (decision.getCommand().equals(Command.REMOVE))
204                         // Delete Request State
205                         manager.processDeleteRequestState(dMsg);
206                     else
207                         // Open new Request State
208                         handleOpenNewRequestStateMsg(handle);
209                 } else
210                     // Decision
211                     manager.processDecision(dMsg);
212             }
213         }
214     }
215
216
217     /**
218      * Method handleOpenNewRequestStateMsg
219      * @param    handle              a  COPSHandle
220      */
221     private void handleOpenNewRequestStateMsg(final COPSHandle handle) throws COPSPepException {
222         final COPSPepReqStateMan manager = _managerMap.get(handle);
223         if (manager == null)
224             logger.warn("Unable to find state manager with key - " + handle.getId().str());
225         else
226             manager.processOpenNewRequestState();
227     }
228
229     /**
230      * Method handleSyncStateReqMsg
231      * @param    cMsg                a  COPSSyncStateMsg
232      */
233     private void handleSyncStateReqMsg(final COPSSyncStateMsg cMsg) throws COPSException {
234         if (cMsg.getIntegrity() != null) {
235             logger.warn("Unsupported objects (Integrity)");
236         }
237
238         final COPSPepReqStateMan manager = _managerMap.get(cMsg.getClientHandle());
239         if (manager == null)
240             logger.warn("Unable to find state manager with key - " + cMsg.getClientHandle().getId().str());
241         else
242             manager.processSyncStateRequest(cMsg);
243     }
244
245     /**
246      * Method createRequestState
247      * @param    clientHandle             a  String
248      * @param    process                  a  COPSPepDataProcess
249      * @return   a COPSPepmanager
250      * @throws   COPSException
251      * @throws   COPSPepException
252      */
253     public COPSPepReqStateMan addRequestState(final COPSHandle clientHandle, final COPSPepDataProcess process)
254             throws COPSException {
255         final COPSPepReqStateMan manager = new COPSPepReqStateMan(_clientType, clientHandle, process, _sock);
256         if (_managerMap.get(clientHandle) != null)
257             throw new COPSPepException("Duplicate Handle, rejecting " + clientHandle);
258
259         _managerMap.put(clientHandle, manager);
260         logger.info("Added state manager with key - " + clientHandle);
261         manager.initRequestState();
262         return manager;
263     }
264
265     /**
266      * Method deleteRequestState
267      * @param    manager             a  COPSPepReqStateMan
268      * @throws   COPSException
269      */
270     public void deleteRequestState(COPSPepReqStateMan manager) throws COPSException {
271         manager.finalizeRequestState();
272     }
273
274     private void notifyCloseAllReqStateMan() throws COPSException {
275         for (final COPSPepReqStateMan man: _managerMap.values()) {
276             man.processClosedConnection(_error);
277         }
278     }
279
280     private void notifyNoKAAllReqStateMan() throws COPSException {
281         for (final COPSPepReqStateMan man: _managerMap.values()) {
282             man.processNoKAConnection();
283         }
284     }
285
286     private void notifyAcctAllReqStateMan() throws COPSException {
287         for (final COPSPepReqStateMan man: _managerMap.values()) {
288             man.processAcctReport();
289         }
290     }
291
292 }
293