root/trunk/mio/mio_impl.h

Revision 576, 11.7 kB (checked in by smoku, 3 months ago)

Merged asynchronous domain resolving in s2s component support by Simon Arlott. Closes #206 #208 #216

Line 
1/*
2 * jabberd - Jabber Open Source Server
3 * Copyright (c) 2002 Jeremie Miller, Thomas Muldowney,
4 *                    Ryan Eatmon, Robert Norris, Christof Meerwald
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA02111-1307USA
19 */
20
21/*
22   MIO -- Managed Input/Output
23   ---------------------------
24*/
25
26#include "util/inaddr.h"
27
28/* win32 wrappers around strerror */
29#ifdef _WIN32
30#define close(x) closesocket(x)
31JABBERD2_API char *mio_strerror(int code)
32{
33  static char buff[1024];
34  if(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, code, 0, buff, sizeof(buff), NULL))
35    return buff;
36  return strerror(code);
37}
38#endif /* _WIN32 */
39
40/** our internal wrapper around a fd */
/**
 * State of a managed fd.
 *
 * Every connect-phase value carries the type_CONNECT bit (0x10). The low
 * bits record a read (0x01) and/or write (0x02) request that arrived while
 * the connect was still pending.
 */
typedef enum {
    type_CLOSED        = 0x00,
    type_NORMAL        = 0x01,
    type_LISTEN        = 0x02,
    type_CONNECT       = 0x10,
    type_CONNECT_READ  = type_CONNECT | 0x01,
    type_CONNECT_WRITE = type_CONNECT | 0x02
} mio_type_t;
49typedef struct mio_priv_fd_st
50{
51    struct mio_fd_st mio_fd;
52
53    mio_type_t type;
54    /* app event handler and data */
55    mio_handler_t app;
56    void *arg;
57
58    MIO_FD_VARS
59} *mio_priv_fd_t;
60
61/** now define our master data type */
62typedef struct mio_priv_st
63{
64    struct mio_st *mio;
65
66    int maxfd;
67    MIO_VARS
68} *mio_priv_t;
69
/* lazy factor: casts from the public handles to the private records, and a
 * helper that invokes the application callback stored on an fd.
 * Arguments are parenthesised so that an expression argument is cast or
 * passed as a whole rather than only its first operand.
 * NOTE: ACT expands f several times - do not pass an f with side effects. */
#define MIO(m) ((mio_priv_t) (m))
#define FD(m,f) ((mio_priv_fd_t) (f))
#define ACT(m,f,a,d) (*(FD(m,f)->app))((m),(a),&FD(m,f)->mio_fd,(d),FD(m,f)->arg)
74
/* temp debug outputter: mio_debug(ZONE, fmt, ...) only prints when MIO_DEBUG
 * is non-zero; ZONE passes the line number of the call site */
#define ZONE __LINE__
#ifndef MIO_DEBUG
#define MIO_DEBUG 0
#endif
#define mio_debug if(MIO_DEBUG) _mio_debug

/**
 * Print one debug line to stderr: a "mio.c#<line>: " prefix, the formatted
 * message, then a newline.
 *
 * @param line   line number shown in the prefix
 * @param msgfmt printf-style format string
 * @param ...    arguments consumed by msgfmt
 */
static void _mio_debug(int line, const char *msgfmt, ...)
{
    va_list args;

    fprintf(stderr,"mio.c#%d: ",line);

    va_start(args, msgfmt);
    vfprintf(stderr, msgfmt, args);
    va_end(args);

    fprintf(stderr,"\n");
}
90
91MIO_FUNCS
92
93/** add and set up this fd to this mio */
94static mio_fd_t _mio_setup_fd(mio_t m, int fd, mio_handler_t app, void *arg)
95{
96    int flags;
97    mio_fd_t mio_fd;
98
99    mio_debug(ZONE, "adding fd #%d", fd);
100
101    mio_fd = MIO_ALLOC_FD(m, fd);
102    /* ok to process this one, welcome to the family */
103    FD(m,mio_fd)->type = type_NORMAL;
104    FD(m,mio_fd)->app = app;
105    FD(m,mio_fd)->arg = arg;
106
107    /* set the socket to non-blocking */
108#if defined(HAVE_FCNTL)
109    flags = fcntl(fd, F_GETFL);
110    flags |= O_NONBLOCK;
111    fcntl(fd, F_SETFL, flags);
112#elif defined(HAVE_IOCTL)
113    flags = 1;
114    ioctl(fd, FIONBIO, &flags);
115#endif
116
117    return mio_fd;
118}
119
120/** internal close function */
121static void _mio_close(mio_t m, mio_fd_t fd)
122{
123    mio_debug(ZONE,"actually closing fd #%d", fd->fd);
124
125    /* take out of poll sets */
126    MIO_REMOVE_FD(m, FD(m,fd));
127
128    /* let the app know, it must process any waiting write data it has and free it's arg */
129    if (FD(m,fd)->app != NULL)
130        ACT(m, fd, action_CLOSE, NULL);
131
132    /* close the socket, and reset all memory */
133    close(fd->fd);
134    FD(m,fd)->type = type_CLOSED;
135    FD(m,fd)->app = NULL;
136    FD(m,fd)->arg = NULL;
137
138    if (MIO_CAN_FREE(m))
139    {
140        MIO_FREE_FD(m, fd);
141    }
142}
143
144/** internally accept an incoming connection from a listen sock */
145static void _mio_accept(mio_t m, mio_fd_t fd)
146{
147    struct sockaddr_storage serv_addr;
148    socklen_t addrlen = (socklen_t) sizeof(serv_addr);
149    int newfd;
150    mio_fd_t mio_fd;
151    char ip[INET6_ADDRSTRLEN];
152
153    mio_debug(ZONE, "accepting on fd #%d", fd->fd);
154
155    /* pull a socket off the accept queue and check */
156    newfd = accept(fd->fd, (struct sockaddr*)&serv_addr, &addrlen);
157    if(newfd <= 0) return;
158    if(addrlen <= 0) {
159        close(newfd);
160        return;
161    }
162
163    j_inet_ntop(&serv_addr, ip, sizeof(ip));
164    mio_debug(ZONE, "new socket accepted fd #%d, %s:%d", newfd, ip, j_inet_getport(&serv_addr));
165
166    /* set up the entry for this new socket */
167    mio_fd = _mio_setup_fd(m, newfd, FD(m,fd)->app, FD(m,fd)->arg);
168
169    /* tell the app about the new socket, if they reject it clean up */
170    if (ACT(m, mio_fd, action_ACCEPT, ip))
171    {
172        mio_debug(ZONE, "accept was rejected for %s:%d", ip, newfd);
173        MIO_REMOVE_FD(m, FD(m,mio_fd));
174
175        /* close the socket, and reset all memory */
176        close(newfd);
177        MIO_FREE_FD(m, mio_fd);
178    }
179
180    return;
181}
182
183/** internally change a connecting socket to a normal one */
184static void _mio__connect(mio_t m, mio_fd_t fd)
185{
186    mio_type_t type = FD(m,fd)->type;
187
188    mio_debug(ZONE, "connect processing for fd #%d", fd->fd);
189
190    /* reset type and clear the "write" event that flags connect() is done */
191    FD(m,fd)->type = type_NORMAL;
192    MIO_UNSET_WRITE(m,FD(m,fd));
193
194    /* if the app had asked to do anything in the meantime, do those now */
195    if(type & type_CONNECT_READ) mio_read(m,fd);
196    if(type & type_CONNECT_WRITE) mio_write(m,fd);
197}
198
199/** reset app stuff for this fd */
200static void _mio_app(mio_t m, mio_fd_t fd, mio_handler_t app, void *arg)
201{
202    FD(m,fd)->app = app;
203    FD(m,fd)->arg = arg;
204}
205
206/** main select loop runner */
207static void _mio_run(mio_t m, int timeout)
208{
209    int retval;
210    MIO_INIT_ITERATOR(iter);
211
212    mio_debug(ZONE, "mio running for %d", timeout);
213
214    /* wait for a socket event */
215    retval = MIO_CHECK(m, timeout);
216
217    /* nothing to do */
218    if(retval == 0) return;
219
220    /* an error */
221    if(retval < 0)
222    {
223        mio_debug(ZONE, "MIO_CHECK returned an error (%d)", MIO_ERROR);
224
225        return;
226    }
227
228    mio_debug(ZONE,"mio working: %d", retval);
229
230    /* loop through the sockets, check for stuff to do */
231    MIO_ITERATE_RESULTS(m, retval, iter)
232    {
233        mio_fd_t fd = MIO_ITERATOR_FD(m,iter);
234
235        /* skip already dead slots */ 
236        if(FD(m,fd)->type == type_CLOSED) continue; 
237
238        /* new conns on a listen socket */
239        if(FD(m,fd)->type == type_LISTEN && MIO_CAN_READ(m,iter))
240        {
241            _mio_accept(m, fd);
242            continue;
243        }
244
245        /* check for connecting sockets */
246        if(FD(m,fd)->type & type_CONNECT &&
247           (MIO_CAN_READ(m,iter) || MIO_CAN_WRITE(m,iter)))
248        {
249            _mio__connect(m, fd);
250            continue;
251        }
252
253        /* read from ready sockets */
254        if(FD(m,fd)->type == type_NORMAL && MIO_CAN_READ(m,iter))
255        {
256            /* if they don't want to read any more right now */
257            if(ACT(m, fd, action_READ, NULL) == 0)
258                MIO_UNSET_READ(m, FD(m,fd));
259        }
260
261        /* write to ready sockets */
262        if(FD(m,fd)->type == type_NORMAL && MIO_CAN_WRITE(m,iter))
263        {
264            /* don't wait for writeability if nothing to write anymore */
265            if(ACT(m, fd, action_WRITE, NULL) == 0)
266                MIO_UNSET_WRITE(m, FD(m,fd));
267        }
268
269        /* deferred closing fd
270         * one of previous actions might change the state of fd */ 
271        if(FD(m,fd)->type == type_CLOSED)
272        {
273            MIO_FREE_FD(m, fd);
274        }
275    }
276}
277
278/** start processing read events */
279static void _mio_read(mio_t m, mio_fd_t fd)
280{
281    if(m == NULL || fd == NULL) return;
282
283    /* if connecting, do this later */
284    if(FD(m,fd)->type & type_CONNECT)
285    {
286        FD(m,fd)->type |= type_CONNECT_READ;
287        return;
288    }
289
290    MIO_SET_READ(m, FD(m,fd));
291}
292
293/** try writing to the socket via the app */
294static void _mio_write(mio_t m, mio_fd_t fd)
295{
296    if(m == NULL || fd == NULL) return;
297
298    /* if connecting, do this later */
299    if(FD(m,fd)->type & type_CONNECT)
300    {
301        FD(m,fd)->type |= type_CONNECT_WRITE;
302        return;
303    }
304
305    if(FD(m,fd)->type != type_NORMAL)
306        return;
307
308    if(ACT(m, fd, action_WRITE, NULL) == 0) return;
309
310    /* not all written, do more l8r */
311    MIO_SET_WRITE(m, FD(m,fd));
312}
313
314/** set up a listener in this mio w/ this default app/arg */
315static mio_fd_t _mio_listen(mio_t m, int port, char *sourceip, mio_handler_t app, void *arg)
316{
317    int fd, flag = 1;
318    mio_fd_t mio_fd;
319    struct sockaddr_storage sa;
320
321    if(m == NULL) return NULL;
322
323    mio_debug(ZONE, "mio to listen on %d [%s]", port, sourceip);
324
325    memset(&sa, 0, sizeof(sa));
326
327    /* if we specified an ip to bind to */
328    if(sourceip != NULL && !j_inet_pton(sourceip, &sa))
329        return NULL;
330
331    if(sa.ss_family == 0)
332        sa.ss_family = AF_INET;
333   
334    /* attempt to create a socket */
335    if((fd = socket(sa.ss_family,SOCK_STREAM,0)) < 0) return NULL;
336    if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(flag)) < 0) return NULL;
337
338    /* set up and bind address info */
339    j_inet_setport(&sa, port);
340    if(bind(fd,(struct sockaddr*)&sa,j_inet_addrlen(&sa)) < 0)
341    {
342        close(fd);
343        return NULL;
344    }
345
346    /* start listening with a max accept queue of 10 */
347    if(listen(fd, 10) < 0)
348    {
349        close(fd);
350        return NULL;
351    }
352
353    /* now set us up the bomb */
354    mio_fd = _mio_setup_fd(m, fd, app, arg);
355    if(mio_fd == NULL)
356    {
357        close(fd);
358        return NULL;
359    }
360    FD(m,mio_fd)->type = type_LISTEN;
361    /* by default we read for new sockets */
362    mio_read(m,mio_fd);
363
364    return mio_fd;
365}
366
367/** create an fd and connect to the given ip/port */
368static mio_fd_t _mio_connect(mio_t m, int port, char *hostip, mio_handler_t app, void *arg)
369{
370    int fd, flag, flags;
371    mio_fd_t mio_fd;
372    struct sockaddr_storage sa;
373
374    memset(&sa, 0, sizeof(sa));
375
376    if(m == NULL || port <= 0 || hostip == NULL) return NULL;
377
378    mio_debug(ZONE, "mio connecting to %s, port=%d",hostip,port);
379
380    /* convert the hostip */
381    if(j_inet_pton(hostip, &sa)<=0)
382        return NULL;
383
384    /* attempt to create a socket */
385    if((fd = socket(sa.ss_family,SOCK_STREAM,0)) < 0) return NULL;
386
387    /* set the socket to non-blocking before connecting */
388#if defined(HAVE_FCNTL)
389    flags = fcntl(fd, F_GETFL);
390    flags |= O_NONBLOCK;
391    fcntl(fd, F_SETFL, flags);
392#elif defined(HAVE_IOCTL)
393    flags = 1;
394    ioctl(fd, FIONBIO, &flags);
395#endif
396
397    /* set up address info */
398    j_inet_setport(&sa, port);
399
400    /* try to connect */
401    flag = connect(fd,(struct sockaddr*)&sa,j_inet_addrlen(&sa));
402
403    mio_debug(ZONE, "connect returned %d and %s", flag, MIO_STRERROR(MIO_ERROR));
404
405    /* already connected?  great! */
406    if(flag == 0)
407    {
408        mio_fd = _mio_setup_fd(m,fd,app,arg);
409        if(mio_fd != NULL) return mio_fd;
410    }
411
412    /* gotta wait till later */
413#ifdef _WIN32
414    if(flag == -1 && WSAGetLastError() == WSAEWOULDBLOCK)
415#else
416    if(flag == -1 && errno == EINPROGRESS)
417#endif
418    {
419        mio_fd = _mio_setup_fd(m,fd,app,arg);
420        if(mio_fd != NULL)
421        {
422            mio_debug(ZONE, "connect processing non-blocking mode");
423
424            FD(m,mio_fd)->type = type_CONNECT;
425            MIO_SET_WRITE(m,FD(m,mio_fd));
426            return mio_fd;
427        }
428    }
429
430    /* bummer dude */
431    close(fd);
432    return NULL;
433}
434
435
436/** adam */
437static void _mio_free(mio_t m)
438{
439    MIO_FREE_VARS(m);
440
441    free(m);
442}
443
444/** eve */
445static mio_t _mio_new(int maxfd)
446{
447    static struct mio_st mio_impl = {
448        _mio_free,
449        _mio_listen, _mio_connect, _mio_setup_fd,
450        _mio_app,
451        _mio_close,
452        _mio_write, _mio_read,
453        _mio_run
454    };
455    mio_t m;
456
457    /* init winsock if we are in Windows */
458#ifdef _WIN32
459    WSADATA wsaData;
460    if (WSAStartup(MAKEWORD( 1, 1 ), &wsaData))
461        return NULL;
462#endif
463
464    /* allocate and zero out main memory */
465    if((m = malloc(sizeof(struct mio_priv_st))) == NULL) return NULL;
466
467    /* set up our internal vars */
468    *m = &mio_impl;
469    MIO(m)->maxfd = maxfd;
470
471    MIO_INIT_VARS(m);
472
473    return m;
474}
Note: See TracBrowser for help on using the browser.