ceptr
 All Data Structures Files Functions Variables Typedefs Macros Modules Pages
stream.h
Go to the documentation of this file.
1 
12 #ifndef _CEPTR_STREAM_H
13 #define _CEPTR_STREAM_H
14 
15 #include <stdio.h>
16 #include <pthread.h>
17 #include <stdbool.h>
18 
/* Kinds of stream; one enumerator per member of the Stream data union. */
enum StreamTypes {
    UnixStream,
    SocketStream
};

/* Bit flags held in Stream.flags, listed in ascending bit order. */
enum {
    StreamHasData            = 0x0001,
    StreamCloseOnFree        = 0x0002,
    StreamReader             = 0x0004,
    StreamWaiting            = 0x0008,
    StreamCloseAfterOneWrite = 0x0010,
    StreamDying              = 0x0100,
    StreamLoadByLine         = 0x0200,
    StreamAlive              = 0x8000
};
21 
/* Forward declaration so the callback type below can name Stream. */
typedef struct Stream Stream;

/* Callback that receives a Stream; this is the type of Stream.callback. */
typedef void (*hasDataCallbackFn)(Stream *st);

/* Values stored in Stream.scan_state. */
enum ScanStates {
    StreamScanInitial,
    StreamScanSuccess,
    StreamScanPartial,
    StreamScanComplete
};
30 struct Stream { // state kept for one stream, whichever kind it is
31  int type; // NOTE(review): presumably an enum StreamTypes value selecting the active member of `data` -- confirm
32  int flags; // bitwise OR of the Stream* flag bits; tested by _st_is_alive()
33  int err;
34  pthread_t pthread; // NOTE(review): presumably the reader thread -- confirm against stream.c
35  pthread_mutex_t mutex;
36  pthread_cond_t cv;
37  char *buf; // data buffer that _st_data() points into
38  size_t buf_size; // __st_buf_full() is true when bytes_used == buf_size
39  size_t bytes_used;
40  int scan_state; // an enum ScanStates value; reset to StreamScanInitial by __st_init_scan()
41  size_t unit_start; // index into buf where the current unit begins (see _st_data)
42  size_t unit_size; // size of the current unit (see _st_data_size)
43  size_t partial;
44  hasDataCallbackFn callback; // NOTE(review): when this fires is decided in stream.c -- confirm
45  void *callback_arg1;
46  int callback_arg2;
47  char *delim; // NOTE(review): presumably the unit delimiter string and, below, its length -- confirm
48  int delim_len;
49  // holds the type-specific handle, one union member per kind of stream:
50  // a FILE* for unix streams and an int for socket streams
51  union {
52  FILE *unix_stream;
53  int socket_stream;
54  } data;
55 };
56 
57 typedef struct SocketListener SocketListener;
58 typedef void (*lisenterConnectionCallbackFn)(Stream *,void *);
59 
61  int port;
62  int sockfd;
63  pthread_t pthread;
64  lisenterConnectionCallbackFn callback;
65  void *callback_arg;
66  bool alive;
67  char *delim;
68 };
69 
/* Delimiter strings (LF and CRLF, going by their names).
 * Declared extern so that including this header from several translation
 * units does not create one definition per unit, which fails to link when
 * the compiler uses -fno-common.
 * NOTE(review): this relies on exactly one definition in a .c file
 * (presumably stream.c) -- confirm before merging. */
extern char *DELIM_LF;
extern char *DELIM_CRLF;
72 
/* Reader buffer size that _st_new_unix_stream() passes for a reader stream. */
#define DEFAULT_READER_BUFFER_SIZE 1000

/* Convenience wrapper around __st_new_unix_stream(): a non-zero `r` requests
 * the default reader buffer size, zero requests a buffer size of 0.
 * Both arguments are parenthesized so that an expression argument (for
 * example a conditional) is evaluated as a whole before the ?: test. */
#define _st_new_unix_stream(s,r) __st_new_unix_stream((s),(r)?DEFAULT_READER_BUFFER_SIZE:0)
75 Stream *__st_new_unix_stream(FILE *stream,size_t reader_buffer_size);
76 Stream *__st_alloc_stream();
77 size_t __st_unix_stream_load(Stream *st);
/* Reset the stream's scan_state to StreamScanInitial.
 * The argument and the whole expansion are parenthesized so the macro works
 * with any pointer expression and inside larger expressions. */
#define __st_init_scan(s) ((s)->scan_state = StreamScanInitial)

/* True when the buffer is full, i.e. bytes_used equals buf_size.
 * Note: `s` is evaluated twice, so avoid arguments with side effects. */
#define __st_buf_full(s) ((s)->bytes_used == (s)->buf_size)
80 
81 void __st_scan(Stream *st);
82 
83 SocketListener *_st_new_socket_listener(int port,lisenterConnectionCallbackFn fn,void *callback_arg,char * delim);
85 
86 void _st_start_read(Stream *st);
87 void _st_data_consumed(Stream *st);
/* Non-zero while the stream counts as alive: either StreamAlive is set in
 * flags, or the stream is a StreamReader whose scan_state has not reached
 * StreamScanComplete.
 * The argument is parenthesized so any pointer expression may be passed;
 * it is evaluated more than once, so avoid arguments with side effects. */
#define _st_is_alive(st) (((st)->flags & StreamAlive) || (((st)->flags & StreamReader) && ((st)->scan_state != StreamScanComplete)))
89 void _st_kill(Stream *st);
90 void _st_free(Stream *);
/* Pointer to the start of the current unit: &buf[unit_start].
 * `st` is parenthesized at every use so any pointer expression may be
 * passed; it is evaluated twice, so avoid arguments with side effects. */
#define _st_data(st) (&(st)->buf[(st)->unit_start])

/* Size of the current unit (the unit_size field). The expansion is fully
 * parenthesized and is still an lvalue. */
#define _st_data_size(st) ((st)->unit_size)
93 
94 int _st_write(Stream *stream,char *buf,size_t len);
95 int _st_writeln(Stream *stream,char *buf);
96 
97 #endif
98 
Stream * __st_new_unix_stream(FILE *stream, size_t reader_buffer_size)
Definition: stream.c:300
Definition: stream.h:30
SocketListener * _st_new_socket_listener(int port, lisenterConnectionCallbackFn fn, void *callback_arg, char *delim)
Definition: stream.c:372
void _st_kill(Stream *st)
Definition: stream.c:481
void _st_data_consumed(Stream *st)
Definition: stream.c:470
void __st_scan(Stream *st)
Definition: stream.c:125
int _st_writeln(Stream *stream, char *buf)
Definition: stream.c:563
void _st_start_read(Stream *st)
Definition: stream.c:455
void _st_close_listener(SocketListener *l)
Definition: stream.c:444
int _st_write(Stream *stream, char *buf, size_t len)
Definition: stream.c:534
size_t __st_unix_stream_load(Stream *st)
Definition: stream.c:49
void _st_free(Stream *)
Definition: stream.c:507