16 #include "../spec/spec_utils.h"
23 if (!(t->context.flags & TFLAG_RUN_NODE)) raise_error(
"Whoa! Not a run node! %s\n",_td(r,t));
26 uint32_t get_rt_cur_child(
Receptor *r,
T *tP) {
28 return (((
rT *)tP)->cur_child);
31 void set_rt_cur_child(
Receptor *r,
T *tP,uint32_t idx) {
33 (((
rT *)tP)->cur_child) = idx;
36 void processUnblocker(
Stream *st) {
47 while ((i-- > 0) && (err =
_p_unblock((
Q *)st->callback_arg1,st->callback_arg2))) {
52 if (err) raise_error(
"process never blocked!");
56 T *defaultRequestUntil() {
57 T *until = _t_newr(0,END_CONDITIONS);
58 T *ts = __r_make_timestamp(TIMEOUT_AT,30);
60 _t_newi(until,COUNT,1);
84 if (semeq(HTTP_RESPONSE,src_sym) && semeq(LINES,to_sym)) {
85 return http_response_2_lines;
87 else if (semeq(CONTENT_TYPE,src_sym) && semeq(LINE,to_sym))
88 return content_type_2_line;
89 else if (semeq(ASCII_CHARS,src_sym) && semeq(HTTP_REQUEST,to_sym))
90 return ascii_chars_2_http_req;
92 Structure src_s = _sem_get_symbol_structure(sem,src_sym);
93 Structure to_s = _sem_get_symbol_structure(sem,to_sym);
94 if (semeq(src_s,DATE) && semeq(to_s,CSTRING)) {
95 return date2usshortdate;
97 if (semeq(src_s,TIME) && semeq(to_s,CSTRING)) {
100 if (semeq(HTTP_RESPONSE_STATUS,src_sym) && semeq(to_s,CSTRING)) {
101 return http_response_status_2_ascii_str;
112 int err = noReductionErr;
113 if (semeq(to_sym,src_sym)) {
118 Process p = _p_get_transcoder(sem,src_sym,to_sym);
119 if (!semeq(p,NULL_PROCESS)) {
129 Structure src_s = _sem_get_symbol_structure(sem,src_sym);
130 if (semeq(to_s,src_s)) {
132 x->contents.symbol = to_sym;
135 else if (semeq(to_s,INTEGER)) {
136 if (semeq(src_s,CSTRING)) {
139 else return incompatibleTypeReductionErr;
141 else if (semeq(to_sym,ASCII_CHARS)) {
142 if (semeq(src_s,CSTRING)) {
152 else return incompatibleTypeReductionErr;
154 else if (semeq(to_s,CSTRING)) {
155 if (semeq(src_s,INTEGER)) {
160 else if (semeq(src_s,FLOAT)) {
165 else if (semeq(src_s,CHAR)) {
174 T *def = _sem_get_def(sem,src_s);
179 if (!semeq(s_def,NULL_SYMBOL)) {
187 e = _p_transcode(sem,k,to_sym,to_s,&r);
188 if (e && e != redoReduction) {
201 debug(D_TRANSCODE,
"trying to find structural match\n");
206 T *sdef = _sem_get_def(sem,src_s);
207 sdef =
_t_child(sdef,SymbolDefStructureIdx);
208 debug(D_TRANSCODE,
"source def: %s\n",t2s(sdef));
211 T *tdef = _sem_get_def(sem,to_s);
212 tdef =
_t_child(tdef,SymbolDefStructureIdx);
213 debug(D_TRANSCODE,
"to def: %s\n",t2s(tdef));
218 bool tsym_isa_simple_list = (semeq(tdef_sym,STRUCTURE_ZERO_OR_MORE)||
219 semeq(tdef_sym,STRUCTURE_ZERO_OR_ONE)||
220 semeq(tdef_sym,STRUCTURE_ONE_OR_MORE)) &&
221 semeq(k_sym,STRUCTURE_SYMBOL);
223 if (tsym_isa_simple_list) {
225 if (semeq(tdef_sym,sdef_sym) &&
227 debug(D_TRANSCODE,
"transcoding elements of simple list\n");
233 Structure to_s = _sem_get_symbol_structure(sem,to_list_of_sym);
235 e = _p_transcode(sem,m,to_list_of_sym,to_s,&r);
236 if (e && e != redoReduction) {
248 debug(D_TRANSCODE,
"transcoding singleton into simple list\n");
250 if (!semeq(src_sym,to_list_of_sym)) {
251 debug(D_TRANSCODE,
"src doesn't match, recurring..\n");
253 e = _p_transcode(sem,src,to_list_of_sym,_sem_get_symbol_structure(sem,to_list_of_sym),&singleton);
254 if (e && e != redoReduction) {
266 debug(D_TRANSCODE,
"unable to match\n");
268 return incompatibleTypeReductionErr;
274 debug(D_TRANSCODE,
"transcode result: %s\n",_t2s(sem,x));
292 T *processes = _sem_get_defs(sem,p);
293 T *def = _d_get_process_code(processes,p);
294 T *signature =
_t_child(def,ProcessDefSignatureIdx);
302 for(i=SignatureOutputSigIdx+1;i<=sigs;i++) {
305 if (semeq(sym,INPUT_SIGNATURE)) {
309 bool is_optional =
_t_child(s,InputSigOptionalIdx) != NULL;
310 if (!param && !is_optional)
312 return tooFewParamsReductionErr;
313 if (!param && is_optional) {
318 if(semeq(
_t_symbol(sig),SIGNATURE_STRUCTURE)) {
320 if (!semeq(_sem_get_symbol_structure(sem,
_t_symbol(param)),ss) && !semeq(ss,TREE))
321 return signatureMismatchReductionErr;
323 else if(semeq(
_t_symbol(sig),SIGNATURE_SYMBOL)) {
326 raise_error(
"signatureMismatchReductionErr expected:%s got:%s\n",
_sem_get_name(sem,ss),_t2s(sem,param));
329 else if(semeq(
_t_symbol(sig),SIGNATURE_ANY)) {
331 else if(semeq(
_t_symbol(sig),SIGNATURE_PROCESS)) {
334 if (!semeq(expected,actual)) {
335 raise_error(
"expecting process to reduce to %s, got: %s\n",
_sem_get_name(sem,expected),_t2s(sem,param));
343 else if (semeq(sym,TEMPLATE_SIGNATURE)) {
345 return missingSemanticMapReductionErr;
348 if (map_children < c )
return mismatchSemanticMapReductionErr;
352 TreeHash mapped[map_children];
354 for(j=1;j<=map_children;j++) {
364 for (k=0;k<map_children;k++) {
365 if (mapped[k] == h) {
370 if (k == map_children)
return mismatchSemanticMapReductionErr;
375 if (param_count > input_sigs)
return tooManyParamsReductionErr;
376 if (param_count < input_sigs)
return tooFewParamsReductionErr;
382 void __p_unwind_to_point(
R *context,
T *code_point,
T *with) {
399 T *x,*t,*match_results,*match_tree;
400 Error err = noReductionErr;
403 debug(D_REDUCE,
"Reducing sys proc: %s\n",
_sem_get_name(sem,s));
404 debug(D_STEP,
"Reducing %s\n",_t2s(sem,code));
406 bool dissolve =
false;
419 if (!v) raise_error(
"Invalid xaddr in GET");
422 if (s.id == DEL_ID) {
432 x =
__t_news(0,RESULT_SYMBOL,ns,
true);
435 case DEF_STRUCTURE_ID:
440 x =
__t_news(0,RESULT_STRUCTURE,ns,
true);
448 x =
__t_news(0,RESULT_PROCESS,ns,
true);
451 case DEF_RECEPTOR_ID:
456 x =
__t_news(0,RESULT_RECEPTOR,ns,
true);
459 case DEF_PROTOCOL_ID:
464 x =
__t_news(0,RESULT_PROTOCOL,ns,
true);
473 Structure struc_new = _sem_get_symbol_structure(sem,s);
475 if (!semeq(struc_new,struc_val)) {
476 return structureMismatchReductionErr;
479 t->contents.symbol = s;
494 if (s.id == CONVERSE_ID) {
522 if (state->phase == EvalCondCondtions) {
527 state->phase = EvalCondResult;
533 state->phase = EvalCondResult;
544 code->contents.size = 0;
558 *((
int *)&x->contents.surface) = c+*((
int *)&x->contents.surface);
563 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)-c;
568 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)*c;
575 return divideByZeroReductionErr;
577 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)/c;
584 return divideByZeroReductionErr;
586 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)%c;
591 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)==c;
592 x->contents.symbol = BOOLEAN;
597 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)<c;
598 x->contents.symbol = BOOLEAN;
603 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)>c;
604 x->contents.symbol = BOOLEAN;
609 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)<=c;
610 x->contents.symbol = BOOLEAN;
615 *((
int *)&x->contents.surface) = *((
int *)&x->contents.surface)>=c;
616 x->contents.symbol = BOOLEAN;
624 int i = count ? *(
int *)
_t_surface(count) : 1;
627 if (i>d) path[0] = TREE_PATH_TERMINATOR;
628 else path[d-i] = TREE_PATH_TERMINATOR;
631 case CONTRACT_STR_ID:
635 if (!x)
return tooFewParamsReductionErr;
637 if (semeq(RESULT_SYMBOL,sy)) {
641 Structure struc = _sem_get_symbol_structure(sem,sy);
642 if (!semeq(struc,CSTRING))
643 return signatureMismatchReductionErr;
645 }
else if (s.id == CONTRACT_STR_ID) {
647 return signatureMismatchReductionErr;
650 return tooFewParamsReductionErr;
656 if (!(x->context.flags & TFLAG_ALLOCATED)) {
657 str = malloc(x->contents.size);
658 memcpy(str,&x->contents.surface,x->contents.size);
659 x->contents.surface = str;
660 x->context.flags = TFLAG_ALLOCATED+TFLAG_RUN_NODE;
665 if (semeq(struc,CHAR)) {
666 x->contents.surface = realloc(x->contents.surface,++x->contents.size);
667 ((
char *)x->contents.surface)[1] = 0;
669 else if (!semeq(struc,CSTRING)) {
671 return incompatibleTypeReductionErr;
676 struc = _sem_get_symbol_structure(sem,
_t_symbol(t));
679 if (semeq(struc,CSTRING)) size = strlen(str);
680 else if (semeq(struc,CHAR)) size = 1;
683 return incompatibleTypeReductionErr;
685 x->contents.surface = realloc(x->contents.surface,x->contents.size+size);
686 memcpy(x->contents.surface+x->contents.size-1,str,size);
687 x->contents.size+=size;
688 *( (
char *)x->contents.surface + x->contents.size -1) = 0;
690 x->contents.symbol = sy;
702 if (!signal || !semeq(
_t_symbol(signal),SIGNAL))
703 return notInSignalContextReductionError;
705 if (!semeq(CARRIER,
_t_symbol(t))) raise_error(
"expected CARRIER got %s",_t2s(sem,t));
709 T *head =
_t_getv(signal,SignalMessageIdx,MessageHeadIdx,TREE_PATH_TERMINATOR);
714 T *su =
_t_getv(signal,SignalEnvelopeIdx,EnvelopeSignalUUIDIdx,TREE_PATH_TERMINATOR);
736 raise_error(
"expected ASPECT_IDENT!");
750 if (s.id == SAY_ID) {
754 else if (s.id == REQUEST_ID) {
755 T *response_point = NULL;
764 return(tooManyParamsReductionErr);
769 if (semeq(
_t_symbol(t),END_CONDITIONS)) {
774 if (!until) until = defaultRequestUntil();
777 debug(D_SIGNALS,
"blocking at %s\n",_td(q->
r,code));
778 response_point = code;
781 raise_error(
"request callback not implemented for %s",t2s(callback));
784 signal =
__r_make_signal(from,to,aspect,carrier,signal_contents,0,until,cid);
786 x =
_r_request(q->
r,signal,response_carrier,response_point,context->
id,cid);
793 raise_error(
"whoa THIS_SCOPE executed outside CONVERSE!");
803 if (!c) raise_error(
"continue at point invalid");
804 __p_unwind_to_point(context,c,with);
819 raise_error(
"COMPLETE invoked without conversation id outside of CONVERSE");
821 UUIDt *cuuid = __cid_getUUID(cid);
822 T *w = __r_cleanup_conversation(q->
r,cuuid);
828 __p_unwind_to_point(context,c,with);
834 UUIDt *cuuid = __cid_getUUID(cid);
835 T *w = __r_cleanup_conversation(q->
r,cuuid);
840 debug(D_LOCK,
"complete LOCK\n");
841 pthread_mutex_lock(&q->mutex);
842 Qe *e = __p_find_context(q->
blocked,process_id);
846 if (!c) raise_error(
"failed to find code path when completing converse!");
847 __p_unwind_to_point(e->context,c,with);
849 debug(D_SIGNALS,
"unblocking CONVERSE\n");
850 __p_unblock(q,e,noReductionErr);
853 pthread_mutex_unlock(&q->mutex);
854 debug(D_LOCK,
"complete UNLOCK\n");
863 if (!params)
return signatureMismatchReductionErr;
865 if (!to)
return signatureMismatchReductionErr;
870 Structure to_s = _sem_get_symbol_structure(sem,to_sym);
873 if (!items)
return signatureMismatchReductionErr;
877 int e = _p_transcode(sem,src,to_sym,to_s,&x);
878 if (e != noReductionErr) {
879 if (e != redoReduction)
return e;
897 if (!
_t_parent(code))
return structureMismatchReductionErr;
919 x = _t_rclone(results);
923 else return incompatibleTypeReductionErr;
937 _t_fill_template(x,sem_map);
941 case FILL_FROM_MATCH_ID:
949 return raiseReductionErr;
959 if (st->flags & StreamHasData) {
962 if (st->err)
return(st->err);
966 if (semeq(RESULT_SYMBOL,sy)) {
969 size_t l = _st_data_size(st);
970 char *c = _st_data(st);
972 Structure to_s = _sem_get_symbol_structure(sem,sy);
973 if (semeq(to_s,CSTRING)) {
974 debug(D_STREAM,
"creating CSTRING: %s '%.*s'\n",
_sem_get_name(sem,sy),(
int)l,c);
982 debug(D_STREAM,
"non CSTRING RESULT_SYMBOL so converting to ASCII_CHARS and transcoding to %s \n",
_sem_get_name(sem,sy));
988 int e = _p_transcode(sem,src,sy,to_s,&x);
989 if (e != noReductionErr) {
990 if (e != redoReduction)
return e;
996 else {raise_error(
"expecting RESULT_SYMBOL");}
998 else if (_st_is_alive(st)) {
999 st->callback = processUnblocker;
1000 st->callback_arg1 = q;
1001 st->callback_arg2 = q->
active->id;
1010 return deadStreamReadReductionErr;
1014 case STREAM_WRITE_ID:
1024 if (err == 0)
return unixErrnoReductionErr;
1027 x =
__t_news(0,REDUCTION_ERROR_SYMBOL,NULL_SYMBOL,1);
1030 case STREAM_ALIVE_ID:
1035 x =
__t_newi(0,BOOLEAN,_st_is_alive(st),
true);
1039 case STREAM_CLOSE_ID:
1048 rc = pthread_join(st->pthread, &status);
1050 raise_error(
"ERROR; return code from pthread_join() is %d\n", rc);
1066 switch(state->phase) {
1067 case EvalCondition: {
1070 if (state->type == IterateTypeUnknown) {
1072 if (semeq(c,BOOLEAN)) {
1073 state->type = IterateTypeCond;
1076 else if (semeq(c,ITERATE_ON_SYMBOL)) {
1077 state->type = IterateTypeOnSymbol;
1079 T *list = _t_newr(params,ITERATION_DATA);
1085 Structure s = _sem_get_symbol_structure(sem,c);
1086 if (semeq(s,INTEGER)) {
1087 state->type = IterateTypeCount;
1091 raise_error(
"unable to determine iteration type! symbol was:%s",
_sem_get_name(sem,s));
1096 switch(state->type) {
1097 case IterateTypeCond:
1100 case IterateTypeCount:
1101 done = (--state->count < 0);
1105 next_phase = EvalBody;
1114 next_phase = (state->type == IterateTypeCond) ? EvalCondition : EvalBody;
1115 if (state->type == IterateTypeCount){
1116 if (--state->count < 0) done =
true;
1122 if (state->type == IterateTypeOnSymbol) {
1138 code->contents.size = 0;
1143 _t_add(code,_t_rclone(
_t_child(state->code,next_phase == EvalBody ? 3 : 2)));
1145 set_rt_cur_child(q->
r,code,1);
1146 state->phase = next_phase;
1156 if (!t || !semeq(
_t_symbol(t),ASPECT_IDENT)) {
1157 raise_error(
"expected ASPECT_IDENT!");
1171 return(tooManyParamsReductionErr);
1176 if (semeq(sym,END_CONDITIONS)) {
1179 else if (semeq(sym,ACTION)) {
1182 else if (semeq(sym,PARAMS)) {
1188 T *s = _t_newr(with,SLOT);
1189 _t_news(s,USAGE,NULL_SYMBOL);
1196 x =
__t_news(0,REDUCTION_ERROR_SYMBOL,NULL_SYMBOL,1);
1197 debug(D_LISTEN,
"adding expectation\n");
1203 _t_newi(until,COUNT,1);
1206 debug(D_LISTEN,
"adding expectation and blocking at %d,%s\n",context->
id,_td(q->
r,code));
1211 case INITIATE_PROTOCOL_ID:
1220 err = redoReduction;
1229 x = ___r_make_addr(0,sym,addr,
true);
1248 if (!l) raise_error(
"label not found for symbol");
1254 case MagicReceptors:
1256 char *s = malloc(10000);
1259 for (i=0;i<G_vm->receptor_count;i++) {
1260 Receptor *r = G_vm->routing_table[i].r;
1267 sprintf(&s[l],
":%d ",r->
addr.addr);
1277 if (!debugging(D_SIGNALS)) {
1278 debug_enable(D_SIGNALS);
1282 debug_disable(D_SIGNALS);
1296 raise_error(
"unknown sys-process id: %d",s.id);
1299 if (debugging(D_STEP)) {
1300 if (err==redoReduction) debug(D_STEP,
" redoing reduction\n");
1310 code->structure.child_count = x->structure.child_count;
1311 code->structure.children = x->structure.children;
1312 code->contents = x->contents;
1313 code->context = x->context;
1315 DO_KIDS(code,
_t_child(code,i)->structure.parent = code);
1317 debug(D_STEP,
" to %s\n",_t2s(sem,code));
1322 int path[2] = {0,TREE_PATH_TERMINATOR};
1335 debug(D_STEP,
" dissolving to %s\n",_t2s(sem,parent));
1346 R *context = malloc(
sizeof(
R));
1347 context->
id = process_id;
1348 context->
state = Eval;
1353 context->
parent = run_tree;
1355 context->
caller = caller;
1359 if (caller) caller->
callee = context;
1366 printf(
"%p(<-%p) -> ",qe,qe->prev);
1375 #define __p_dequeue(list,qe) \
1376 if (qe->next) {qe->next->prev = qe->prev;} \
1381 qe->prev->next = qe->next; \
1386 #define __p_enqueue(list,qe) { \
1389 if (d) d->prev = qe; \
1393 #define __p_append(list,qe) { \
1395 if (!list) {list=qe;} \
1406 void _p_enqueue(
Qe **listP,
Qe *e) {
1407 __p_enqueue(*listP,e);
1410 Qe *__p_find_context(
Qe *e,
int process_id) {
1411 while (e && e->id != process_id) e = e->next;
1416 void __p_unblock(
Q *q,
Qe *e,Error err) {
1418 __p_enqueue(q->
active,e);
1420 e->context->
state = err ? err : Eval;
1429 debug(D_LOCK,
"unblock LOCK\n");
1430 pthread_mutex_lock(&q->mutex);
1431 Qe *e = __p_find_context(q->
blocked,
id);
1433 __p_unblock(q,e,noReductionErr);
1440 e = __p_find_context(q->
active,
id);
1444 pthread_mutex_unlock(&q->mutex);
1445 debug(D_LOCK,
"unblock UNLOCK\n");
1459 int process_id = *(
int *)
_t_surface(
_t_child(wakeup,WakeupReferenceProcessIdentIdx));
1462 debug(D_LOCK,
"wakeup LOCK\n");
1463 pthread_mutex_lock(&q->mutex);
1464 Qe *e = __p_find_context(q->
blocked,process_id);
1474 if (!(with->context.flags & TFLAG_RUN_NODE)) {
1475 T *w = _t_rclone(with);
1484 __p_unblock(q,e,err);
1486 else {
if (with)
_t_free(with);}
1487 pthread_mutex_unlock(&q->mutex);
1488 debug(D_LOCK,
"wakeup UNLOCK\n");
1519 while(
_p_step(&q, &context) != Done);
1525 T * __p_buildErr(
R *context) {
1536 switch(context->
state) {
1537 case tooFewParamsReductionErr: se=TOO_FEW_PARAMS_ERR;
break;
1538 case tooManyParamsReductionErr: se=TOO_MANY_PARAMS_ERR;
break;
1539 case signatureMismatchReductionErr: se=SIGNATURE_MISMATCH_ERR;
break;
1540 case notProcessReductionError: se=NOT_A_PROCESS_ERR;
break;
1541 case notInSignalContextReductionError: se=NOT_IN_SIGNAL_CONTEXT_ERR;
1542 case divideByZeroReductionErr: se=ZERO_DIVIDE_ERR;
break;
1543 case incompatibleTypeReductionErr: se=INCOMPATIBLE_TYPE_ERR;
break;
1544 case deadStreamReadReductionErr: se=DEAD_STREAM_READ_ERR;
break;
1545 case missingSemanticMapReductionErr: se=MISSING_SEMANTIC_MAP_ERR;
break;
1546 case mismatchSemanticMapReductionErr: se=MISMATCH_SEMANTIC_MAP_ERR;
break;
1547 case structureMismatchReductionErr: se=STRUCTURE_MISMATCH_ERR;
break;
1549 case unixErrnoReductionErr:
1551 extra = _t_new_str(0,TEST_STR_SYMBOL,strerror(errno));
1553 case raiseReductionErr:
1556 default: raise_error(
"unknown reduction error: %d",context->
err);
1560 _t_new(err,ERROR_LOCATION,path,
sizeof(
int)*(
_t_path_depth(path)+1));
1579 R *context = *contextP;
1582 switch(context->
state) {
1583 case noReductionErr:
1585 raise_error(
"whoa, virtual states can't be executed!");
1595 context->
err = noReductionErr;
1601 if (debugging(D_REDUCE) && (context->
err)) {
1603 debug(D_REDUCE,
"finishing reduction with unhandled error: %d\n",context->
err);
1606 context->
state = Done;
1612 context = context->
caller;
1618 set_rt_cur_child(q->
r,np,RUN_TREE_EVALUATED);
1620 context->
state = Eval;
1627 *contextP = context;
1635 raise_error(
"Whoa! Null node pointer");
1639 if (semeq(s,PARAMETER)) {
1643 if (semeq(ref_sym,PARAM_PATH)) {
1646 else if (semeq(ref_sym,PARAM_LABEL)) {
1647 raise_error(
"reference by PARAM_LABEL not implemented");
1649 else raise_error(
"unknown reference type %s",
_sem_get_name(sem,ref_sym));
1653 if (semeq(result_sym,RESULT_VALUE)) {
1654 np = _t_rclone(param);
1656 else if (semeq(result_sym,RESULT_SYMBOL)) {
1659 else if (semeq(result_sym,RESULT_LABEL)) {
1663 if (!l) raise_error(
"label not found for param symbol");
1666 else raise_error(
"unknown result type %s",
_sem_get_name(sem,result_sym));
1672 else if (semeq(s,PARAM_REF)) {
1678 raise_error(
"request for non-existent param %s in %s",buf,_t2s(sem,context->
run_tree));
1684 else if (semeq(s,SIGNAL_REF)) {
1687 raise_error(
"not in signal context!");
1694 raise_error(
"request for non-existent signal portion. signal was: %s \n path was:%s\n",t2s(sig),
_t_sprint_path((
int *)
_t_surface(np),buf));
1703 if (!is_process(s)) {
1709 #ifndef RUN_TREE_SHALLOW_PARAM_REF_SEARCH
1710 int node_cur_child = get_rt_cur_child(q->
r,np);
1711 if ((node_cur_child != RUN_TREE_EVALUATED) && count && (count != node_cur_child))
1712 context->
state = Descend;
1715 context->
state = Ascend;
1717 if (semeq(s,ITERATE)) {
1722 if (
_t_children(np) != 3) {raise_error(
"ITERATE must have 3 params");}
1725 state->phase = EvalCondition;
1726 state->code = _t_rclone(np);
1727 state->type = IterateTypeUnknown;
1736 else if (semeq(s,COND)) {
1748 state->phase = EvalCondCondtions;
1751 state->phase = EvalCondResult;
1754 np->contents.size =
sizeof(
CondState *);
1757 else if (semeq(s,CONVERSE)) {
1762 UUIDt cuuid = __uuid_gen();
1763 T *until,*wait = NULL;
1782 T *c = _r_add_conversation(q->
r,parent_u,&cuuid,until?
_t_clone(until):NULL,
1791 np->context.flags |= TFLAG_ALLOCATED;
1799 if (count == get_rt_cur_child(q->
r,np) || semeq(s,QUOTE)) {
1804 if (!is_sys_process(s)) {
1815 context->
state = Pushed;
1820 debug(D_REDUCE,
"New context for %s: %s\n\n",
_sem_get_name(sem,s),_t2s(sem,run_tree));
1831 if (e == redoReduction) {
1839 context->
state = Pushed;
1846 p->structure.children[i-1] = dummy;
1847 dummy->structure.parent = p;
1848 np->structure.parent = NULL;
1850 debug(D_REDUCE,
"Redoing with a new context for: %s\n\n",_t2s(sem,np));
1853 context->
state = Eval;
1854 set_rt_cur_child(q->
r,np,RUN_TREE_NOT_EVAULATED);
1857 else context->
state = e ? e : Ascend;
1863 context->
state = Descend;
1866 raise_error(
"whoa! brain fart! on %d,%s",count,_t2s(sem,np));
1872 set_rt_cur_child(q->
r,context->
node_pointer,RUN_TREE_EVALUATED);
1879 context->
idx = get_rt_cur_child(q->
r,context->
parent);
1882 context->
state = Pop;
1884 context->
state = Eval;
1891 context->
state = Eval;
1897 if (debugging(D_STEP)) {
1898 T *err = __p_buildErr(context);
1899 debug(D_STEP,
"Reduction Err (no handler): %s\n",_t2s(sem,err));
1900 debug(D_STEP,
" node_pointer @ err: %s\n",_t2s(sem,context->
node_pointer));
1903 context->
state = Pop;
1909 T *ps = _t_newr(context->
run_tree,PARAMS);
1911 T *err = __p_buildErr(context);
1912 debug(D_STEP,
"In Error Handler with %s\n",_t2s(sem,err));
1913 debug(D_STEP,
" node_pointer @ err: %s\n",_t2s(sem,context->
node_pointer));
1918 context->
idx = RunTreeErrorCodeIdx;
1921 context->
state = Eval;
1924 return context->
state;
1933 _t_newi(wakeup,PROCESS_IDENT,process_id);
1935 _t_new(wakeup,CODE_PATH,path,
sizeof(
int)*(
_t_path_depth(path)+1));
1945 T *c = _t_rclone(code);
1947 T *ps = _t_newr(t,PARAMS);
1949 for(i=1;i<=num_params;i++) {
1955 T *__p_build_run_tree(
T* code,
int num_params,...) {
1957 va_start(params,num_params);
1975 if (!is_process(p)) {
1976 raise_error(
"not a Process!");
1978 T *processes = _sem_get_defs(sem,p);
1979 T *code_def = _d_get_process_code(processes,p);
1983 T *code =
_t_child(code_def,ProcessDefCodeIdx);
1988 if (semeq(
_t_symbol(code),NULL_PROCESS)) {
1994 T *c = _t_rclone(code);
1996 ps = _t_newr(t,PARAMS);
1999 for(i=1;i<=num_params;i++) {
2016 Q *q = malloc(
sizeof(
Q));
2022 pthread_mutex_init(&(q->mutex), NULL);
2027 void _p_free_context(
R *c) {
2041 void _p_free_elements(
Qe *e) {
2043 _p_free_context(e->context);
2056 _p_free_elements(q->
active);
2062 int G_next_process_id = 0;
2069 Qe *n = malloc(
sizeof(
Qe));
2070 n->id = ++G_next_process_id;
2073 n->accounts.elapsed_time = 0;
2074 debug(D_LOCK,
"addrt2q LOCK\n");
2075 pthread_mutex_lock(&q->mutex);
2078 pthread_mutex_unlock(&q->mutex);
2079 debug(D_LOCK,
"addrt2q UNLOCK\n");
2096 void debug_np(
int type,
T *np) {
2100 debug(type,
"Node Pointer:%s\n",pp);
2107 char *sn[]={
"Done",
"Ascend",
"Descend",
"Pushed",
"Pop",
"Eval",
"Block"};
2108 char *__debug_state_str(
R *context) {
2109 int s = context->
state;
2112 if (s <= 0) result = sn[-s];
2114 T *e = __p_buildErr(context);
2127 debug(D_REDUCE+D_REDUCEV,
"Starting reduce:\n");
2131 struct timespec start, end;
2135 if (debugging(D_REDUCEV)) {
2136 R *context = qe->context;
2137 char *s = __debug_state_str(context);
2138 debug(D_REDUCEV,
"ID:%d -- State %s(%d)\n",qe->id,s,context->
state);
2139 debug(D_REDUCEV,
" idx:%d\n",context->
idx);
2140 debug(D_REDUCEV,
"%s\n",_t2s(q->
r->
sem,context->
run_tree));
2143 debug(D_REDUCEV,
"Node Pointer: NULL!\n");
2146 debug(D_REDUCEV,
"rt_cur_child:%d\n",rt_cur_child(context->
node_pointer));
2152 if (debugging(D_REDUCEV+D_REDUCE)) {
2153 prev_state = qe->context->
state;
2157 clock_gettime(CLOCK_MONOTONIC, &start);
2158 next_state =
_p_step(q, &qe->context);
2159 clock_gettime(CLOCK_MONOTONIC, &end);
2160 qe->accounts.elapsed_time += diff_micro(&start, &end);
2163 debug(D_REDUCEV,
"result state:%s\n\n",__debug_state_str(qe->context));
2164 if (debugging(D_REDUCE) && prev_state == Eval) {
2166 debug(D_REDUCE,
"Eval: %s\n\n",_t2s(q->
r->
sem,qe->context->
run_tree));
2169 debug(D_LOCK,
"reduce LOCK\n");
2170 pthread_mutex_lock(&q->mutex);
2171 Qe *next = qe->next;
2172 if (next_state == Done) {
2174 __p_dequeue(q->
active,qe);
2176 debug(D_REDUCEV,
"Just completed:%d\n",qe->id);
2182 else if (next_state == Block) {
2184 __p_dequeue(q->
active,qe);
2190 qe = next ? next : q->
active;
2191 pthread_mutex_unlock(&q->mutex);
2192 debug(D_LOCK,
"reduce UNLOCK\n");
2199 debug(D_REDUCE+D_REDUCEV,
"Ending reduce\n");
2209 debug(D_LOCK,
"cleanup LOCK\n");
2210 pthread_mutex_lock(&q->mutex);
2215 (*et) += e->accounts.elapsed_time;
2220 pthread_mutex_unlock(&q->mutex);
2221 debug(D_LOCK,
"cleanup UNLOCK\n");
2237 va_start(params,output_sem);
2242 T *o = _t_newr(signature,OUTPUT_SIGNATURE);
2243 T *l = _t_newr(o,SIGNATURE_LABEL);
2244 _t_new_str(l,ENGLISH_LABEL,output_label);
2245 if (semeq(output_type,SIGNATURE_PASSTHRU)) {
2246 _t_newr(o,output_type);
2249 _t_news(o,output_type,output_sem);
2251 while ((label = va_arg(params,
char*))) {
2252 type = va_arg(params,
Symbol);
2253 if (semeq(type,SIGNATURE_OPTIONAL)) {
2255 type = va_arg(params,
Symbol);
2260 value = va_arg(params,
Symbol);
2261 T *i = _t_newr(signature,INPUT_SIGNATURE);
2262 l = _t_newr(i,SIGNATURE_LABEL);
2263 _t_new_str(l,ENGLISH_LABEL,label);
2264 _t_news(i,type,value);
2265 if (optional) _t_newr(i,SIGNATURE_OPTIONAL);
T * __p_make_form(Symbol sym, char *output_label, Symbol output_type, SemanticID output_sem,...)
T * _t_new_root(Symbol symbol)
Receptor * r
back-pointer to receptor in which this Q is running (for defs and more)
char * _sem_get_name(SemTable *sem, SemanticID s)
T * parent
node_pointer's parent (cached here for efficiency)
Error _p_step(Q *q, R **contextP)
T * __t_newi(T *parent, Symbol symbol, int surface, bool is_run_node)
int _t_write(SemTable *sem, T *t, Stream *stream)
bool __t_fill_template(T *template, T *sem_map, bool as_run_node)
T * _t_detach_by_idx(T *t, int i)
header file for symbol and structure definition functions
int _t_path_depth(int *p)
T * converse_pointer
pointer to the CONVERSE instruction in the run tree
R * caller
a pointer to the context that invoked this run-tree/context
Semantic tree regular expression header file.
T * sem_map
semantic map in effect for this context
void _p_wakeup(Q *q, T *wakeup, T *with, Error err)
void * _p_reduceq_thread(void *arg)
void _st_data_consumed(Stream *st)
TreeHash _t_hash(SemTable *sem, T *t)
T * __p_build_wakeup_info(T *code_point, int process_id)
int state
process state machine state
T * __t_newr(T *parent, Symbol symbol, bool is_run_node)
T * __t_new(T *parent, Symbol symbol, void *surface, size_t size, bool is_run_node)
T * _t_child(T *t, int i)
protocol helpers header file
T * makeASCIITree(char *c)
int err
process error value
void _t_insert_at(T *t, int *path, T *i)
T * cid
pointer to CONVERSATION_IDENT in the receptor's CONVERSATIONS tree
Context context
the context this receptor's definition creates
T * _sem_get_label(SemTable *sem, SemanticID s, Symbol label_type)
ReceptorAddress addr
the address by which to get messages to this receptor instance
Error __p_check_signature(SemTable *sem, Process p, T *code, T *sem_map)
Instances instances
the instances store
void _st_kill(Stream *st)
receptor implementation header file
void _r_add_expectation(Receptor *r, Aspect aspect, Symbol carrier, T *pattern, T *action, T *with, T *until, T *using, T *cid)
Adds an expectation to a receptor's aspect.
int _t_match(T *semtrex, T *t)
T * __t_new_str(T *parent, Symbol symbol, char *surface, bool is_run_node)
Error _p_unblock(Q *q, int id)
void _st_start_read(Stream *st)
T * __p_build_run_tree_va(T *code, int num_params, va_list params)
Qe * active
active processes
T * _r_send(Receptor *r, T *signal)
int id
the process id this context exists in
Error __p_reduce_sys_proc(R *context, Symbol s, T *code, Q *q)
SState * state(StateType type, int *statesP, int level)
R * callee
a pointer to the context we've invoked
int contexts_count
number of active processes
void _st_free(Stream *st)
int _t_matchr(T *semtrex, T *t, T **rP)
Qe * __p_addrt2q(Q *q, T *run_tree, T *sem_map)
T * node_pointer
pointer to the tree node to execute next
T * __r_make_signal(ReceptorAddress from, ReceptorAddress to, Aspect aspect, Symbol carrier, T *signal_contents, UUIDt *in_response_to, T *until, T *cid)
T * root
RECEPTOR_INSTANCE semantic tree.
void _p_fill_from_match(SemTable *sem, T *t, T *match_results, T *match_tree)
Qe * blocked
blocked processes
SemTable * sem
pointer back to the genotype table for this receptor's vmhost instance
Error _p_reduce(SemTable *sem, T *rt)
T * _r_get_instance(Receptor *r, Xaddr x)
Receptor * r
Receptor data for this vm host.
ConversationState * conversation
record of the conversation state active in this context frame
SemanticID __d_define_receptor(SemTable *sem, T *def, Context c)
int idx
node_pointer's child index (cached here for efficiency)
Qe * completed
completed processes (pending cleanup)
T * run_tree
pointer to the root of the run_tree
R * __p_make_context(T *run_tree, R *caller, int process_id, T *sem_map)
T * _stx_results2sem_map(SemTable *sem, T *match_results, T *match_tree)
header file for the accumulator
void _t_replace(T *t, int i, T *r)
void _t_detach_by_ptr(T *t, T *c)
T * __o_initiate(Receptor *r, SemanticID protocol, SemanticID interaction, T *bindings, T **sem_mapP)
T * _r_delete_instance(Receptor *r, Xaddr x)
T * __t_news(T *parent, Symbol symbol, SemanticID surface, bool is_run_node)
T * _r_request(Receptor *r, T *signal, Symbol response_carrier, T *code_point, int process_id, T *cid)
T * _p_make_run_tree(SemTable *sem, Process p, T *params, T *sem_map)
T * __t_newc(T *parent, Symbol symbol, char surface, bool is_run_node)
Xaddr _r_new_instance(Receptor *r, T *t)
char * _t_sprint_path(int *fp, char *buf)