15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
19 #include <arpa/inet.h>
/* Delimiter string holding a single line feed character. */
30 char *DELIM_LF =
"\n";
/* Delimiter string holding carriage return followed by line feed. */
31 char *DELIM_CRLF =
"\r\n";
/*
 * Resize the read buffer of a stream to st->buf_size bytes plus one spare
 * byte, logging the size first.
 *
 * st: stream whose buf and buf_size fields are used.
 *
 * NOTE(review): where buf_size is enlarged is not visible in this view;
 * confirm it is updated before the realloc call.
 * NOTE(review): the realloc result is stored directly into st->buf and no
 * NULL check is visible, so a failed realloc would lose the old pointer.
 */
33 void __st_realloc_reader(
Stream *st) {
35 debug(D_STREAM,
"realloc buffer to %ld\n",st->buf_size);
36 st->buf = realloc(st->buf,st->buf_size+1);
/*
 * Fragment: fills the unused tail of st->buf from the stdio stream stored in
 * st->data.unix_stream. Presumably the body of __st_unix_stream_load; the
 * signature line is not visible in this view -- TODO confirm.
 */
50 FILE *stream = st->data.unix_stream;
52 debug(D_STREAM,
"eof before load\n");
/* Grow the buffer first when __st_buf_full reports it as full. */
56 if (__st_buf_full(st)) {
57 __st_realloc_reader(st);
/* max is the free byte count; buf points at the first free byte. */
59 size_t max = st->buf_size-st->bytes_used;
60 char *buf = st->buf+st->bytes_used;
/* With StreamLoadByLine set, characters are fetched one at a time via fgetc;
 * the loop ends at EOF or once l reaches max. */
64 if (st->flags & StreamLoadByLine) {
67 while(l < max && (c = fgetc(stream)) != EOF) {
/* Without that flag a single fread requests up to max bytes. */
72 else l = fread(buf,1,max,stream);
73 debug(D_STREAM,
"loaded %ld bytes from file: %.*s\n",l,(
int)l,buf);
/* NOTE(review): st->err is logged as an errno value; where it is assigned is
 * not visible here. */
76 debug(D_STREAM,
"read 0 bytes with errno: %d\n",st->err);
/*
 * Fragment: receives bytes from the socket descriptor stored in
 * st->data.socket_stream into the unused tail of st->buf. Presumably the body
 * of __st_socket_stream_load; the signature line is not visible -- TODO
 * confirm.
 */
97 int sockfd = st->data.socket_stream;
/* Grow the buffer first when __st_buf_full reports it as full. */
99 if (__st_buf_full(st)) {
100 __st_realloc_reader(st);
/* max is the free byte count; buf points at the first free byte. */
102 size_t max = st->buf_size-st->bytes_used;
103 char *buf = st->buf+st->bytes_used;
/* NOTE(review): POSIX recv returns ssize_t and -1 on error. Storing it in a
 * size_t turns -1 into a very large value, which the following log call then
 * uses as a length. Confirm whether the missing lines handle the error case. */
106 size_t l = recv(sockfd, buf, max, 0);
107 debug(D_STREAM,
"loaded %ld bytes from socket: %.*s\n",l,(
int)l,buf);
/*
 * Fragment: searches st->buf for the next occurrence of the stream delimiter
 * and records the result in st->scan_state. Presumably the body of __st_scan;
 * the signature line is not visible -- TODO confirm.
 */
126 int delim_len = st->delim_len;
128 if (st->scan_state == StreamScanInitial)
/* After a successful scan, step past the previous unit and its delimiter. */
132 else if (st->scan_state == StreamScanSuccess)
133 st->unit_start += st->unit_size + delim_len;
/* Nothing left to examine once the unit start reaches the used byte count. */
135 if (st->unit_start >= st->bytes_used) {
136 st->scan_state = StreamScanComplete;
/* A partial scan resumes from st->partial; otherwise start at unit_start. */
141 size_t i= (st->scan_state == StreamScanPartial) ? st->partial : st->unit_start;
143 int chars_matched = 0;
144 while (i<st->bytes_used) {
/* NOTE(review): no reset of chars_matched on a mismatching byte is visible
 * here; confirm the missing lines handle partial delimiter matches. */
145 if (st->buf[i] == st->delim[chars_matched]) chars_matched++;
146 if (chars_matched == delim_len) {
147 st->scan_state = StreamScanSuccess;
/* i is the index of the last delimiter byte, so this yields the number of
 * bytes between unit_start and the first delimiter byte. */
148 st->unit_size = i - st->unit_start - (delim_len -1);
154 st->scan_state = StreamScanPartial;
/*
 * Map a scan state value to the name of its constant as a string literal.
 *
 * s: one of the StreamScan constants listed in the cases below.
 * Returns the matching literal. The result for any other value is not visible
 * in this view.
 */
158 char *ss2str(
int s) {
160 case StreamScanSuccess:
return "StreamScanSuccess";
161 case StreamScanPartial:
return "StreamScanPartial";
162 case StreamScanComplete:
return "StreamScanComplete";
163 case StreamScanInitial:
return "StreamScanInitial";
/*
 * Fragment: one load-then-scan step for a stream. Presumably the body of
 * __st_stream_read; the signature line is not visible -- TODO confirm.
 */
/* Buffered bytes are scanned before any new load is attempted. */
181 if (st->bytes_used > 0) {
182 debug(D_STREAM,
"data in buffer, skipping read to continue scanning\n");
/* The load path is chosen by stream type; any other type is an error. */
189 if (st->type == UnixStream) {
192 else if (st->type == SocketStream) {
195 else raise_error(
"unknown stream type");
/* Per the log text, this path runs when the load returned zero. */
198 debug(D_STREAM,
"load return zero, clearing alive bit. scan state: %s\n",ss2str(st->scan_state));
/* A pending partial unit is marked complete and flagged as available data. */
199 if (st->scan_state == StreamScanPartial) {
201 st->scan_state = StreamScanComplete;
202 st->flags |= StreamHasData;
204 else if (st->scan_state == StreamScanInitial)
205 st->scan_state = StreamScanComplete;
/* The stream is no longer alive after a zero-length load. */
207 st->flags &= ~StreamAlive;
/* React to the scan result. */
212 debug(D_STREAM,
"scanned with state: %s\n",ss2str(st->scan_state));
/* Success: a unit is available, so mark the stream as having data. */
213 if (st->scan_state == StreamScanSuccess) {
214 debug(D_STREAM,
"scan value: %.*s\n",(
int)_st_data_size(st),_st_data(st));
215 st->flags |= StreamHasData;
218 else if (st->scan_state == StreamScanPartial) {
219 debug(D_STREAM,
"partial found, trying to load more data\n");
222 else if (st->scan_state == StreamScanComplete) {
223 debug(D_STREAM,
"buffer fully read, reinitialzing buffer\n");
/* Any other state is reported as an error. */
227 raise_error(
"unknown scan state!");
/*
 * Thread entry point for a stream reader (passed to pthread_create elsewhere
 * in this file). Each pass locks st->mutex, marks the stream alive and
 * waiting, blocks on st->cv, and after wakeup starts a read when the stream
 * has no unconsumed data and is still alive. The loop repeats while
 * _st_is_alive(st) holds.
 *
 * arg: presumably the Stream pointer for this reader; the line that converts
 * it is not visible -- TODO confirm.
 */
232 void *_st_stream_read(
void *arg) {
235 debug(D_STREAM,
"wating for read.\n");
236 pthread_mutex_lock(&st->mutex);
237 st->flags |= StreamAlive;
/* StreamWaiting signals other threads that this reader is parked on the cv. */
238 st->flags |= StreamWaiting;
/* NOTE(review): the wait is not wrapped in a predicate loop in the visible
 * lines; POSIX permits spurious wakeups from pthread_cond_wait. The two
 * diagnostics below suggest unexpected wakeups were observed -- confirm. */
239 pthread_cond_wait(&st->cv, &st->mutex);
240 st->flags &= ~StreamWaiting;
242 if (!(st->flags & StreamHasData) && _st_is_alive(st)) {
243 debug(D_STREAM,
"starting read.\n");
/* Diagnostics for wakeups that cannot start a read. */
249 if (st->flags & StreamHasData) debug(D_STREAM,
"hmmm, stream already has data on read wakeup!\n");
250 if (!(st->flags & StreamAlive)) debug(D_STREAM,
"hmmm, stream dead on read wakeup!\n");
252 pthread_mutex_unlock(&st->mutex);
256 }
while _st_is_alive(st);
257 debug(D_STREAM,
"stream reading finished.\n");
/*
 * Create a Stream with every byte zeroed and flags set to StreamCloseOnFree.
 * The line that obtains the storage for s is not visible in this view.
 */
262 Stream *__st_alloc_stream() {
264 memset(s,0,
sizeof(
Stream));
265 s->flags = StreamCloseOnFree;
/*
 * Fragment: turns a stream into a reader by allocating its buffer, creating
 * its mutex and condition variable, and starting the reader thread.
 * Presumably the body of __st_start_reader; the signature line is not
 * visible -- TODO confirm.
 */
277 s->flags |= StreamReader;
/* One byte more than the recorded buf_size is allocated.
 * NOTE(review): no check of the malloc result is visible here. */
279 s->buf = malloc(reader_buffer_size+1);
280 s->buf_size = reader_buffer_size;
281 pthread_mutex_init(&(s->mutex), NULL);
282 pthread_cond_init(&(s->cv), NULL);
/* The new thread runs _st_stream_read with the stream as its argument. */
284 int rc = pthread_create(&s->pthread,0,_st_stream_read,s);
286 raise_error(
"Error starting reader thread; return code from pthread_create() is %d\n", rc);
/* Loop until the reader thread has set StreamWaiting; the loop body is not
 * visible in this view. */
290 while(!(s->flags & StreamWaiting)) {
/*
 * Fragment: builds a UnixStream around the given stdio stream. Presumably the
 * body of __st_new_unix_stream; the signature line is not visible -- TODO
 * confirm.
 */
301 Stream *s = __st_alloc_stream();
302 s->type = UnixStream;
303 s->data.unix_stream = stream;
/* A nonzero reader_buffer_size selects the branch below; its body is not
 * visible in this view. */
305 if (reader_buffer_size) {
/*
 * Fragment: builds a SocketStream around the given socket descriptor.
 * Presumably the body of _st_new_socket_stream; the signature line is not
 * visible -- TODO confirm.
 */
315 Stream *s = __st_alloc_stream();
316 s->type = SocketStream;
317 s->data.socket_stream = sockfd;
/*
 * Return a pointer to the address field inside a socket address.
 *
 * sa: socket address to inspect.
 * Returns the address of sin_addr when sa_family is AF_INET; otherwise the
 * address of sin6_addr, treating sa as a sockaddr_in6.
 */
325 void *get_in_addr(
struct sockaddr *sa)
327 if (sa->sa_family == AF_INET) {
328 return &(((
struct sockaddr_in*)sa)->sin_addr);
331 return &(((
struct sockaddr_in6*)sa)->sin6_addr);
/*
 * Thread entry point for a socket listener (passed to pthread_create
 * elsewhere in this file). Accepts a connection on l->sockfd, optionally logs
 * the peer address, assigns the listener delimiter to a stream st, and calls
 * the listener callback with that stream.
 *
 * arg: presumably the SocketListener pointer; the line that converts it is
 * not visible -- TODO confirm. How st is created and whether accept is
 * repeated in a loop are also not visible here.
 */
334 void *__st_socket_stream_accept(
void *arg) {
337 struct sockaddr_storage their_addr;
339 char s[INET6_ADDRSTRLEN];
342 sin_size =
sizeof their_addr;
343 debug(D_SOCKET,
"listener on %d: starting accept\n",l->port);
344 new_fd = accept(l->sockfd, (
struct sockaddr *)&their_addr, &sin_size);
345 debug(D_SOCKET,
"listener on %d: got accept with fd %d\n",l->port,new_fd);
347 debug(D_SOCKET,
"accept err %d\n",errno);
/* The peer address is formatted only when socket debugging is enabled. */
351 if (debugging(D_SOCKET)) {
352 inet_ntop(their_addr.ss_family,
353 get_in_addr((
struct sockaddr *)&their_addr),
355 debug(D_SOCKET,
"listener on %d: got connection from %s\n",l->port, s);
/* The stream shares the delimiter pointer of the listener; its length is
 * measured once here. */
359 st->delim = l->delim;
360 st->delim_len = strlen(st->delim);
362 (l->callback)(st,l->callback_arg);
/*
 * Fragment: resolves a passive address for the given port, binds and listens
 * on a socket, then starts and detaches a thread running
 * __st_socket_stream_accept. Presumably the body of _st_new_socket_listener;
 * the signature line is not visible -- TODO confirm.
 */
/* NOTE(review): sprintf is unbounded and the size of portstr is not visible
 * here; confirm it can hold any int rendered in decimal. */
374 sprintf(portstr,
"%d",port);
378 l->callback_arg = callback_arg;
383 struct addrinfo hints, *servinfo, *p;
385 char s[INET6_ADDRSTRLEN];
/* Request stream sockets of any address family, suitable for bind. */
388 memset(&hints, 0,
sizeof hints);
389 hints.ai_family = AF_UNSPEC;
390 hints.ai_socktype = SOCK_STREAM;
391 hints.ai_flags = AI_PASSIVE;
393 if ((rv = getaddrinfo(NULL, portstr, &hints, &servinfo)) != 0) {
394 raise_error(
"getaddrinfo error: %s\n", gai_strerror(rv));
/* Walk the candidate addresses, attempting socket, setsockopt and bind. */
398 for(p = servinfo; p != NULL; p = p->ai_next) {
399 if ((sockfd = socket(p->ai_family, p->ai_socktype,
400 p->ai_protocol)) == -1) {
401 debug(D_SOCKET,
"server: socket err %d",errno);
/* SO_REUSEADDR is enabled before bind; failure here raises an error. */
405 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
406 sizeof(
int)) == -1) {
407 raise_error(
"setsockopt %d",errno);
410 if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
412 debug(D_SOCKET,
"server: bind err %d",errno);
420 freeaddrinfo(servinfo);
423 raise_error(
"server: failed to bind %d\n",errno);
426 if (listen(sockfd, BACKLOG) == -1) {
427 raise_error(
"listen error %d\n",errno);
429 debug(D_SOCKET,
"starting listener thread on %d\n",port);
430 int rc = pthread_create(&l->pthread,0,__st_socket_stream_accept,l);
432 raise_error(
"Error starting listener thread; return code from pthread_create() is %d\n", rc);
/* NOTE(review): the visible detach call does not store its result, yet the
 * message below reports rc as the pthread_detach return code; confirm rc is
 * updated in the lines not visible here. */
434 pthread_detach(l->pthread);
436 raise_error(
"Error detaching listener thread; return code from pthread_detach() is %d\n", rc);
/* Fragment, presumably from _st_close_listener (TODO confirm): shuts down
 * both directions of the listener socket. */
446 shutdown(l->sockfd,SHUT_RDWR);
/*
 * Fragment: wakes the reader thread of a stream. Presumably the body of
 * _st_start_read; the signature line is not visible -- TODO confirm.
 */
/* Unconsumed data is an error unless the stream is already dying. */
460 if ((st->flags & StreamHasData) && !(st->flags & StreamDying)) {raise_error(
"stream data hasn't been consumed!");}
461 debug(D_STREAM,
"waking stream reader\n");
/* The signal is sent while holding the mutex that guards the cv. */
462 pthread_mutex_lock(&st->mutex);
463 pthread_cond_signal(&st->cv);
464 pthread_mutex_unlock(&st->mutex);
/* Fragment, presumably from _st_data_consumed (TODO confirm): clears the
 * StreamHasData flag after logging. */
471 debug(D_STREAM,
"marking data as read\n");
472 st->flags &= ~StreamHasData;
/*
 * Fragment of the stream kill routine (the log strings name it st_kill):
 * marks the stream as dying, shuts down its socket if it has one, and stops
 * its reader if it has one.
 */
/* A stream already marked dying only logs; the rest of this branch is not
 * visible in this view. */
482 if (st->flags & StreamDying) {
483 debug(D_STREAM,
"already dying\n");
487 st->flags &= ~StreamAlive;
488 st->flags |= StreamDying;
489 if (st->type == SocketStream) {
490 debug(D_SOCKET,
"shutting down socket in st_kill\n");
491 shutdown(st->data.socket_stream,SHUT_RDWR);
493 if (st->flags & StreamReader) {
494 debug(D_STREAM,
"shutting down reader in st_kill\n");
495 st->scan_state = StreamScanComplete;
/* When the reader is parked, poll in 1 ms steps until it clears
 * StreamWaiting. The statement that wakes it is not visible here. */
496 if (st->flags & StreamWaiting) {
498 while(st->flags & StreamWaiting) {sleepms(1);};
/*
 * Fragment: releases the resources of a stream. Presumably the body of
 * _st_free; the signature line is not visible -- TODO confirm.
 */
/* The underlying FILE or socket is closed only when StreamCloseOnFree is set. */
509 if (st->flags & StreamCloseOnFree) {
510 debug(D_STREAM,
"cleaning up stream\n");
512 if (st->type == UnixStream)
513 fclose(st->data.unix_stream);
514 else if (st->type == SocketStream) {
515 close(st->data.socket_stream);
/* Reader streams also own a mutex and a condition variable. */
521 if (st->flags & StreamReader) {
522 debug(D_STREAM,
"cleaning up reader\n");
524 pthread_mutex_destroy(&st->mutex);
525 pthread_cond_destroy(&st->cv);
/*
 * Fragment: writes len bytes from buf to the stream and returns
 * bytes_written. Presumably the body of _st_write; the signature line is not
 * visible -- TODO confirm.
 */
536 if (st->type == UnixStream) {
537 FILE *stream = st->data.unix_stream;
538 bytes_written = fwrite(buf,
sizeof(
char),len,stream);
/* After a successful write the position is recorded, the stream is flushed,
 * and the position is set again to the recorded value. */
539 if (bytes_written > 0) {
542 long pos = ftell(stream);
543 int err = fflush(stream);
544 if (err) raise_error(
"got error on flush: %d",errno);
545 fseek(stream,pos,SEEK_SET);
/* MSG_NOSIGNAL is passed to send for socket streams. */
548 else if (st->type == SocketStream) {
549 bytes_written = send(st->data.socket_stream,buf,len,MSG_NOSIGNAL);
551 else raise_error(
"unknown stream type:%d\n",st->type);
/* NOTE(review): buf is logged with a plain string conversion although its
 * length is carried separately in len; confirm callers always pass
 * NUL-terminated data. */
552 debug(D_STREAM,
"write of '%s' results in %d\n",buf,bytes_written);
553 if (st->flags & StreamCloseAfterOneWrite) {
556 return bytes_written;
/*
 * Fragment: writes str followed by the stream delimiter. Presumably the body
 * of _st_writeln; the signature line is not visible -- TODO confirm.
 */
564 int len = strlen(str);
/* An empty string skips the payload write. */
566 if (len) err =
_st_write(stream,str,len);
/* The delimiter is written for an empty string, or after a payload write
 * that returned a positive count. */
567 if (!len || err > 0) {
568 int e =
_st_write(stream,stream->delim,stream->delim_len);
void _st_close_listener(SocketListener *l)
int _st_write(Stream *st, char *buf, size_t len)
Stream * __st_new_unix_stream(FILE *stream, size_t reader_buffer_size)
void _st_data_consumed(Stream *st)
void __st_start_reader(Stream *s, size_t reader_buffer_size)
Stream * _st_new_socket_stream(int sockfd)
void __st_stream_read(Stream *st)
streams abstraction header file
void _st_kill(Stream *st)
void _st_start_read(Stream *st)
void _st_free(Stream *st)
void __st_scan(Stream *st)
size_t __st_socket_stream_load(Stream *st)
int _st_writeln(Stream *stream, char *str)
size_t __st_unix_stream_load(Stream *st)
SocketListener * _st_new_socket_listener(int port, lisenterConnectionCallbackFn fn, void *callback_arg, char *delim)