Complete PDP connection refactor.
[packetcable.git] / packetcable-driver / src / main / java / org / umu / cops / prpdp / COPSPdpConnection.java
1 /*
2  * Copyright (c) 2004 University of Murcia.  All rights reserved.
3  * --------------------------------------------------------------
4  * For more information, please see <http://www.umu.euro6ix.org/>.
5  */
6
7 package org.umu.cops.prpdp;
8
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.umu.cops.COPSConnection;
12 import org.umu.cops.stack.*;
13
14 import javax.annotation.concurrent.ThreadSafe;
15 import java.io.IOException;
16 import java.net.Socket;
17 import java.util.Date;
18 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap;
20
21 /**
22  * Class for managing an provisioning connection at the PDP side.
23  */
24 @ThreadSafe
25 public class COPSPdpConnection extends COPSConnection {
26
27     public final static Logger logger = LoggerFactory.getLogger(COPSPdpConnection.class);
28
29     /**
30      * PEP identifier
31      * TODO FIXME - Why is this member never being used?
32      */
33     private final COPSPepId _pepId;
34
35     /**
36      *  Time of the latest keep-alive sent
37      * TODO FIXME - Why is this member never being used?
38      */
39     private volatile Date _lastKa;
40
41     /**
42      * Maps a Client Handle to a Handler
43      */
44     protected final Map<COPSHandle, COPSPdpReqStateMan> _managerMap;
45
46     /**
47      *  PDP policy data processor class
48      */
49     protected final COPSPdpDataProcess _process;
50
51     /**
52      * Creates a new PDP connection
53      *
54      * @param pepId PEP-ID of the connected PEP
55      * @param sock  Socket connected to PEP
56      * @param process   Object for processing policy data
57      */
58     public COPSPdpConnection(final COPSPepId pepId, Socket sock, final COPSPdpDataProcess process) {
59         this(pepId, sock, process, (short)0, (short)0);
60     }
61
62     /**
63      * Constructor for this or extended classes
64      * @param pepId - PEP-ID of the connected PEP
65      * @param sock - Socket connected to PEP
66      * @param process - Object for processing policy data
67      * @param kaTimer - the Keep-alive timer value
68      * @param acctTimer - the accounting timer value
69      */
70     protected COPSPdpConnection(final COPSPepId pepId, Socket sock, final COPSPdpDataProcess process,
71                                 final short kaTimer, final short acctTimer) {
72         super(sock, kaTimer, acctTimer);
73         _pepId = pepId;
74         _process = process;
75         _lastKa = new Date();
76         _managerMap = new ConcurrentHashMap<>();
77     }
78
79     public void addStateMan(final COPSHandle handle, final COPSPdpReqStateMan man) {
80         _managerMap.put(handle, man);
81     }
82
83     /**
84      * Main loop
85      */
86     public void run () {
87         Date lastSendKa = new Date();
88         Date lastRecKa = new Date();
89         while (!_sock.isClosed()) {
90             try {
91                 if (_sock.getInputStream().available() != 0) {
92                     processMessage(_sock);
93                     lastRecKa = new Date();
94                 }
95
96                 // Keep Alive
97                 if (_kaTimer > 0) {
98                     // Timeout at PDP
99                     int _startTime = (int) (lastRecKa.getTime());
100                     int cTime = (int) (new Date().getTime());
101
102                     if ((cTime - _startTime) > _kaTimer*1000) {
103                         _sock.close();
104                         // Notify all Request State Managers
105                         notifyNoKAAllReqStateMan();
106                     }
107
108                     // Send to PEP
109                     _startTime = (int) (lastSendKa.getTime());
110                     cTime = (int) (new Date().getTime());
111
112                     if ((cTime - _startTime) > ((_kaTimer*3/4)*1000)) {
113                         // TODO - what should the real clientType be here???
114                         final COPSKAMsg msg = new COPSKAMsg(null);
115                         COPSTransceiver.sendMsg(msg, _sock);
116                         lastSendKa = new Date();
117                     }
118                 }
119
120                 try {
121                     Thread.sleep(500);
122                 } catch (InterruptedException e) {
123                     logger.warn("Thread interrupted, shutting down", e);
124                     break;
125                 }
126             } catch (Exception e) {
127                 logger.error("Error while processing socket messages, continue processing", e);
128             }
129         }
130
131         // connection closed by server
132         try {
133             logger.info("Closing socket");
134             _sock.close();
135         } catch (IOException e) {
136             logger.error("Error closing socket", e);
137         }
138
139         // Notify all Request State Managers
140         try {
141             logger.info("Notifying state managers that PDP connection is closing");
142             notifyCloseAllReqStateMan();
143         } catch (COPSException e) {
144             logger.error("Error closing state managers");
145         }
146     }
147
148     /**
149      * Gets a COPS message from the socket and processes it
150      * @param    conn Socket connected to the PEP
151      */
152     private void processMessage(final Socket conn) throws COPSException, IOException {
153         final COPSMsg msg = COPSTransceiver.receiveMsg(conn);
154         switch (msg.getHeader().getOpCode()) {
155             case CC:
156                 handleClientCloseMsg(conn, (COPSClientCloseMsg)msg);
157                 break;
158             case KA:
159                 handleKeepAliveMsg(conn, (COPSKAMsg)msg);
160                 break;
161             case REQ:
162                 handleRequestMsg(conn, (COPSReqMsg)msg);
163                 break;
164             case RPT:
165                 handleReportMsg(conn, (COPSReportMsg)msg);
166                 break;
167             case DRQ:
168                 handleDeleteRequestMsg(conn, (COPSDeleteMsg)msg);
169                 break;
170             case SSC:
171                 handleSyncComplete(conn, (COPSSyncStateMsg)msg);
172                 break;
173             default:
174                 throw new COPSPdpException("Message not expected (" + msg.getHeader().getOpCode() + ").");
175         }
176     }
177
178     /**
179      * Handle Keep Alive Message
180      * @param    conn                a  Socket
181      * @param    kaMsg               a  COPSKAMsg
182      */
183     private void handleKeepAliveMsg(final Socket conn, final COPSKAMsg kaMsg) {
184         try {
185             // Support
186             if (kaMsg.getIntegrity() != null) {
187                 logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
188             }
189             kaMsg.writeData(conn);
190             _lastKa = new Date();
191         } catch (Exception unae) {
192             logger.error("Unexpected exception while writing COPS data", unae);
193         }
194     }
195
196     /**
197      * Handle Delete Request Message
198      * @param    conn                a  Socket
199      * @param    cMsg                a  COPSDeleteMsg
200      */
201     private void handleDeleteRequestMsg(final Socket conn, final COPSDeleteMsg cMsg) throws COPSException {
202         // Support
203         if (cMsg.getIntegrity() != null) {
204             logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
205         }
206
207         final COPSPdpReqStateMan man = _managerMap.remove(cMsg.getClientHandle());
208         if (man == null) {
209             logger.warn("No state manager found with ID - " + cMsg.getClientHandle().getId().str());
210         } else {
211             man.processDeleteRequestState(cMsg);
212         }
213     }
214
215     /**
216      * Handle Request Message
217      * @param    conn                a  Socket
218      * @param    reqMsg              a  COPSReqMsg
219      */
220     protected void handleRequestMsg(final Socket conn, final COPSReqMsg reqMsg) throws COPSException {
221         final COPSHeader header = reqMsg.getHeader();
222
223         // Support
224         if (reqMsg.getIntegrity() != null) {
225             logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
226         }
227
228         final COPSPdpReqStateMan man;
229         if (_managerMap.get(reqMsg.getClientHandle()) == null) {
230
231             man = createStateManager(reqMsg);
232             _managerMap.put(reqMsg.getClientHandle(), man);
233             man.initRequestState(_sock);
234
235             logger.info("createHandler called, clientType=" + header.getClientType() + " msgType=" + ", connId=" +
236                     conn.toString());
237         } else {
238             man = _managerMap.get(reqMsg.getClientHandle());
239         }
240         man.processRequest(reqMsg);
241     }
242
243     /**
244      * Returns an instance of a COPSPdpReqStateMan
245      * @param reqMsg - the request on which to create the state manager
246      * @return - the state manager
247      */
248     protected COPSPdpReqStateMan createStateManager(final COPSReqMsg reqMsg) {
249         return new COPSPdpReqStateMan(reqMsg.getHeader().getClientType(), reqMsg.getClientHandle(), _process);
250     }
251
252     /**
253      * Handle Report Message
254      * @param    conn                a  Socket
255      * @param    repMsg              a  COPSReportMsg
256      */
257     private void handleReportMsg(final Socket conn, final COPSReportMsg repMsg) throws COPSException {
258         // Support
259         if (repMsg.getIntegrity() != null) {
260             logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
261         }
262
263         final COPSPdpReqStateMan man = _managerMap.get(repMsg.getClientHandle());
264         if (man == null) {
265             logger.warn("No state manager found with ID - " + repMsg.getClientHandle().getId().str());
266         } else {
267             man.processReport(repMsg);
268         }
269     }
270
271     /**
272      * Method handleSyncComplete
273      * @param    conn                a  Socket
274      * @param    cMsg                a  COPSSyncStateMsg
275      */
276     private void handleSyncComplete(final Socket conn, final COPSSyncStateMsg cMsg) throws COPSException {
277         // Support
278         if (cMsg.getIntegrity() != null) {
279             logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
280         }
281
282         final COPSPdpReqStateMan man = _managerMap.get(cMsg.getClientHandle());
283         if (man == null) {
284             logger.warn("No state manager found with ID - " + cMsg.getClientHandle().getId().str());
285         } else {
286             man.processSyncComplete(cMsg);
287         }
288     }
289
290     /**
291      * Requests a COPS sync from the PEP
292      * @throws COPSException
293      * @throws COPSPdpException
294      */
295     public void syncAllRequestState() throws COPSException {
296         for (final COPSPdpReqStateMan man : _managerMap.values()) {
297             man.syncRequestState();
298         }
299     }
300
301     private void notifyCloseAllReqStateMan() throws COPSException {
302         for (final COPSPdpReqStateMan man : _managerMap.values()) {
303             man.processClosedConnection(_error);
304         }
305     }
306
307     private void notifyNoKAAllReqStateMan() throws COPSException {
308         for (final COPSPdpReqStateMan man : _managerMap.values()) {
309             man.processNoKAConnection();
310         }
311     }
312
313 }
314