7 #include "../src/ceptr.h"
8 #include "../src/stream.h"
12 void testStreamCreate() {
14 Stream *s = _st_new_unix_stream(stdout,0);
15 spec_is_equal(s->type,UnixStream);
16 spec_is_equal(s->flags,StreamCloseOnFree);
17 spec_is_ptr_equal(s->data.unix_stream,stdout);
18 s->flags &= ~StreamCloseOnFree;
22 void testStreamAlive() {
25 char data[] =
"line1\nline2\n";
26 input = fmemopen(data, strlen(data),
"r");
27 Stream *s = _st_new_unix_stream(input,1);
29 spec_is_true(_st_is_alive(s));
31 spec_is_false(_st_is_alive(s));
36 rc = pthread_join(s->pthread, &status);
38 raise_error(
"ERROR; return code from pthread_join() is %d\n", rc);
43 void testStreamScan() {
45 Stream * s = __st_alloc_stream();
50 s->buf =
"line1\ncat\ndog";
51 s->bytes_used = strlen(s->buf);
55 spec_is_equal(s->scan_state,StreamScanInitial);
59 spec_is_equal(s->scan_state,StreamScanSuccess);
60 spec_is_equal(s->unit_start,0);
61 spec_is_equal(s->unit_size,5);
63 spec_is_equal(s->scan_state,StreamScanSuccess);
64 spec_is_equal(s->unit_start,6);
65 spec_is_equal(s->unit_size,3);
69 spec_is_equal(s->scan_state,StreamScanPartial);
70 spec_is_equal(s->unit_start,10);
74 spec_is_equal(s->scan_state,StreamScanPartial);
77 s->buf =
"line1\ncat\ndogg"; s->bytes_used = strlen(s->buf);
80 spec_is_equal(s->scan_state,StreamScanPartial);
83 s->buf =
"line1\ncat\ndoggy\n"; s->bytes_used = strlen(s->buf);
86 spec_is_equal(s->scan_state,StreamScanSuccess);
87 spec_is_equal(s->unit_start,10);
88 spec_is_equal(s->unit_size,5);
91 spec_is_equal(s->scan_state,StreamScanComplete);
94 s->bytes_used = strlen(s->buf);
98 spec_is_equal(s->scan_state,StreamScanComplete);
101 s->delim = DELIM_CRLF;
103 s->buf =
"line1\r\ncat\r\ndoggy\r\n";
104 s->bytes_used = strlen(s->buf);
107 spec_is_equal(s->scan_state,StreamScanSuccess);
108 spec_is_equal(s->unit_start,0);
109 spec_is_equal(s->unit_size,5);
111 spec_is_equal(s->scan_state,StreamScanSuccess);
112 spec_is_equal(s->unit_start,7);
113 spec_is_equal(s->unit_size,3);
119 void testStreamFileLoad() {
123 char data[] =
"line1\nline2\nline3\n";
124 input = fmemopen(data, strlen(data),
"r");
125 Stream *s = _st_new_unix_stream(input,
false);
127 s->buf = malloc(100);
129 memset(s->buf,0,100);
132 s->flags |= StreamLoadByLine;
134 spec_is_str_equal(s->buf,
"line1\n");
137 s->flags &= ~StreamLoadByLine;
139 spec_is_str_equal(s->buf,data);
140 spec_is_equal(s->bytes_used,strlen(data));
141 spec_is_equal(errno,0);
142 spec_is_equal(s->err,0);
146 spec_is_equal(errno,0);
147 spec_is_equal(s->err,EOF);
153 input = fmemopen(data, strlen(data),
"r");
154 s = _st_new_unix_stream(input,
false);
162 spec_is_str_equal(s->buf,
"line1\nline");
163 spec_is_true(__st_buf_full(s));
164 spec_is_equal(s->bytes_used,10);
165 spec_is_equal(errno,0);
166 spec_is_equal(s->err,0);
169 s->buf[s->bytes_used] = 0;
170 spec_is_str_equal(s->buf,data);
171 spec_is_false(__st_buf_full(s));
172 spec_is_equal(s->buf_size,20);
173 spec_is_equal(s->bytes_used,strlen(data));
174 spec_is_equal(errno,0);
175 spec_is_equal(s->err,0);
179 debug_disable(D_STREAM);
182 void testCallback(
Stream *st) {
183 spec_is_buffer_equal(_st_data(st),
"line2",_st_data_size(st));
184 spec_is_str_equal((
char *)st->callback_arg1,
"arg val");
189 void testStreamRead(
size_t rs) {
193 char data[] =
"line1\nline2\nline3";
194 input = fmemopen(data, strlen(data),
"r");
196 spec_is_true(s->flags&StreamReader);
197 spec_is_true(s->flags&StreamWaiting);
200 while(!(s->flags&StreamHasData) && _st_is_alive(s)) {sleepms(1);};
201 spec_is_buffer_equal(_st_data(s),
"line1",_st_data_size(s));
203 spec_is_false(s->flags&StreamHasData);
204 spec_is_true(_st_is_alive(s));
207 s->callback = testCallback;
208 s->callback_arg1 =
"arg val";
210 while(!(s->flags&StreamHasData) && _st_is_alive(s) ) {sleepms(1);};
211 spec_is_true(s->flags&StreamHasData);
213 spec_is_false(s->flags&StreamHasData);
217 while(!(s->flags&StreamHasData) && _st_is_alive(s) ) {sleepms(1);};
218 spec_is_true(s->flags&StreamHasData);
219 spec_is_buffer_equal(_st_data(s),
"line3",_st_data_size(s));
221 spec_is_false(s->flags&StreamHasData);
224 while(!(s->flags&StreamHasData) && _st_is_alive(s) ) {sleepms(1);};
225 spec_is_false(s->flags&StreamHasData);
226 spec_is_false(_st_is_alive(s));
228 debug_disable(D_STREAM);
233 rc = pthread_join(s->pthread, &status);
235 raise_error(
"ERROR; return code from pthread_join() is %d\n", rc);
241 void testStreamWrite() {
242 char buffer[500] =
"x";
244 stream = fmemopen(buffer, 500,
"r+");
246 Stream *st = _st_new_unix_stream(stream,1);
247 spec_is_equal(
_st_write(st,
"fishy",5),5);
249 char *expected_result =
"fishy";
250 spec_is_str_equal(buffer,expected_result);
252 spec_is_equal(
_st_write(st,
" in the sea",11),11);
253 spec_is_str_equal(buffer,
"fishy in the sea");
258 void testStreamWriteLine() {
259 char buffer[500] =
"x";
261 stream = fmemopen(buffer, 500,
"r+");
263 Stream *st = _st_new_unix_stream(stream,1);
266 char *expected_result =
"fishy\n";
267 spec_is_str_equal(buffer,expected_result);
270 spec_is_str_equal(buffer,
"fishy\nin the sea\n");
272 st->delim = DELIM_CRLF;
276 spec_is_str_equal(buffer,
"fishy\nin the sea\nnot me\r\n");
282 void testSocketListernCallback(
Stream *s,
void *arg) {
283 spec_is_true(s->flags&StreamReader);
284 spec_is_true(s->flags&StreamWaiting);
285 spec_is_equal(*(
int *)arg,31415);
288 while(!(s->flags&StreamHasData) && s->flags&StreamAlive ) {sleepms(1);};
289 spec_is_buffer_equal(_st_data(s),
"testing!",_st_data_size(s));
293 while(!(s->flags&StreamHasData) && s->flags&StreamAlive ) {sleepms(1);};
294 spec_is_buffer_equal(_st_data(s),
"fish",_st_data_size(s));
300 void testStreamSocket() {
305 char *result = doSys(
"echo 'testing!\nfish\n' | nc localhost 8888");
306 spec_is_str_equal(result,
"fishy");
309 result = doSys(
"echo 'testing!\nfish\n' | nc localhost 8888");
310 spec_is_str_equal(result,
"fishy");
315 debug_disable(D_SOCKET+D_STREAM);
322 testStreamFileLoad();
323 testStreamRead(1000);
327 testStreamWriteLine();
void _st_close_listener(SocketListener *l)
int _st_write(Stream *st, char *buf, size_t len)
Stream * __st_new_unix_stream(FILE *stream, size_t reader_buffer_size)
void _st_data_consumed(Stream *st)
void _st_kill(Stream *st)
void _st_start_read(Stream *st)
void _st_free(Stream *st)
void __st_scan(Stream *st)
int _st_writeln(Stream *stream, char *str)
size_t __st_unix_stream_load(Stream *st)
SocketListener * _st_new_socket_listener(int port, lisenterConnectionCallbackFn fn, void *callback_arg, char *delim)