23 Xaddr G_null_xaddr = {0,0};
40 r->pending_signals =
_t_child(state,ReceptorPendingSignalsIdx);
41 r->pending_responses =
_t_child(state,ReceptorPendingResponsesIdx);
42 r->conversations =
_t_child(state,ReceptorConversationsIdx);
47 T *__r_add_aspect(
T *flux,
Aspect aspect) {
48 T *a = _t_newr(flux,aspect);
49 _t_newr(a,EXPECTATIONS);
56 T *f = _t_newr(t,FLUX);
57 __r_add_aspect(f,DEFAULT_ASPECT);
58 _t_newr(t,PENDING_SIGNALS);
59 _t_newr(t,PENDING_RESPONSES);
60 _t_newr(t,CONVERSATIONS);
61 _t_newi(t,RECEPTOR_ELAPSED_TIME,0);
66 T *__r_make_definitions() {
68 _t_newr(defs,STRUCTURES);
69 _t_newr(defs,SYMBOLS);
70 _t_newr(defs,PROCESSES);
71 _t_newr(defs,RECEPTORS);
72 _t_newr(defs,PROTOCOLS);
90 _t_news(t,INSTANCE_OF,r);
91 if (semeq(r,SYS_RECEPTOR)) {
92 _t_newi(t,CONTEXT_NUM,0);
93 _t_newi(t,PARENT_CONTEXT_NUM,-1);
96 _t_newi(t,CONTEXT_NUM,_d_get_receptor_context(sem,r));
97 _t_newi(t,PARENT_CONTEXT_NUM,r.context);
99 T *
state = _r_make_state();
101 return __r_init(t,sem);
117 raise_error(
"fix receptor address");
126 T *__r_build_default_until() {
128 _t_newr(until,UNLIMITED);
133 T *__r_build_expectation(
Symbol carrier,
T *pattern,
T *action,
T *with,
T *until,
T *
using,
T *cid) {
134 T *e = _t_newr(0,EXPECTATION);
135 _t_news(e,CARRIER,carrier);
141 until = __r_build_default_until();
165 T *e = __r_build_expectation(carrier,pattern,action,with,until,
using,cid);
166 __r_add_expectation(r,aspect,e);
170 T *a = __r_get_expectations(r,aspect);
174 void _r_remove_expectation(
Receptor *r,
T *expectation) {
238 va_start(params,num_params);
239 T *def = _d_make_vstruc_def(r->
sem,label,num_params,params);
290 if (!__sem_get_by_label(r->
sem,label,&sid,r->
context))
291 raise_error(
"label not found %s",label);
300 return _sem_get_symbol_structure(r->
sem,s);
395 return _a_set_instance(&r->
instances,x,t);
441 S *is = __a_serialize_instances(&r->
instances);
442 s = (
S *)realloc(s,s->total_size+is->total_size);
443 memcpy(((
void *)s)+s->total_size,is,is->total_size);
445 *lengthP = s->total_size+is->total_size;
446 *surfaceP = (
void *)s;
479 s = (
S *) (surface + s->total_size);
480 __a_unserialize_instances(sem,&r->
instances,(
S *)s);
490 T *a =
__t_newr(parent,type,is_run_node);
491 __t_newi(a,RECEPTOR_ADDR,addr.addr,is_run_node);
517 T *e = _t_newr(s,ENVELOPE);
518 T *m = _t_newr(s,MESSAGE);
519 T *h = _t_newr(m,HEAD);
521 __r_make_addr(h,FROM_ADDRESS,from);
522 __r_make_addr(h,TO_ADDRESS,to);
523 _t_news(h,ASPECT_IDENT,aspect);
524 _t_news(h,CARRIER,carrier);
525 UUIDt t = __uuid_gen();
526 _t_new(e,SIGNAL_UUID,&t,
sizeof(
UUIDt));
527 T *b =
_t_newt(m,BODY,signal_contents);
529 if (in_response_to && until) raise_error(
"attempt to make signal with both response_uuid and until");
531 _t_new(h,IN_RESPONSE_TO_UUID,in_response_to,
sizeof(
UUIDt));
542 _t_add(r->pending_signals,signal);
545 T *envelope =
_t_child(signal,SignalEnvelopeIdx);
546 return _t_rclone(
_t_child(envelope,EnvelopeSignalUUIDIdx));
557 debug(D_SIGNALS,
"sending %s\n",_t2s(r->
sem,signal));
559 T *result =__r_send(r,signal);
578 T *result = __r_send(r,signal);
579 T *pr = _t_newr(r->pending_responses,PENDING_RESPONSE);
581 _t_news(pr,CARRIER,response_carrier);
583 int p[] = {SignalMessageIdx,MessageHeadIdx,HeadOptionalsIdx,TREE_PATH_TERMINATOR};
585 if (!ec || !semeq(
_t_symbol(ec),END_CONDITIONS)) raise_error(
"request missing END_CONDITIONS");
589 debug(D_SIGNALS,
"sending request and adding pending response: %s\n",_td(r,pr));
597 void evaluateEndCondition(
T *ec,
bool *cleanup,
bool *allow) {
604 if (semeq(sym,COUNT)) {
607 if (*cP <= 1) *cleanup =
true;
608 if (*cP >= 1) *allow =
true;
610 debug(D_SIGNALS,
"decreasing count to: %d\n",*cP);
613 else if (semeq(sym,TIMEOUT_AT)) {
627 gmtime_r(&now_t, &now);
628 if ((year > now.tm_year) ||
629 (mon > now.tm_mon) ||
630 (mday > now.tm_mday) ||
631 (hour > now.tm_hour) ||
632 (min > now.tm_min) ||
633 (sec > now.tm_sec)) {
639 else if (semeq(sym,UNLIMITED)) {
643 raise_error(
"unknown end condition %s",t2s(c));
648 debug(D_SIGNALS,
"after end condition %s cleanup=%s allow=%s\n",t2s(ec),*cleanup?
"true":
"false",*allow?
"true":
"false");
658 int p[] = {SignalMessageIdx,MessageBodyIdx,TREE_PATH_TERMINATOR};
663 T *e_carrier =
_t_child(expectation,ExpectationCarrierIdx);
664 T *head =
_t_getv(signal,SignalMessageIdx,MessageHeadIdx,TREE_PATH_TERMINATOR);
665 T *s_carrier =
_t_child(head,HeadCarrierIdx);
667 debug(D_SIGNALS,
"checking signal carrier %s\n",_td(q->
r,s_carrier));
668 debug(D_SIGNALS,
"against expectation carrier %s\n",_td(q->
r,e_carrier));
671 if (!semeq(esym,*(
Symbol *)
_t_surface(s_carrier)) && !semeq(esym,NULL_SYMBOL))
return;
673 T *s_cid =
__t_find(head,CONVERSATION_IDENT,HeadOptionalsIdx);
674 T *e_cid =
__t_find(expectation,CONVERSATION_IDENT,ExpectationOptionalsIdx);
675 debug(D_SIGNALS,
"checking signal conversation %s\n",_td(q->
r,s_cid));
676 debug(D_SIGNALS,
"against expectation conversation %s\n",_td(q->
r,e_cid));
679 if (e_cid && !s_cid)
return;
681 if (s_cid && e_cid) {
682 if (!__cid_equal(r->
sem,s_cid,e_cid))
return;
686 pattern =
_t_child(expectation,ExpectationPatternIdx);
688 T *stx = _t_news(0,SEMTREX_GROUP,NULL_SYMBOL);
690 debug(D_SIGNALS,
"matching %s\n",_td(q->
r,signal_contents));
691 debug(D_SIGNALS,
"against %s\n",_td(q->
r,stx));
694 matched =
_t_matchr(stx,signal_contents,&m);
697 evaluateEndCondition(
_t_child(expectation,ExpectationEndCondsIdx),&cleanup,&allow);
699 if (allow && matched) {
700 debug(D_SIGNALS,
"got a match on %s\n",_td(q->
r,stx));
703 T *action =
_t_child(expectation,ExpectationActionIdx);
705 raise_error(
"null action in expectation!");
708 if (semeq(
_t_symbol(action),WAKEUP_REFERENCE)) {
711 T *params = _t_rclone(
_t_child(expectation,ExpectationParamsIdx));
713 _p_wakeup(q,action,params,noReductionErr);
720 T *params = _t_rclone(
_t_child(expectation,ExpectationParamsIdx));
722 T *sm =
__t_find(expectation,SEMANTIC_MAP,ExpectationOptionalsIdx);
724 debug(D_SIGNALS,
"creating a run tree for action %s with params %s\n",
_sem_get_name(r->
sem,p),_t2s(r->
sem,params));
734 debug(D_SIGNALS,
"cleaning up %s\n",_td(q->
r,expectation));
735 _r_remove_expectation(q->
r,expectation);
742 T* __r_sanatize_response(
Receptor *r,
T* response) {
743 return _t_rclone(response);
746 int __r_deliver_response(
Receptor *r,
T *response_to,
T *signal) {
747 T *head =
_t_getv(signal,SignalMessageIdx,MessageHeadIdx,TREE_PATH_TERMINATOR);
751 debug(D_SIGNALS,
"Delivering response: %s\n",_td(r,signal));
754 T *body =
_t_getv(signal,SignalMessageIdx,MessageBodyIdx,TREE_PATH_TERMINATOR);
757 DO_KIDS(r->pending_responses,
758 l =
_t_child(r->pending_responses,i);
762 T *ec =
_t_child(l,PendingResponseEndCondsIdx);
765 evaluateEndCondition(ec,&cleanup,&allow);
769 T *wakeup =
_t_child(l,PendingResponseWakeupIdx);
771 signal->context.flags &= ~TFLAG_SURFACE_IS_TREE;
772 if (!semeq(carrier,signal_carrier)) {
773 debug(D_SIGNALS,
"response failed carrier check, expecting %s, but got %s!\n",_r_get_symbol_name(r,carrier),_r_get_symbol_name(r,signal_carrier));
778 response = __r_sanatize_response(r,response);
784 _p_wakeup(r->
q,wakeup,response,noReductionErr);
788 debug(D_SIGNALS,
"removing pending response: %s\n",_td(r,l));
797 return noDeliveryErr;
801 bool __cid_equal(
SemTable *sem,
T *cid1,
T*cid2) {
802 UUIDt *u1 = __cid_getUUID(cid1);
803 UUIDt *u2 = __cid_getUUID(cid2);
804 return __uuid_equal(u1,u2);
808 T *__cid_new(
T *parent,
UUIDt *c,
T *topic) {
809 T *cid = _t_newr(parent,CONVERSATION_IDENT);
810 _t_new(cid,CONVERSATION_UUID,c,
sizeof(
UUIDt));
814 UUIDt *__cid_getUUID(
T *cid) {
823 T *cu = __cid_new(c,u,0);
825 _t_add(c, until ? until : __r_build_default_until());
826 _t_newr(c,CONVERSATIONS);
827 if (wakeup)
_t_add(c,wakeup);
832 p = _r_find_conversation(r,parent_u);
833 p =
_t_child(p,ConversationConversationsIdx);
834 if (!p) raise_error(
"parent conversation not found!");
836 else p = r->conversations;
843 T *__r_find_conversation(
T *conversations,
UUIDt *uuid) {
848 DO_KIDS(conversations,
851 if (__uuid_equal(uuid,u)) {
855 T *sub_conversations =
_t_child(c,ConversationConversationsIdx);
857 c = __r_find_conversation(sub_conversations,uuid);
858 if (c) {found =
true;
break;}
867 return __r_find_conversation(r->conversations, uuid);
871 typedef void (*doConversationFn)(
T *,
void *);
/* Applies fn to this conversation's identifier child (passing param through),
 * then recurses into the conversation's sub-conversations.
 * The recursive call must pass (fn, param) in the same order as the
 * signature; the earlier (param, fn) order handed the data pointer in as the
 * callback. */
872 void __r_walk_conversation(T *conversation, doConversationFn fn,
void *param) {
873 (*fn)(
_t_child(conversation,ConversationIdentIdx),param);
875 T *conversations =
_t_child(conversation,ConversationConversationsIdx);
878 DO_KIDS(conversations,
880 __r_walk_conversation(c,fn,param);
885 void _cleaner(T *cid,
void *p) {
887 UUIDt *u = __cid_getUUID(cid);
897 T *cid =
__t_find(e,CONVERSATION_IDENT,ExpectationOptionalsIdx);
898 if (cid && __uuid_equal(u,__cid_getUUID(cid))) {
906 for(i=1;i<=
_t_children(r->pending_responses);i++) {
907 e =
_t_child(r->pending_responses,i);
908 T *cid =
_t_child(e,PendingResponseConversationIdentIdx);
909 if (cid && __uuid_equal(u,__cid_getUUID(cid))) {
921 T *c = _r_find_conversation(r,cuuid);
923 raise_error(
"can't find conversation");
927 __r_walk_conversation(c,_cleaner,r);
956 T *head =
_t_getv(signal,SignalMessageIdx,MessageHeadIdx,TREE_PATH_TERMINATOR);
958 T *conversation = NULL;
959 T *end_conditions = NULL;
960 T *response_to = NULL;
963 int optionals = HeadOptionalsIdx;
965 while(extra =
_t_child(head,optionals++)) {
967 if (semeq(CONVERSATION_IDENT,sym))
968 conversation = extra;
969 else if (semeq(IN_RESPONSE_TO_UUID,sym))
971 else if (semeq(END_CONDITIONS,sym))
972 end_conditions = extra;
977 UUIDt *cuuid = __cid_getUUID(conversation);
978 T *c = _r_find_conversation(r,cuuid);
980 c = _r_add_conversation(r,0,cuuid,end_conditions,NULL);
986 return __r_deliver_response(r,response_to,signal);
991 if (end_conditions) {
999 T *as = __r_get_signals(r,aspect);
1001 debug(D_SIGNALS,
"Delivering: %s\n",_td(r,signal));
1004 T *es = __r_get_expectations(r,aspect);
1005 debug(D_SIGNALS,
"Testing %d expectations\n",es ?
_t_children(es) : 0);
1012 return noDeliveryErr;
1025 a = __r_add_aspect(r->
flux,aspect);
1029 return _t_child(__r_get_aspect(r,aspect),aspectExpectationsIdx);
1032 return _t_child(__r_get_aspect(r,aspect),aspectSignalsIdx);
1040 if (! is_receptor(
_t_symbol(installed_receptor)))
1041 raise_error(
"expecting SEM_TYPE_RECEPTOR!");
1059 char __t_dump_buf[10000];
1062 __td(r,t,__t_dump_buf);
1065 char *__td(
Receptor *r,T *t,
char *buf) {
1066 if (!t) sprintf(buf,
"<null-tree>");
1068 __t_dump(r->
sem,t,0,buf);
1080 void __r_listenerCallback(
Stream *st,
void *arg) {
1092 _t_add(run_tree,_t_rclone(err_handler));
1095 _p_addrt2q(r->
q,run_tree);
1105 if (!params) params = _t_newr(e,PARAMS);
1107 if (err_handler)
_t_add(e,err_handler);
1109 if (r->
edge) raise_error(
"edge in use!!");
1122 p = _t_newr(code,SCOPE);
1123 p = _t_newr(p,ITERATE);
1129 T *params = _t_newr(p,PARAMS);
1130 T *eof = _t_newr(p,STREAM_ALIVE);
1134 T *say = _t_newr(p,SAY);
1136 __r_make_addr(say,TO_ADDRESS,to);
1137 _t_news(say,ASPECT_IDENT,aspect);
1138 _t_news(say,CARRIER,carrier);
1140 T *s = _t_new(say,STREAM_READ,0,0);
1142 _t_new(s,RESULT_SYMBOL,&result_symbol,
sizeof(
Symbol));
1144 T *run_tree = __p_build_run_tree(code,0);
1146 _p_addrt2q(r->
q,run_tree);
1153 char *stx =
"/<LINE:LINE>";
1162 T *t =_t_newr(expect,SEMTREX_SYMBOL_ANY);
1171 T* s = _t_newr(params,SLOT);
1172 _t_news(s,USAGE,NULL_SYMBOL);
1175 _sem_get_by_label(G_sem,
"echo2stream",&echo2stream);
1177 T *act =
_t_newp(0,ACTION,echo2stream);
1183 void _r_defineClockReceptor(
SemTable *sem) {
1184 Context clk_ctx = _d_get_receptor_context(sem,CLOCK_RECEPTOR);
1186 int p[] = {SignalMessageIdx,MessageHeadIdx,HeadCarrierIdx,TREE_PATH_TERMINATOR};
1187 _t_new(resp,SIGNAL_REF,p,
sizeof(
int)*4);
1190 T *g = _t_newr(resp,GET);
1191 _t_new(g,WHICH_XADDR,&x,
sizeof(
Xaddr));
1192 T *signature = __p_make_signature(
"result",SIGNATURE_SYMBOL,NULL_SYMBOL,NULL);
1194 T *act =
_t_newp(0,ACTION,proc);
1196 T *s =
_sl(pattern,CLOCK_TELL_TIME);
1198 T *req_act =
_t_newp(0,ACTION,time_request);
1203 GOAL,RESPONSE_HANDLER,
1204 INTERACTION,tell_time,
1205 INITIATE,TIME_HEARER,TIME_TELLER,req_act,
1206 EXPECT,TIME_TELLER,TIME_HEARER,pattern,act,NULL_SYMBOL,
1218 T *s =
_sl(expect,CLOCK_TELL_TIME);
1220 T *tick = __r_make_tick();
1224 __sem_get_by_label(sem,
"time",&time,r->
context);
1252 debug(D_CLOCK,
"clock started\n");
1255 while (r->
state == Alive) {
1256 T *tick =__r_make_tick();
1257 debug(D_CLOCK,
"%s\n",_td(r,tick));
1266 debug(D_CLOCK,
"clock stopped\n");
1271 T * __r_make_timestamp(
Symbol s,
int delta) {
1276 gmtime_r(&clock, &t);
1278 T *today = _t_newr(tick,TODAY);
1279 T *now = _t_newr(tick,NOW);
1280 _t_newi(today,YEAR,t.tm_year+1900);
1281 _t_newi(today,MONTH,t.tm_mon+1);
1282 _t_newi(today,DAY,t.tm_mday);
1283 _t_newi(now,HOUR,t.tm_hour);
1284 _t_newi(now,MINUTE,t.tm_min);
1285 _t_newi(now,SECOND,t.tm_sec);
1300 void __r_dump_instances(
Receptor *r) {
T * _t_new_root(Symbol symbol)
Receptor * r
back-pointer to receptor in which this Q is running (for defs and more)
T * edge
data store for edge receptors
char * _sem_get_name(SemTable *sem, SemanticID s)
void _st_close_listener(SocketListener *l)
int state
state information about the receptor that the vmhost manages
Structure __r_get_symbol_structure(Receptor *r, Symbol s)
find a symbol's structure
void _r_serialize(Receptor *r, void **surfaceP, size_t *lengthP)
Process _d_define_process(SemTable *sem, T *code, char *name, char *intention, T *signature, T *link, Context c)
T * __t_newi(T *parent, Symbol symbol, int surface, bool is_run_node)
Context parent
the context this receptor's definition lives in
T * _t_detach_by_idx(T *t, int i)
void __r_test_expectation(Receptor *r, T *expectation, T *signal)
Semantic tree regular expression header file.
void _p_wakeup(Q *q, T *wakeup, T *with, Error err)
TreeHash _t_hash(SemTable *sem, T *t)
T * __p_build_wakeup_info(T *code_point, int process_id)
void _r_free(Receptor *r)
T * __t_newr(T *parent, Symbol symbol, bool is_run_node)
T * _t_child(T *t, int i)
Receptor * __r_get_receptor(T *installed_receptor)
protocol helpers header file
T * _t_newp(T *parent, Symbol symbol, Process surface)
TreeHash _r_hash(Receptor *r, Xaddr t)
Receptor * _r_new_receptor_from_package(SemTable *sem, Symbol s, T *p, T *bindings)
Creates a new receptor from a receptor package.
streams abstraction header file
T * _d_build_def_semtrex(SemTable *sem, Symbol s, T *parent)
Context context
the context this receptor's definition creates
Structure _d_define_structure(SemTable *sem, char *label, T *structure_def, Context c)
ReceptorAddress addr
the address by which to get messages to this receptor instance
Instances instances
the instances store
receptor implementation header file
Structure _r_define_structure(Receptor *r, char *label, int num_params,...)
void _r_add_expectation(Receptor *r, Aspect aspect, Symbol carrier, T *pattern, T *action, T *with, T *until, T *using, T *cid)
Adds an expectation to a receptor's aspect.
size_t _d_get_symbol_size(SemTable *sem, Symbol s, void *surface)
int _t_match(T *semtrex, T *t)
Structure __r_define_structure(Receptor *r, char *label, T *structure_def)
T * flux
pointer for quick access to the flux
size_t __r_get_structure_size(Receptor *r, Structure s, void *surface)
Receptor * _r_new(SemTable *sem, SemanticID r)
Creates a new receptor.
size_t _d_get_structure_size(SemTable *sem, Structure s, void *surface)
Receptor * _r_unserialize(SemTable *sem, void *surface)
T * _t_newt(T *parent, Symbol symbol, T *surface)
T * _r_send(Receptor *r, T *signal)
Symbol _r_get_sem_by_label(Receptor *r, char *label)
SState * state(StateType type, int *statesP, int level)
Symbol _d_define_symbol(SemTable *sem, Structure s, char *label, Context c)
int _t_matchr(T *semtrex, T *t, T **rP)
Qe * __p_addrt2q(Q *q, T *run_tree, T *sem_map)
T * __r_make_signal(ReceptorAddress from, ReceptorAddress to, Aspect aspect, Symbol carrier, T *signal_contents, UUIDt *in_response_to, T *until, T *cid)
T * _r_build_def_semtrex(Receptor *r, Symbol s)
T * __t_find(T *t, SemanticID sym, int start_child)
void _p_fill_from_match(SemTable *sem, T *t, T *match_results, T *match_tree)
T * root
RECEPTOR_INSTANCE semantic tree.
SemTable * sem
pointer back to the genotype table for this receptor's vmhost instance
size_t __r_get_symbol_size(Receptor *r, Symbol s, void *surface)
void _o_express_role(Receptor *r, Protocol protocol, Symbol role, Aspect aspect, T *bindings)
T * _r_get_instance(Receptor *r, Xaddr x)
void * ___clock_thread(void *arg)
#define _sl(t, s)
macro to add a single symbol literal to semtrex tree
semantic tree matrix header file
Protocol _d_define_protocol(SemTable *sem, T *def, Context c)
Symbol _r_define_symbol(Receptor *r, Structure s, char *label)
Error _r_deliver(Receptor *r, T *signal)
header file for the accumulator
T * _r_set_instance(Receptor *r, Xaddr x, T *t)
T * _o_make_protocol_def(SemTable *sem, Context c, char *label,...)
Process _r_define_process(Receptor *r, T *code, char *name, char *intention, T *signature, T *link)
void _t_detach_by_ptr(T *t, T *c)
T * _r_delete_instance(Receptor *r, Xaddr x)
T * _r_request(Receptor *r, T *signal, Symbol response_carrier, T *code_point, int process_id, T *cid)
T * _p_make_run_tree(SemTable *sem, Process p, T *params, T *sem_map)
T * _t_new_cptr(T *parent, Symbol symbol, void *s)
int _r_def_match(Receptor *r, Symbol s, T *t)
Xaddr _r_new_instance(Receptor *r, T *t)
SocketListener * _st_new_socket_listener(int port, lisenterConnectionCallbackFn fn, void *callback_arg, char *delim)