ceptr
 All Data Structures Files Functions Variables Typedefs Macros Modules Pages
stream_spec.h
Go to the documentation of this file.
1 
7 #include "../src/ceptr.h"
8 #include "../src/stream.h"
9 #include <unistd.h>
10 #include <errno.h>
11 
12 void testStreamCreate() {
13  // test basic stream creation
14  Stream *s = _st_new_unix_stream(stdout,0);
15  spec_is_equal(s->type,UnixStream);
16  spec_is_equal(s->flags,StreamCloseOnFree);
17  spec_is_ptr_equal(s->data.unix_stream,stdout);
18  s->flags &= ~StreamCloseOnFree; // don't close the stdin on free...
19  _st_free(s);
20 }
21 
22 void testStreamAlive() {
23  FILE *input;
24 
25  char data[] = "line1\nline2\n";
26  input = fmemopen(data, strlen(data), "r");
27  Stream *s = _st_new_unix_stream(input,1);
28 
29  spec_is_true(_st_is_alive(s));
30  _st_kill(s);
31  spec_is_false(_st_is_alive(s));
32 
33  void *status;
34  int rc;
35 
36  rc = pthread_join(s->pthread, &status);
37  if (rc) {
38  raise_error("ERROR; return code from pthread_join() is %d\n", rc);
39  }
40  _st_free(s);
41 }
42 
43 void testStreamScan() {
44 
45  Stream * s = __st_alloc_stream();
46  s->type = 99; // typed doesn't matter, just testing scan.
47  s->buf_size = 100;
48 
49  // setup stream buffer as if three lines have been read in
50  s->buf = "line1\ncat\ndog";
51  s->bytes_used = strlen(s->buf);
52 
53  // test initializing the scan state
54  __st_init_scan(s);
55  spec_is_equal(s->scan_state,StreamScanInitial);
56 
57  // test how scans loads state data with offsets into the buffer for each unit
58  __st_scan(s);
59  spec_is_equal(s->scan_state,StreamScanSuccess);
60  spec_is_equal(s->unit_start,0);
61  spec_is_equal(s->unit_size,5);
62  __st_scan(s);
63  spec_is_equal(s->scan_state,StreamScanSuccess);
64  spec_is_equal(s->unit_start,6);
65  spec_is_equal(s->unit_size,3);
66 
67  // test how scan of unterminated data at end of buffer sets state to Partial
68  __st_scan(s);
69  spec_is_equal(s->scan_state,StreamScanPartial);
70  spec_is_equal(s->unit_start,10);
71 
72  // repeat calls to scan continue with Partial result
73  __st_scan(s);
74  spec_is_equal(s->scan_state,StreamScanPartial);
75 
76  // simulate reading in some extra bytes into the buffer (but not enough)
77  s->buf = "line1\ncat\ndogg"; s->bytes_used = strlen(s->buf);
78 
79  __st_scan(s);
80  spec_is_equal(s->scan_state,StreamScanPartial);
81 
82  // simulate reading in more bytes plus newline
83  s->buf = "line1\ncat\ndoggy\n"; s->bytes_used = strlen(s->buf);
84 
85  __st_scan(s);
86  spec_is_equal(s->scan_state,StreamScanSuccess);
87  spec_is_equal(s->unit_start,10);
88  spec_is_equal(s->unit_size,5);
89 
90  __st_scan(s);
91  spec_is_equal(s->scan_state,StreamScanComplete);
92 
93  s->buf = "";
94  s->bytes_used = strlen(s->buf);
95 
96  __st_init_scan(s);
97  __st_scan(s);
98  spec_is_equal(s->scan_state,StreamScanComplete);
99 
100  // test scanning with a multi-char delimiter
101  s->delim = DELIM_CRLF;
102  s->delim_len = 2;
103  s->buf = "line1\r\ncat\r\ndoggy\r\n";
104  s->bytes_used = strlen(s->buf);
105  __st_init_scan(s);
106  __st_scan(s);
107  spec_is_equal(s->scan_state,StreamScanSuccess);
108  spec_is_equal(s->unit_start,0);
109  spec_is_equal(s->unit_size,5);
110  __st_scan(s);
111  spec_is_equal(s->scan_state,StreamScanSuccess);
112  spec_is_equal(s->unit_start,7);
113  spec_is_equal(s->unit_size,3);
114 
115  _st_free(s);
116 
117 }
118 
119 void testStreamFileLoad() {
120  FILE *input;
121  //debug_enable(D_STREAM);
122 
123  char data[] = "line1\nline2\nline3\n";
124  input = fmemopen(data, strlen(data), "r");
125  Stream *s = _st_new_unix_stream(input,false);
126  // manually allocate a buffer for reading
127  s->buf = malloc(100);
128  s->buf_size = 99;
129  memset(s->buf,0,100);
130 
131  // start by loading line, by line
132  s->flags |= StreamLoadByLine;
133  spec_is_equal(__st_unix_stream_load(s),6);
134  spec_is_str_equal(s->buf,"line1\n");
135 
136  // then just switch to slurp mode.
137  s->flags &= ~StreamLoadByLine;
138  spec_is_equal(__st_unix_stream_load(s),12);
139  spec_is_str_equal(s->buf,data);
140  spec_is_equal(s->bytes_used,strlen(data));
141  spec_is_equal(errno,0);
142  spec_is_equal(s->err,0);
143 
144  // test reading after eof
145  spec_is_equal(__st_unix_stream_load(s),0);
146  spec_is_equal(errno,0);
147  spec_is_equal(s->err,EOF);
148 
149  free(s->buf);
150  _st_free(s);
151 
152  // redo with a small buffer
153  input = fmemopen(data, strlen(data), "r");
154  s = _st_new_unix_stream(input,false);
155 
156  // manually allocate a buffer for reading
157  s->buf = malloc(11);
158  s->buf_size = 10;
159  memset(s->buf,0,11);
160 
161  spec_is_equal(__st_unix_stream_load(s),10);
162  spec_is_str_equal(s->buf,"line1\nline");
163  spec_is_true(__st_buf_full(s));
164  spec_is_equal(s->bytes_used,10);
165  spec_is_equal(errno,0);
166  spec_is_equal(s->err,0);
167 
168  spec_is_equal(__st_unix_stream_load(s),8);
169  s->buf[s->bytes_used] = 0;
170  spec_is_str_equal(s->buf,data);
171  spec_is_false(__st_buf_full(s));
172  spec_is_equal(s->buf_size,20);
173  spec_is_equal(s->bytes_used,strlen(data));
174  spec_is_equal(errno,0);
175  spec_is_equal(s->err,0);
176 
177  free(s->buf);
178  _st_free(s);
179  debug_disable(D_STREAM);
180 }
181 
182 void testCallback(Stream *st) {
183  spec_is_buffer_equal(_st_data(st),"line2",_st_data_size(st));
184  spec_is_str_equal((char *)st->callback_arg1,"arg val");
185  st->callback = 0;
186 }
187 
188 // test reading from a file stream
189 void testStreamRead(size_t rs) {
190  //debug_enable(D_STREAM);
191  FILE *input;
192 
193  char data[] = "line1\nline2\nline3";
194  input = fmemopen(data, strlen(data), "r");
195  Stream *s = __st_new_unix_stream(input,rs);
196  spec_is_true(s->flags&StreamReader);
197  spec_is_true(s->flags&StreamWaiting);
198 
199  _st_start_read(s);
200  while(!(s->flags&StreamHasData) && _st_is_alive(s)) {sleepms(1);};
201  spec_is_buffer_equal(_st_data(s),"line1",_st_data_size(s));
203  spec_is_false(s->flags&StreamHasData);
204  spec_is_true(_st_is_alive(s));
205 
206  // test callback getting the value
207  s->callback = testCallback;
208  s->callback_arg1 = "arg val";
209  _st_start_read(s);
210  while(!(s->flags&StreamHasData) && _st_is_alive(s) ) {sleepms(1);};
211  spec_is_true(s->flags&StreamHasData);
213  spec_is_false(s->flags&StreamHasData);
214 
215  // test final value with no new-line
216  _st_start_read(s);
217  while(!(s->flags&StreamHasData) && _st_is_alive(s) ) {sleepms(1);};
218  spec_is_true(s->flags&StreamHasData);
219  spec_is_buffer_equal(_st_data(s),"line3",_st_data_size(s));
221  spec_is_false(s->flags&StreamHasData);
222 
223  _st_start_read(s);
224  while(!(s->flags&StreamHasData) && _st_is_alive(s) ) {sleepms(1);};
225  spec_is_false(s->flags&StreamHasData);
226  spec_is_false(_st_is_alive(s));
227 
228  debug_disable(D_STREAM);
229 
230  void *status;
231  int rc;
232 
233  rc = pthread_join(s->pthread, &status);
234  if (rc) {
235  raise_error("ERROR; return code from pthread_join() is %d\n", rc);
236  }
237 
238  _st_free(s);
239 }
240 
241 void testStreamWrite() {
242  char buffer[500] = "x";
243  FILE *stream;
244  stream = fmemopen(buffer, 500, "r+");
245 
246  Stream *st = _st_new_unix_stream(stream,1);
247  spec_is_equal(_st_write(st,"fishy",5),5);
248 
249  char *expected_result = "fishy";
250  spec_is_str_equal(buffer,expected_result);
251 
252  spec_is_equal(_st_write(st," in the sea",11),11);
253  spec_is_str_equal(buffer,"fishy in the sea");
254 
255  _st_free(st);
256 }
257 
258 void testStreamWriteLine() {
259  char buffer[500] = "x";
260  FILE *stream;
261  stream = fmemopen(buffer, 500, "r+");
262 
263  Stream *st = _st_new_unix_stream(stream,1);
264  spec_is_equal(_st_writeln(st,"fishy"),6);
265 
266  char *expected_result = "fishy\n";
267  spec_is_str_equal(buffer,expected_result);
268 
269  spec_is_equal(_st_writeln(st,"in the sea"),11);
270  spec_is_str_equal(buffer,"fishy\nin the sea\n");
271 
272  st->delim = DELIM_CRLF;
273  st->delim_len = 2;
274 
275  spec_is_equal(_st_writeln(st,"not me"),8);
276  spec_is_str_equal(buffer,"fishy\nin the sea\nnot me\r\n");
277 
278  _st_free(st);
279 }
280 
281 
282 void testSocketListernCallback(Stream *s,void *arg) {
283  spec_is_true(s->flags&StreamReader);
284  spec_is_true(s->flags&StreamWaiting);
285  spec_is_equal(*(int *)arg,31415);
286 
287  _st_start_read(s);
288  while(!(s->flags&StreamHasData) && s->flags&StreamAlive ) {sleepms(1);};
289  spec_is_buffer_equal(_st_data(s),"testing!",_st_data_size(s));
291 
292  _st_start_read(s);
293  while(!(s->flags&StreamHasData) && s->flags&StreamAlive ) {sleepms(1);};
294  spec_is_buffer_equal(_st_data(s),"fish",_st_data_size(s));
296  _st_write(s,"fishy",6);
297  _st_free(s);
298 }
299 
300 void testStreamSocket() {
301  // debug_enable(D_SOCKET+D_STREAM);
302 
303  int arg = 31415;
304  SocketListener *l = _st_new_socket_listener(8888,testSocketListernCallback,&arg,DELIM_LF);
305  char *result = doSys("echo 'testing!\nfish\n' | nc localhost 8888");
306  spec_is_str_equal(result,"fishy");
307  free(result);
308 
309  result = doSys("echo 'testing!\nfish\n' | nc localhost 8888");
310  spec_is_str_equal(result,"fishy");
311  free(result);
312 
314 
315  debug_disable(D_SOCKET+D_STREAM);
316 }
317 
318 void testStream() {
319  testStreamCreate();
320  testStreamAlive();
321  testStreamScan();
322  testStreamFileLoad();
323  testStreamRead(1000);
324  testStreamRead(10);
325  testStreamRead(2);
326  testStreamWrite();
327  testStreamWriteLine();
328  testStreamSocket();
329 }
void _st_close_listener(SocketListener *l)
Definition: stream.c:444
int _st_write(Stream *st, char *buf, size_t len)
Definition: stream.c:534
Definition: stream.h:30
Stream * __st_new_unix_stream(FILE *stream, size_t reader_buffer_size)
Definition: stream.c:300
void _st_data_consumed(Stream *st)
Definition: stream.c:470
void _st_kill(Stream *st)
Definition: stream.c:481
void _st_start_read(Stream *st)
Definition: stream.c:455
void _st_free(Stream *st)
Definition: stream.c:507
void __st_scan(Stream *st)
Definition: stream.c:125
int _st_writeln(Stream *stream, char *str)
Definition: stream.c:563
size_t __st_unix_stream_load(Stream *st)
Definition: stream.c:49
SocketListener * _st_new_socket_listener(int port, lisenterConnectionCallbackFn fn, void *callback_arg, char *delim)
Definition: stream.c:372