ceptr
 All Data Structures Files Functions Variables Typedefs Macros Modules Pages
stream.c
Go to the documentation of this file.
1 
12 #include "stream.h"
13 #include <stdlib.h>
14 #include <errno.h>
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netdb.h>
19 #include <arpa/inet.h>
20 #include <sys/wait.h>
21 #include <signal.h>
22 #include <stdio.h>
23 #include <unistd.h>
24 
25 
26 #include "process.h"
27 #include "debug.h"
28 #include "util.h"
29 
30 char *DELIM_LF = "\n";
31 char *DELIM_CRLF = "\r\n";
32 
33 void __st_realloc_reader(Stream *st) {
34  st->buf_size *= 2;
35  debug(D_STREAM,"realloc buffer to %ld\n",st->buf_size);
36  st->buf = realloc(st->buf,st->buf_size+1); //add 1 for a null
37 }
38 
50  FILE *stream = st->data.unix_stream;
51  if (feof(stream)) {
52  debug(D_STREAM,"eof before load\n");
53  st->err = EOF;
54  return 0;
55  }
56  if (__st_buf_full(st)) {
57  __st_realloc_reader(st);
58  }
59  size_t max = st->buf_size-st->bytes_used;
60  char *buf = st->buf+st->bytes_used;
61  st->err = 0;
62 
63  size_t l;
64  if (st->flags & StreamLoadByLine) {
65  int c;
66  l = 0;
67  while(l < max && (c = fgetc(stream)) != EOF) {
68  buf[l++] = c;
69  if (c == '\n') break;
70  }
71  }
72  else l = fread(buf,1,max,stream);
73  debug(D_STREAM,"loaded %ld bytes from file: %.*s\n",l,(int)l,buf);
74  if (l == 0) {
75  st->err = errno;
76  debug(D_STREAM,"read 0 bytes with errno: %d\n",st->err);
77  }
78  else {
79  st->bytes_used += l;
80  }
81  return l;
82 }
83 
84 
96 
97  int sockfd = st->data.socket_stream;
98 
99  if (__st_buf_full(st)) {
100  __st_realloc_reader(st);
101  }
102  size_t max = st->buf_size-st->bytes_used;
103  char *buf = st->buf+st->bytes_used;
104  st->err = 0;
105 
106  size_t l = recv(sockfd, buf, max, 0);
107  debug(D_STREAM,"loaded %ld bytes from socket: %.*s\n",l,(int)l,buf);
108  if (l == -1) {
109  l = 0;
110  st->err = errno;
111  }
112  else {
113  st->bytes_used += l;
114  }
115  return l;
116 }
117 
118 
125 void __st_scan(Stream *st) {
126  int delim_len = st->delim_len;
127  // if this is the initial scan, then setup the unit_start to 0
128  if (st->scan_state == StreamScanInitial)
129  st->unit_start = 0;
130  // otherwise set the unit start the byte after the terminator from the
131  // last read (hence the +1)
132  else if (st->scan_state == StreamScanSuccess)
133  st->unit_start += st->unit_size + delim_len;
134  // we have completely scanned the buffer if the unit start is >= the bytes in the buffer
135  if (st->unit_start >= st->bytes_used) {
136  st->scan_state = StreamScanComplete;
137  return;
138  }
139 
140  // set the current read offset taking into account previous partial scans
141  size_t i= (st->scan_state == StreamScanPartial) ? st->partial : st->unit_start;
142 
143  int chars_matched = 0;
144  while (i<st->bytes_used) {
145  if (st->buf[i] == st->delim[chars_matched]) chars_matched++;
146  if (chars_matched == delim_len) {
147  st->scan_state = StreamScanSuccess;
148  st->unit_size = i - st->unit_start - (delim_len -1);
149  return;
150  }
151  i++;
152  }
153  st->partial = i;
154  st->scan_state = StreamScanPartial;
155 }
156 
157 
158 char *ss2str(int s) {
159  switch(s) {
160  case StreamScanSuccess:return "StreamScanSuccess";
161  case StreamScanPartial:return "StreamScanPartial";
162  case StreamScanComplete:return "StreamScanComplete";
163  case StreamScanInitial:return "StreamScanInitial";
164  }
165 }
166 
167 
179  size_t l;
180 
181  if (st->bytes_used > 0) {
182  debug(D_STREAM,"data in buffer, skipping read to continue scanning\n");
183  goto scan;
184  }
185  // woot! a good use case for gotos!
186  init:
187  __st_init_scan(st);
188  load:
189  if (st->type == UnixStream) {
190  l = __st_unix_stream_load(st);
191  }
192  else if (st->type == SocketStream) {
193  l = __st_socket_stream_load(st);
194  }
195  else raise_error("unknown stream type");
196 
197  if (l==0) {
198  debug(D_STREAM,"load return zero, clearing alive bit. scan state: %s\n",ss2str(st->scan_state));
199  if (st->scan_state == StreamScanPartial) {
200  // treat as successful line
201  st->scan_state = StreamScanComplete;
202  st->flags |= StreamHasData;
203  }
204  else if (st->scan_state == StreamScanInitial)
205  st->scan_state = StreamScanComplete;
206 
207  st->flags &= ~StreamAlive;
208  return;
209  }
210  scan:
211  __st_scan(st);
212  debug(D_STREAM,"scanned with state: %s\n",ss2str(st->scan_state));
213  if (st->scan_state == StreamScanSuccess) {
214  debug(D_STREAM,"scan value: %.*s\n",(int)_st_data_size(st),_st_data(st));
215  st->flags |= StreamHasData;
216  return;
217  }
218  else if (st->scan_state == StreamScanPartial) {
219  debug(D_STREAM,"partial found, trying to load more data\n");
220  goto load;
221  }
222  else if (st->scan_state == StreamScanComplete) {
223  debug(D_STREAM,"buffer fully read, reinitialzing buffer\n");
224  st->bytes_used = 0;
225  goto init;
226  }
227  raise_error("unknown scan state!");
228 }
229 
230 
231 // stream reading thread function
232 void *_st_stream_read(void *arg) {
233  Stream *st = (Stream *) arg;
234  do {
235  debug(D_STREAM,"wating for read.\n");
236  pthread_mutex_lock(&st->mutex);
237  st->flags |= StreamAlive; // don't change the state until the mutex is locked
238  st->flags |= StreamWaiting;
239  pthread_cond_wait(&st->cv, &st->mutex);
240  st->flags &= ~StreamWaiting;
241 
242  if (!(st->flags & StreamHasData) && _st_is_alive(st)) {
243  debug(D_STREAM,"starting read.\n");
244  // this call is expected to block until a unit can be read and will
245  // result in StreamHasData being set when it returns
246  __st_stream_read(st);
247  }
248  else {
249  if (st->flags & StreamHasData) debug(D_STREAM,"hmmm, stream already has data on read wakeup!\n");
250  if (!(st->flags & StreamAlive)) debug(D_STREAM,"hmmm, stream dead on read wakeup!\n");
251  }
252  pthread_mutex_unlock(&st->mutex);
253  if (st->callback) {
254  (st->callback)(st);
255  }
256  } while _st_is_alive(st);
257  debug(D_STREAM,"stream reading finished.\n");
258  pthread_exit(NULL);
259 }
260 
261 // lo level stream allocator function
262 Stream *__st_alloc_stream() {
263  Stream *s = malloc(sizeof(Stream));
264  memset(s,0,sizeof(Stream));
265  s->flags = StreamCloseOnFree;
266  s->delim = DELIM_LF;
267  s->delim_len = 1;
268  return s;
269 }
270 
276 void __st_start_reader(Stream *s,size_t reader_buffer_size) {
277  s->flags |= StreamReader;
278 
279  s->buf = malloc(reader_buffer_size+1);
280  s->buf_size = reader_buffer_size;
281  pthread_mutex_init(&(s->mutex), NULL);
282  pthread_cond_init(&(s->cv), NULL);
283 
284  int rc = pthread_create(&s->pthread,0,_st_stream_read,s);
285  if (rc){
286  raise_error("Error starting reader thread; return code from pthread_create() is %d\n", rc);
287  }
288 
289  // wait for the the reader thread to come alive and block on mutex wait
290  while(!(s->flags & StreamWaiting)) {
291  sleepns(1);
292  };
293 }
294 
300 Stream *__st_new_unix_stream(FILE *stream,size_t reader_buffer_size) {
301  Stream *s = __st_alloc_stream();
302  s->type = UnixStream;
303  s->data.unix_stream = stream;
304 
305  if (reader_buffer_size) {
306  __st_start_reader(s,reader_buffer_size);
307  }
308  return s;
309 }
310 
315  Stream *s = __st_alloc_stream();
316  s->type = SocketStream;
317  s->data.socket_stream = sockfd;
318 
319  __st_start_reader(s, DEFAULT_READER_BUFFER_SIZE);
320 
321  return s;
322 }
323 
324 // get sockaddr, IPv4 or IPv6:
325 void *get_in_addr(struct sockaddr *sa)
326 {
327  if (sa->sa_family == AF_INET) {
328  return &(((struct sockaddr_in*)sa)->sin_addr);
329  }
330 
331  return &(((struct sockaddr_in6*)sa)->sin6_addr);
332 }
333 
334 void *__st_socket_stream_accept(void *arg) {
335  SocketListener *l = (SocketListener *) arg;
336  socklen_t sin_size;
337  struct sockaddr_storage their_addr; // connector's address information
338  int new_fd;
339  char s[INET6_ADDRSTRLEN];
340 
341  do {
342  sin_size = sizeof their_addr;
343  debug(D_SOCKET,"listener on %d: starting accept\n",l->port);
344  new_fd = accept(l->sockfd, (struct sockaddr *)&their_addr, &sin_size);
345  debug(D_SOCKET,"listener on %d: got accept with fd %d\n",l->port,new_fd);
346  if (new_fd == -1) {
347  debug(D_SOCKET,"accept err %d\n",errno);
348  break;
349  }
350 
351  if (debugging(D_SOCKET)) {
352  inet_ntop(their_addr.ss_family,
353  get_in_addr((struct sockaddr *)&their_addr),
354  s, sizeof s);
355  debug(D_SOCKET,"listener on %d: got connection from %s\n",l->port, s);
356  }
357 
358  Stream *st = _st_new_socket_stream(new_fd);
359  st->delim = l->delim;
360  st->delim_len = strlen(st->delim);
361 
362  (l->callback)(st,l->callback_arg);
363 
364  } while(l->alive);
365  pthread_exit(NULL);
366 }
367 
368 #define BACKLOG 10
369 
372 SocketListener *_st_new_socket_listener(int port,lisenterConnectionCallbackFn fn,void *callback_arg,char * delim) {
373  char portstr[255];
374  sprintf(portstr,"%d",port);
375  SocketListener *l = malloc(sizeof(SocketListener));
376  l->port = port;
377  l->callback = fn;
378  l->callback_arg = callback_arg;
379  l->alive = true;
380  l->delim = delim;
381 
382  int sockfd; // listen on sock_fd, new connection on new_fd
383  struct addrinfo hints, *servinfo, *p;
384  int yes=1;
385  char s[INET6_ADDRSTRLEN];
386  int rv;
387 
388  memset(&hints, 0, sizeof hints);
389  hints.ai_family = AF_UNSPEC;
390  hints.ai_socktype = SOCK_STREAM;
391  hints.ai_flags = AI_PASSIVE; // use my IP
392 
393  if ((rv = getaddrinfo(NULL, portstr, &hints, &servinfo)) != 0) {
394  raise_error("getaddrinfo error: %s\n", gai_strerror(rv));
395  }
396 
397  // loop through all the results and bind to the first we can
398  for(p = servinfo; p != NULL; p = p->ai_next) {
399  if ((sockfd = socket(p->ai_family, p->ai_socktype,
400  p->ai_protocol)) == -1) {
401  debug(D_SOCKET,"server: socket err %d",errno);
402  continue;
403  }
404 
405  if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
406  sizeof(int)) == -1) {
407  raise_error("setsockopt %d",errno);
408  }
409 
410  if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
411  close(sockfd);
412  debug(D_SOCKET,"server: bind err %d",errno);
413  continue;
414  }
415 
416  break;
417  }
418  l->sockfd = sockfd;
419 
420  freeaddrinfo(servinfo); // all done with this structure
421 
422  if (p == NULL) {
423  raise_error("server: failed to bind %d\n",errno);
424  }
425 
426  if (listen(sockfd, BACKLOG) == -1) {
427  raise_error("listen error %d\n",errno);
428  }
429  debug(D_SOCKET,"starting listener thread on %d\n",port);
430  int rc = pthread_create(&l->pthread,0,__st_socket_stream_accept,l);
431  if (rc){
432  raise_error("Error starting listener thread; return code from pthread_create() is %d\n", rc);
433  }
434  pthread_detach(l->pthread);
435  if (rc){
436  raise_error("Error detaching listener thread; return code from pthread_detach() is %d\n", rc);
437  }
438  return l;
439 }
440 
445  l->alive = false;
446  shutdown(l->sockfd,SHUT_RDWR);
447  //@todo this is a cheap way to signal the listener accept thread to die, we need to do something better
448  sleepms(1);
449  free(l);
450 }
451 
456 
457  // don't throw the error if the stream is being killed
458  // @todo figure out why this didn't work by testing the value of StreamAlive which is
459  // also cleared in _st_kill at the same time StreamDying is set.
460  if ((st->flags & StreamHasData) && !(st->flags & StreamDying)) {raise_error("stream data hasn't been consumed!");}
461  debug(D_STREAM,"waking stream reader\n");
462  pthread_mutex_lock(&st->mutex);
463  pthread_cond_signal(&st->cv);
464  pthread_mutex_unlock(&st->mutex);
465 }
466 
471  debug(D_STREAM,"marking data as read\n");
472  st->flags &= ~StreamHasData;
473 }
474 
481 void _st_kill(Stream *st) {
482  if (st->flags & StreamDying) {
483  debug(D_STREAM,"already dying\n");
484  return;
485  }
486 
487  st->flags &= ~StreamAlive;
488  st->flags |= StreamDying;
489  if (st->type == SocketStream) {
490  debug(D_SOCKET,"shutting down socket in st_kill\n");
491  shutdown(st->data.socket_stream,SHUT_RDWR);
492  }
493  if (st->flags & StreamReader) {
494  debug(D_STREAM,"shutting down reader in st_kill\n");
495  st->scan_state = StreamScanComplete;
496  if (st->flags & StreamWaiting) {
497  _st_start_read(st);
498  while(st->flags & StreamWaiting) {sleepms(1);};
499  }
500  }
501 }
502 
507 void _st_free(Stream *st) {
508 
509  if (st->flags & StreamCloseOnFree) {
510  debug(D_STREAM,"cleaning up stream\n");
511 
512  if (st->type == UnixStream)
513  fclose(st->data.unix_stream);
514  else if (st->type == SocketStream) {
515  close(st->data.socket_stream);
516  }
517  //else raise_error("unknown stream type:%d\n",st->type);
518  }
519  _st_kill(st);
520  //@todo who should clean up the mutexes??
521  if (st->flags & StreamReader) {
522  debug(D_STREAM,"cleaning up reader\n");
523  free(st->buf);
524  pthread_mutex_destroy(&st->mutex);
525  pthread_cond_destroy(&st->cv);
526  }
527  free(st);
528 }
529 
534 int _st_write(Stream *st,char *buf,size_t len) {
535  int bytes_written;
536  if (st->type == UnixStream) {
537  FILE *stream = st->data.unix_stream;
538  bytes_written = fwrite(buf,sizeof(char),len,stream);
539  if (bytes_written > 0) {
540  // reading and restoring the stream position here is due to a bug in the glibc 2.23
541  // see: https://sourceware.org/bugzilla/show_bug.cgi?id=20005
542  long pos = ftell(stream);
543  int err = fflush(stream);
544  if (err) raise_error("got error on flush: %d",errno);
545  fseek(stream,pos,SEEK_SET);
546  }
547  }
548  else if (st->type == SocketStream) {
549  bytes_written = send(st->data.socket_stream,buf,len,MSG_NOSIGNAL);
550  }
551  else raise_error("unknown stream type:%d\n",st->type);
552  debug(D_STREAM,"write of '%s' results in %d\n",buf,bytes_written);
553  if (st->flags & StreamCloseAfterOneWrite) {
554  _st_kill(st);
555  }
556  return bytes_written;
557 }
558 
559 
563 int _st_writeln(Stream *stream,char *str) {
564  int len = strlen(str);
565  int err = 0;
566  if (len) err = _st_write(stream,str,len);
567  if (!len || err > 0) {
568  int e = _st_write(stream,stream->delim,stream->delim_len);
569  if (e > 0) err += e;
570  }
571  return err;
572 }
void _st_close_listener(SocketListener *l)
Definition: stream.c:444
int _st_write(Stream *st, char *buf, size_t len)
Definition: stream.c:534
Definition: stream.h:30
Stream * __st_new_unix_stream(FILE *stream, size_t reader_buffer_size)
Definition: stream.c:300
void _st_data_consumed(Stream *st)
Definition: stream.c:470
void __st_start_reader(Stream *s, size_t reader_buffer_size)
Definition: stream.c:276
Stream * _st_new_socket_stream(int sockfd)
Definition: stream.c:314
void __st_stream_read(Stream *st)
Definition: stream.c:178
streams abstraction header file
void _st_kill(Stream *st)
Definition: stream.c:481
void _st_start_read(Stream *st)
Definition: stream.c:455
void _st_free(Stream *st)
Definition: stream.c:507
processing header files
void __st_scan(Stream *st)
Definition: stream.c:125
size_t __st_socket_stream_load(Stream *st)
Definition: stream.c:95
int _st_writeln(Stream *stream, char *str)
Definition: stream.c:563
size_t __st_unix_stream_load(Stream *st)
Definition: stream.c:49
SocketListener * _st_new_socket_listener(int port, lisenterConnectionCallbackFn fn, void *callback_arg, char *delim)
Definition: stream.c:372