ceptr
 All Data Structures Files Functions Variables Typedefs Macros Modules Pages
process.c
Go to the documentation of this file.
1 
11 #include "process.h"
12 #include "def.h"
13 #include "semtrex.h"
14 #include <stdarg.h>
15 #include "receptor.h"
16 #include "../spec/spec_utils.h"
17 #include "util.h"
18 #include "debug.h"
19 #include <errno.h>
20 #include "accumulator.h"
21 #include "protocol.h"
22 void rt_check(Receptor *r,T *t) {
23  if (!(t->context.flags & TFLAG_RUN_NODE)) raise_error("Whoa! Not a run node! %s\n",_td(r,t));
24 }
25 
26 uint32_t get_rt_cur_child(Receptor *r,T *tP) {
27  rt_check(r,tP);
28  return (((rT *)tP)->cur_child);
29 }
30 
31 void set_rt_cur_child(Receptor *r,T *tP,uint32_t idx) {
32  rt_check(r,tP);
33  (((rT *)tP)->cur_child) = idx;
34 }
35 
36 void processUnblocker(Stream *st) {
37  int err;
38  // we might need to wait for the process to actually be blocked in the processing thread
39  // so if unblock reports that the process isn't blocked, then just wait for a bit.
40  // @todo, this is really ugly and could lead to deadlock, so we should fix it somehow!
41 
42  // sometimes the unblocker gets called by the stream completion after the actual
43  // code finished running for what ever reason,so if the code is already dead, we
44  // just ignore that case.
45  // @todo, this is gonna bite our ass, and should be fixed
46  int i = 1000;
47  while ((i-- > 0) && (err = _p_unblock((Q *)st->callback_arg1,st->callback_arg2))) {
48  if (err == 2) return; //raise_error("can't unblock completed process!");
49  if (err == 3) return; //raise_error("can't unblock non-existent process");
50  sleepms(1);
51  }
52  if (err) raise_error("process never blocked!");
53 }
54 
55 // setup the default until condition (only once, and 30 second timeout)
56 T *defaultRequestUntil() {
57  T *until = _t_newr(0,END_CONDITIONS);
58  T *ts = __r_make_timestamp(TIMEOUT_AT,30);
59  _t_add(until,ts);
60  _t_newi(until,COUNT,1);
61  return until;
62 }
63 
76 void _p_fill_from_match(SemTable *sem,T *t,T *match_results,T *match_tree) {
77  T *sem_map = _stx_results2sem_map(sem,match_results,match_tree);
78  __t_fill_template(t,sem_map,true);
79  _t_free(sem_map);
80 }
81 
82 Process _p_get_transcoder(SemTable *sem,Symbol src_sym,Symbol to_sym) {
83 
84  if (semeq(HTTP_RESPONSE,src_sym) && semeq(LINES,to_sym)) {
85  return http_response_2_lines;
86  }
87  else if (semeq(CONTENT_TYPE,src_sym) && semeq(LINE,to_sym))
88  return content_type_2_line;
89  else if (semeq(ASCII_CHARS,src_sym) && semeq(HTTP_REQUEST,to_sym))
90  return ascii_chars_2_http_req;
91  else {
92  Structure src_s = _sem_get_symbol_structure(sem,src_sym);
93  Structure to_s = _sem_get_symbol_structure(sem,to_sym);
94  if (semeq(src_s,DATE) && semeq(to_s,CSTRING)) {
95  return date2usshortdate;
96  }
97  if (semeq(src_s,TIME) && semeq(to_s,CSTRING)) {
98  return time2shortime;
99  }
100  if (semeq(HTTP_RESPONSE_STATUS,src_sym) && semeq(to_s,CSTRING)) {
101  return http_response_status_2_ascii_str;
102  }
103  }
104  return NULL_PROCESS;
105 }
106 
107 int _p_transcode(SemTable *sem, T* src,Symbol to_sym, Structure to_s,T **result) {
108  debug(D_TRANSCODE,"transcoding a %s to a %s\n",_sem_get_name(sem,_t_symbol(src)),_sem_get_name(sem,to_sym));
109  Symbol src_sym = _t_symbol(src);
110  T *x;
111  bool dofree = true;
112  int err = noReductionErr;
113  if (semeq(to_sym,src_sym)) {
114  x = src;
115  dofree = false;
116  }
117  else {
118  Process p = _p_get_transcoder(sem,src_sym,to_sym);
119  if (!semeq(p,NULL_PROCESS)) {
120  // we found a defined process for trans coding between the symbols
121  x = __t_newr(0,p,true);
122  _t_add(x,src);
123  err=redoReduction;
124  dofree = false;
125  // __t_newi(x,TEST_INT_SYMBOL,3333,true);
126  }
127  else {
128  // built in transcodings for built in structures
129  Structure src_s = _sem_get_symbol_structure(sem,src_sym);
130  if (semeq(to_s,src_s)) {
131  x = src;
132  x->contents.symbol = to_sym;
133  dofree = false;
134  }
135  else if (semeq(to_s,INTEGER)) {
136  if (semeq(src_s,CSTRING)) {
137  x = __t_newi(0,to_sym,atoi(_t_surface(src)),true);
138  }
139  else return incompatibleTypeReductionErr;
140  }
141  else if (semeq(to_sym,ASCII_CHARS)) {
142  if (semeq(src_s,CSTRING)) {
143  char *c = (char *)_t_surface(src);
144  int l = _t_size(src);
145  x = __t_newr(0,ASCII_CHARS,true);
146  while (--l) { // ignore the terminating null
147  __t_newc(x,ASCII_CHAR,*c,true);
148  c++;
149  }
150 
151  }
152  else return incompatibleTypeReductionErr;
153  }
154  else if (semeq(to_s,CSTRING)) {
155  if (semeq(src_s,INTEGER)) {
156  char buf[100];
157  sprintf(buf,"%d",*(int *)_t_surface(src));
158  x = __t_new_str(0,to_sym,buf,true);
159  }
160  else if (semeq(src_s,FLOAT)) {
161  char buf[100];
162  sprintf(buf,"%f",*(float *)_t_surface(src));
163  x = __t_new_str(0,to_sym,buf,true);
164  }
165  else if (semeq(src_s,CHAR)) {
166  char buf[2];
167  buf[0] = *(char *)_t_surface(src);
168  buf[1] = 0;
169  x = __t_new_str(0,to_sym,buf,true);
170  }
171  else {
172 
173  // get the definition of the structure of the src symbol.
174  T *def = _sem_get_def(sem,src_s);
175  Symbol s_def = *(Symbol *)_t_surface(_t_child(def,2));
176 
177  // if it's an optionality structure then we can recurse on transcode
178  // and dissolve the results into the parent
179  if (!semeq(s_def,NULL_SYMBOL)) {
180  if (_t_children(src) == 0) x = __t_new_str(0,to_sym,"",true);
181  else {
182  x = __t_newr(0,DISSOLVE,true);
183  T *xx = __t_newr(x,LINES,true);
184  T *k,*r;
185  int e;
186  while ((k = _t_detach_by_idx(src,1))) {
187  e = _p_transcode(sem,k,to_sym,to_s,&r);
188  if (e && e != redoReduction) {
189  _t_free(src);
190  return e;
191  }
192  _t_add(xx,r);
193  }
194  err = redoReduction;
195  }
196  }
197  else x = __t_new_str(0,to_sym,_t2s(sem,src),true);
198  }
199  }
200  else {
201  debug(D_TRANSCODE,"trying to find structural match\n");
202  // This a special case where if the src and dest strutctures
203  // of the same form *X then we can transcode the elements
204  // @todo generalize
205  // get the definition of the structure of the src and dest
206  T *sdef = _sem_get_def(sem,src_s);
207  sdef = _t_child(sdef,SymbolDefStructureIdx);
208  debug(D_TRANSCODE,"source def: %s\n",t2s(sdef));
209  Symbol sdef_sym = _t_symbol(sdef);
210 
211  T *tdef = _sem_get_def(sem,to_s);
212  tdef = _t_child(tdef,SymbolDefStructureIdx);
213  debug(D_TRANSCODE,"to def: %s\n",t2s(tdef));
214  Symbol tdef_sym = _t_symbol(tdef);
215 
216  T *k;
217  Symbol k_sym = _t_symbol(k = _t_child(tdef,1));
218  bool tsym_isa_simple_list = (semeq(tdef_sym,STRUCTURE_ZERO_OR_MORE)||
219  semeq(tdef_sym,STRUCTURE_ZERO_OR_ONE)||
220  semeq(tdef_sym,STRUCTURE_ONE_OR_MORE)) &&
221  semeq(k_sym,STRUCTURE_SYMBOL);
222  x = NULL;
223  if (tsym_isa_simple_list) {
224  Symbol to_list_of_sym = *(Symbol *)_t_surface(k);
225  if (semeq(tdef_sym,sdef_sym) &&
226  semeq(_t_symbol( _t_child(sdef,1)),STRUCTURE_SYMBOL)) {
227  debug(D_TRANSCODE,"transcoding elements of simple list\n");
228  x = __t_newr(0,to_sym,true);
229  T *m,*r;
230  int e;
231  // the to becomes the surface of the STRUCTURE_SYMBOL def
232 
233  Structure to_s = _sem_get_symbol_structure(sem,to_list_of_sym);
234  while ((m = _t_detach_by_idx(src,1))) {
235  e = _p_transcode(sem,m,to_list_of_sym,to_s,&r);
236  if (e && e != redoReduction) {
237  _t_free(src);
238  return e;
239  }
240  _t_add(x,r);
241  }
242  err = redoReduction;
243  }
244  else {
245  // if the to is a simple list but the src isn't then if the
246  // src happens to be of the right type we can simply added it as a
247  // singleton to the list. Otherwise we first have to try transcoding it
248  debug(D_TRANSCODE,"transcoding singleton into simple list\n");
249  T *singleton = src;
250  if (!semeq(src_sym,to_list_of_sym)) {
251  debug(D_TRANSCODE,"src doesn't match, recurring..\n");
252  int e;
253  e = _p_transcode(sem,src,to_list_of_sym,_sem_get_symbol_structure(sem,to_list_of_sym),&singleton);
254  if (e && e != redoReduction) {
255  _t_free(src);
256  return e;
257  }
258  err = redoReduction;
259  }
260  x = __t_newr(0,to_sym,true);
261  _t_add(x,singleton);
262  dofree=false;
263  }
264  }
265  if (!x) {
266  debug(D_TRANSCODE,"unable to match\n");
267  _t_free(src);
268  return incompatibleTypeReductionErr;
269  }
270  }
271  }
272  }
273  *result = x;
274  debug(D_TRANSCODE,"transcode result: %s\n",_t2s(sem,x));
275  if (dofree) _t_free(src);
276  return err;
277 }
278 
279 
291 Error __p_check_signature(SemTable *sem,Process p,T *code,T *sem_map) {
292  T *processes = _sem_get_defs(sem,p);
293  T *def = _d_get_process_code(processes,p);
294  T *signature = _t_child(def,ProcessDefSignatureIdx);
295  // @todo if there's no signature we should probably fail, but instead we assume everything's ok
296  // (sig should always have at least 1 child, the output sig)
297  int sigs = _t_children(signature);
298  if (!sigs) return 0;
299  int input_sigs = 0;
300  int i;
301 
302  for(i=SignatureOutputSigIdx+1;i<=sigs;i++) { // skip the output signature which is always first
303  T *s = _t_child(signature,i);
304  Symbol sym = _t_symbol(s);
305  if (semeq(sym,INPUT_SIGNATURE)) {
306  input_sigs++;
307  T *param = _t_child(code,i-1);
308  T *sig = _t_child(s,2); // input signatures start at 2
309  bool is_optional = _t_child(s,InputSigOptionalIdx) != NULL;
310  if (!param && !is_optional)
311  // raise_error("missing non-optional param");
312  return tooFewParamsReductionErr;
313  if (!param && is_optional) {
314  // don't count as required sig
315  input_sigs--;
316  }
317  if (param) {
318  if(semeq(_t_symbol(sig),SIGNATURE_STRUCTURE)) {
319  Structure ss = *(Symbol *)_t_surface(sig);
320  if (!semeq(_sem_get_symbol_structure(sem,_t_symbol(param)),ss) && !semeq(ss,TREE))
321  return signatureMismatchReductionErr;
322  }
323  else if(semeq(_t_symbol(sig),SIGNATURE_SYMBOL)) {
324  Symbol ss = *(Symbol *)_t_surface(sig);
325  if (!semeq(ss,_t_symbol(param)))
326  raise_error("signatureMismatchReductionErr expected:%s got:%s\n",_sem_get_name(sem,ss),_t2s(sem,param));
327  // return signatureMismatchReductionErr;
328  }
329  else if(semeq(_t_symbol(sig),SIGNATURE_ANY)) {
330  }
331  else if(semeq(_t_symbol(sig),SIGNATURE_PROCESS)) {
332  Symbol expected = *(Symbol *)_t_surface(sig);
333  Symbol actual = _t_symbol(param);
334  if (!semeq(expected,actual)) {
335  raise_error("expecting process to reduce to %s, got: %s\n",_sem_get_name(sem,expected),_t2s(sem,param));
336  }
337  }
338  else {
339  raise_error("unknown signature checking symbol: %s",_sem_get_name(sem,_t_symbol(sig)));
340  }
341  }
342  }
343  else if (semeq(sym,TEMPLATE_SIGNATURE)) {
344  if (!sem_map)
345  return missingSemanticMapReductionErr;
346  int c = _t_children(s);
347  int map_children = _t_children(sem_map);
348  if (map_children < c ) return mismatchSemanticMapReductionErr;
349 
350  // build up hashes of all the semantic references in our map
351  // @todo cache this someplace so we don't need to do it every time
352  TreeHash mapped[map_children];
353  int j;
354  for(j=1;j<=map_children;j++) {
355  T *t = _t_child(_t_child(sem_map,j),SemanticMapSemanticRefIdx);
356  mapped[j-1] = _t_hash(sem,t);
357  }
358  // now scan through the signature and see if all it's expected slots are actually mapped
359  // @todo convert this to a true hash lookup algorithm
360  for(j=1;j<=c;j++) {
361  T *t = _t_child(_t_child(s,j),1);
362  TreeHash h = _t_hash(sem,t);
363  int k;
364  for (k=0;k<map_children;k++) {
365  if (mapped[k] == h) {
366  break;
367  }
368  }
369  // not found so return a mismatch error
370  if (k == map_children) return mismatchSemanticMapReductionErr;
371  }
372  }
373  }
374  int param_count = _t_children(code);
375  if (param_count > input_sigs) return tooManyParamsReductionErr;
376  if (param_count < input_sigs) return tooFewParamsReductionErr;
377 
378  return 0;
379 }
380 
381 /* low level function to unwind a run-tree to a specific point*/
382 void __p_unwind_to_point(R *context,T *code_point,T *with) {
383  T *p = _t_parent(code_point);
384  _t_replace(p,_t_node_index(code_point), with);
385  context->parent = p;
386  context->node_pointer = with;
387 }
388 
395 Error __p_reduce_sys_proc(R *context,Symbol s,T *code,Q *q) {
396  int b,c;
397  char *str;
398  Symbol sy;
399  T *x,*t,*match_results,*match_tree;
400  Error err = noReductionErr;
401  SemTable *sem = q ? q->r->sem : G_sem;
402 
403  debug(D_REDUCE,"Reducing sys proc: %s\n",_sem_get_name(sem,s));
404  debug(D_STEP,"Reducing %s\n",_t2s(sem,code));
405 
406  bool dissolve = false;
407  switch(s.id) {
408  case NOOP_ID:
409  // noop simply replaces itself with it's own child
411  x = _t_detach_by_idx(code,1);
412  break;
413  case GET_ID:
414  case DEL_ID:
415  {
416  T *t = _t_detach_by_idx(code,1);
417  Xaddr xa = *(Xaddr *)_t_surface(t);
418  T *v = _r_get_instance(q->r,xa);
419  if (!v) raise_error("Invalid xaddr in GET");
420  x = _t_rclone(v);
421  _t_free(t);
422  if (s.id == DEL_ID) {
423  _r_delete_instance(q->r,xa);
424  }
425  }
426  break;
427  case DEF_SYMBOL_ID:
428  {
429  T *def = _t_detach_by_idx(code,1);
430  //@todo some kind of validation of the def??
431  SemanticID ns = _d_define(sem,def, SEM_TYPE_SYMBOL,q->r->context);
432  x = __t_news(0,RESULT_SYMBOL,ns,true);
433  }
434  break;
435  case DEF_STRUCTURE_ID:
436  {
437  T *def = _t_detach_by_idx(code,1);
438  //@todo some kind of validation of the def??
439  SemanticID ns = _d_define(sem,def, SEM_TYPE_STRUCTURE,q->r->context);
440  x = __t_news(0,RESULT_STRUCTURE,ns,true);
441  }
442  break;
443  case DEF_PROCESS_ID:
444  {
445  T *def = _t_detach_by_idx(code,1);
446  //@todo some kind of validation of the def??
447  SemanticID ns = _d_define(sem,def, SEM_TYPE_PROCESS,q->r->context);
448  x = __t_news(0,RESULT_PROCESS,ns,true);
449  }
450  break;
451  case DEF_RECEPTOR_ID:
452  {
453  T *def = _t_detach_by_idx(code,1);
454  //@todo some kind of validation of the def??
455  SemanticID ns = __d_define_receptor(sem,def,q->r->context);
456  x = __t_news(0,RESULT_RECEPTOR,ns,true);
457  }
458  break;
459  case DEF_PROTOCOL_ID:
460  {
461  T *def = _t_detach_by_idx(code,1);
462  //@todo some kind of validation of the def??
463  SemanticID ns = _d_define(sem,def,SEM_TYPE_PROTOCOL,q->r->context);
464  x = __t_news(0,RESULT_PROTOCOL,ns,true);
465  }
466  break;
467  case NEW_ID:
468  {
469  T *t = _t_detach_by_idx(code,1);
470  Symbol s = *(Symbol *)_t_surface(t);
471  _t_free(t);
472  t = _t_detach_by_idx(code,1);
473  Structure struc_new = _sem_get_symbol_structure(sem,s);
474  Structure struc_val = _sem_get_symbol_structure(sem,_t_symbol(t));
475  if (!semeq(struc_new,struc_val)) {
476  return structureMismatchReductionErr;
477  }
478  else {
479  t->contents.symbol = s;
480  Xaddr xa = _r_new_instance(q->r,t);
481  x = __t_new(0,WHICH_XADDR,&xa,sizeof(Xaddr),1);
482  }
483  }
484  break;
485  case DO_ID:
486  case CONVERSE_ID:
487  {
488  // all of the scope's children should have been reduced
489  // so all we need to do is return the last one.
490  T *scope = _t_detach_by_idx(code,1);
491  int p = _t_children(scope);
492  x = _t_detach_by_idx(scope,p);
493  _t_free(scope);
494  if (s.id == CONVERSE_ID) {
495  //pop off the last conversation reference
496  if (context->conversation)
497  context->conversation = context->conversation->next;
498  }
499  T *t;
500  while ((t = _t_detach_by_idx(code,1))) {
501  if (semeq(_t_symbol(t),BOOLEAN)) {
502  if (*(int *)_t_surface(t)) {
503  err = Block;
504  }
505  }
506  _t_free(t);
507  }
508  }
509  break;
510  case IF_ID:
511  t = _t_child(code,1);
512  b = (*(int *)_t_surface(t)) ? 2 : 3;
513  x = _t_detach_by_idx(code,b);
514  break;
515  case COND_ID:
516  // COND is a special case, we have to check the phase to see what to do
517  // after the children have been evaluated.
518  {
519  CondState *state = *(CondState **)_t_surface(code);
520  // get the condition or else results into x
521  x = _t_detach_by_idx(code,1);
522  if (state->phase == EvalCondCondtions) {
523  // if the condition was true, then we have to load the body for evaluation
524  T *cond_pair = _t_detach_by_idx(state->conditions,1);
525  if (*(int *)_t_surface(x)) {
526  _t_add(code,_t_detach_by_idx(cond_pair,1));
527  state->phase = EvalCondResult;
528  } else {
529  // otherwise we have to move on to the next condition or the Else
530  T *c = _t_child(state->conditions,1);
531  _t_add(code,_t_detach_by_idx(c,1));
532  if (semeq(_t_symbol(c),COND_ELSE)) {
533  state->phase = EvalCondResult;
534  }
535  }
536  _t_free(cond_pair);
537  _t_free(x);
538  return Eval;
539  }
540  else {
541  // cleanup the state before returning.
542  _t_free(state->conditions);
543  free(state);
544  code->contents.size = 0;
545  }
546  }
547  break;
548  case EQ_SYM_ID:
549  x = __t_newi(0,BOOLEAN,
550  semeq(
551  *(Symbol *)_t_surface(_t_child(code,1)),
552  *(Symbol *)_t_surface(_t_child(code,2))),
553  true);
554  break;
555  case ADD_INT_ID:
556  x = _t_detach_by_idx(code,1);
557  c = *(int *)_t_surface(_t_child(code,1));
558  *((int *)&x->contents.surface) = c+*((int *)&x->contents.surface);
559  break;
560  case SUB_INT_ID:
561  x = _t_detach_by_idx(code,1);
562  c = *(int *)_t_surface(_t_child(code,1));
563  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)-c;
564  break;
565  case MULT_INT_ID:
566  x = _t_detach_by_idx(code,1);
567  c = *(int *)_t_surface(_t_child(code,1));
568  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)*c;
569  break;
570  case DIV_INT_ID:
571  x = _t_detach_by_idx(code,1);
572  c = *(int *)_t_surface(_t_child(code,1));
573  if (!c) {
574  _t_free(x);
575  return divideByZeroReductionErr;
576  }
577  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)/c;
578  break;
579  case MOD_INT_ID:
580  x = _t_detach_by_idx(code,1);
581  c = *(int *)_t_surface(_t_child(code,1));
582  if (!c) {
583  _t_free(x);
584  return divideByZeroReductionErr;
585  }
586  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)%c;
587  break;
588  case EQ_INT_ID:
589  x = _t_detach_by_idx(code,1);
590  c = *(int *)_t_surface(_t_child(code,1));
591  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)==c;
592  x->contents.symbol = BOOLEAN;
593  break;
594  case LT_INT_ID:
595  x = _t_detach_by_idx(code,1);
596  c = *(int *)_t_surface(_t_child(code,1));
597  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)<c;
598  x->contents.symbol = BOOLEAN;
599  break;
600  case GT_INT_ID:
601  x = _t_detach_by_idx(code,1);
602  c = *(int *)_t_surface(_t_child(code,1));
603  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)>c;
604  x->contents.symbol = BOOLEAN;
605  break;
606  case LTE_INT_ID:
607  x = _t_detach_by_idx(code,1);
608  c = *(int *)_t_surface(_t_child(code,1));
609  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)<=c;
610  x->contents.symbol = BOOLEAN;
611  break;
612  case GTE_INT_ID:
613  x = _t_detach_by_idx(code,1);
614  c = *(int *)_t_surface(_t_child(code,1));
615  *((int *)&x->contents.surface) = *((int *)&x->contents.surface)>=c;
616  x->contents.symbol = BOOLEAN;
617  break;
618  case POP_PATH_ID:
619  {
620  x = _t_detach_by_idx(code,1);
621  T *as = _t_child(code,1);
622  T *count = _t_child(code,2);
623  x->contents.symbol = *(Symbol *)_t_surface(as);
624  int i = count ? *(int *)_t_surface(count) : 1;
625  int *path = (int *)_t_surface(x);
626  int d = _t_path_depth(path);
627  if (i>d) path[0] = TREE_PATH_TERMINATOR;
628  else path[d-i] = TREE_PATH_TERMINATOR;
629  }
630  break;
631  case CONTRACT_STR_ID:
632  case CONCAT_STR_ID:
633  // if the first parameter is a RESULT SYMBOL then we use that as the symbol type for the result tree.
634  x = _t_detach_by_idx(code,1);
635  if (!x) return tooFewParamsReductionErr;
636  sy = _t_symbol(x);
637  if (semeq(RESULT_SYMBOL,sy)) {
638  sy = *(Symbol *)_t_surface(x);
639  _t_free(x);
640  // confirm that the result structure is a CSTRING
641  Structure struc = _sem_get_symbol_structure(sem,sy);
642  if (!semeq(struc,CSTRING))
643  return signatureMismatchReductionErr;
644  x = _t_detach_by_idx(code,1);
645  } else if (s.id == CONTRACT_STR_ID) {// CONTRACT requires first param to be RESULT_SYMBOL
646  _t_free(x);
647  return signatureMismatchReductionErr;
648  }
649  if (!x) {
650  return tooFewParamsReductionErr;
651  }
652  c = _t_children(code);
653 
654  // make sure the surface was allocated and if not, converted to an alloced surface
655  if (c > 0) {
656  if (!(x->context.flags & TFLAG_ALLOCATED)) {
657  str = malloc(x->contents.size);
658  memcpy(str,&x->contents.surface,x->contents.size);
659  x->contents.surface = str;
660  x->context.flags = TFLAG_ALLOCATED+TFLAG_RUN_NODE;
661  }
662  }
663  // check type the first node
664  Structure struc = _sem_get_symbol_structure(sem,_t_symbol(x));
665  if (semeq(struc,CHAR)) {
666  x->contents.surface = realloc(x->contents.surface,++x->contents.size);
667  ((char *)x->contents.surface)[1] = 0;
668  }
669  else if (!semeq(struc,CSTRING)) {
670  _t_free(x);
671  return incompatibleTypeReductionErr;
672  }
674  for(b=1;b<=c;b++) {
675  T *t = _t_child(code,b);
676  struc = _sem_get_symbol_structure(sem,_t_symbol(t));
677  str = (char *)_t_surface(t);
678  int size;
679  if (semeq(struc,CSTRING)) size = strlen(str);
680  else if (semeq(struc,CHAR)) size = 1;
681  else {
682  _t_free(x);
683  return incompatibleTypeReductionErr;
684  }
685  x->contents.surface = realloc(x->contents.surface,x->contents.size+size);
686  memcpy(x->contents.surface+x->contents.size-1,str,size);
687  x->contents.size+=size;
688  *( (char *)x->contents.surface + x->contents.size -1) = 0;
689  }
690  x->contents.symbol = sy;
691  break;
692  case EXPAND_STR_ID:
693  {
694  T *t = _t_detach_by_idx(code,1);
695  x = makeASCIITree((char *)_t_surface(t));
696  _t_free(t);
697  }
698  break;
699  case RESPOND_ID:
700  {
701  T *signal = _t_parent(context->run_tree);
702  if (!signal || !semeq(_t_symbol(signal),SIGNAL))
703  return notInSignalContextReductionError;
704  T *t = _t_detach_by_idx(code,1);
705  if (!semeq(CARRIER,_t_symbol(t))) raise_error("expected CARRIER got %s",_t2s(sem,t));
706  Symbol carrier = *(Symbol*)_t_surface(t);
707  _t_free(t);
708  T *response_contents = _t_detach_by_idx(code,1);
709  T *head = _t_getv(signal,SignalMessageIdx,MessageHeadIdx,TREE_PATH_TERMINATOR);
710 
711  ReceptorAddress to = __r_get_addr(_t_child(head,HeadFromIdx)); // from and to reverse in response
712  ReceptorAddress from = __r_get_addr(_t_child(head,HeadToIdx));
713  Aspect a = *(Aspect *)_t_surface(_t_child(head,HeadAspectIdx));
714  T *su = _t_getv(signal,SignalEnvelopeIdx,EnvelopeSignalUUIDIdx,TREE_PATH_TERMINATOR);
715  UUIDt uuid = *(UUIDt *)_t_surface(su);
716 
717  T *response = __r_make_signal(from,to,a,carrier,response_contents,&uuid,0,context->conversation ? context->conversation->cid : NULL);
718  x = _r_send(q->r,response);
719  }
720  break;
721  case QUOTE_ID:
723  // Note that QUOTE seems to be the same as NOOP?
724  x = _t_detach_by_idx(code,1);
725  break;
726  case REQUEST_ID:
727  case SAY_ID:
728  {
729 
730  T *t = _t_detach_by_idx(code,1);
731  ReceptorAddress to = __r_get_addr(t);
732  _t_free(t);
733 
734  t = _t_detach_by_idx(code,1);
735  if (!semeq(_t_symbol(t),ASPECT_IDENT)) {
736  raise_error("expected ASPECT_IDENT!");
737  }
738  Aspect aspect = *(Aspect *)_t_surface(t);
739  _t_free(t);
740 
741  t = _t_detach_by_idx(code,1);
742  Symbol carrier = *(Symbol*)_t_surface(t);
743  _t_free(t);
744 
745  T* signal_contents = _t_detach_by_idx(code,1);
746 
747  ReceptorAddress from = __r_get_self_address(q->r);
748  T *signal;
749 
750  if (s.id == SAY_ID) {
751  signal = __r_make_signal(from,to,aspect,carrier,signal_contents,0,0,context->conversation ? context->conversation->cid : NULL);
752  x = _r_send(q->r,signal);
753  }
754  else if (s.id == REQUEST_ID) {
755  T *response_point = NULL;
756 
757  t = _t_detach_by_idx(code,1);
758  Symbol response_carrier = *(Symbol*)_t_surface(t);
759  _t_free(t);
760 
761  int kids = _t_children(code);
762  T *until = NULL;
763  if (kids > 2) {
764  return(tooManyParamsReductionErr);
765  }
766  T *callback = NULL;
767  while(kids--) {
768  t = _t_detach_by_idx(code,1);
769  if (semeq(_t_symbol(t),END_CONDITIONS)) {
770  until = t;
771  }
772  else callback = t;
773  }
774  if (!until) until = defaultRequestUntil();
775  if (!callback) {
776  err = Block;
777  debug(D_SIGNALS,"blocking at %s\n",_td(q->r,code));
778  response_point = code;
779  }
780  else {
781  raise_error("request callback not implemented for %s",t2s(callback));
782  }
783  T *cid = context->conversation ? context->conversation->cid : NULL;
784  signal = __r_make_signal(from,to,aspect,carrier,signal_contents,0,until,cid);
785 
786  x = _r_request(q->r,signal,response_carrier,response_point,context->id,cid);
787  }
788  }
789  break;
790  case THIS_SCOPE_ID:
791  // @todo make this return an error that would invoke the error handler
792  if (!context->conversation)
793  raise_error("whoa THIS_SCOPE executed outside CONVERSE!");
794  x = _t_rclone(context->conversation->cid);
795  break;
796  case CONTINUE_ID:
797  {
798  T *at = _t_detach_by_idx(code,1);
799  T *with = _t_detach_by_idx(_t_child(code,1),1);
800  int *path = (int *)_t_surface(at);
801  // @todo validate that the path is on ok path to unwind to...
802  T *c = _t_get(context->run_tree,path);
803  if (!c) raise_error("continue at point invalid");
804  __p_unwind_to_point(context,c,with);
805  return(noErr);
806  }
807  break;
808  case COMPLETE_ID:
809  {
810  T *with = _t_detach_by_idx(code,1);
811  T *cid = _t_detach_by_idx(code,1);
812 
813  // @todo think about if this would cause other arbitrary weirdness, like what about any pending wakeups (from requests) that point to spots in the code that's getting unwound.
814  if (!cid) {
815  // in the case no conversation id parameter then we need to look in
816  // we get it from the context
817  // @todo make this return an error that would invoke the error handler
818  if (!context->conversation)
819  raise_error("COMPLETE invoked without conversation id outside of CONVERSE");
820  cid = context->conversation->cid;
821  UUIDt *cuuid = __cid_getUUID(cid);
822  T *w = __r_cleanup_conversation(q->r,cuuid);
823  if (w) _t_free(w);
824 
825  // now move the execution point up to the CONVERSE root.
826  // @todo maybe only use the wakeup_ref (like below) and don't store converse_pointer?
827  T *c = context->conversation->converse_pointer;
828  __p_unwind_to_point(context,c,with);
829  return(noErr);
830  }
831  else {
832  // if the conversation param was specified we need to get it from
833 
834  UUIDt *cuuid = __cid_getUUID(cid);
835  T *w = __r_cleanup_conversation(q->r,cuuid);
836  // restart the CONVERSE instruction that spawned this conversation
837  if (w) {
838  int *code_path = (int *)_t_surface(_t_child(w,WakeupReferenceCodePathIdx));
839  int process_id = *(int *)_t_surface(_t_child(w,WakeupReferenceProcessIdentIdx));
840  debug(D_LOCK,"complete LOCK\n");
841  pthread_mutex_lock(&q->mutex);
842  Qe *e = __p_find_context(q->blocked,process_id);
843  if (e) {
844  if (with) {
845  T *c = _t_get(e->context->run_tree,code_path);
846  if (!c) raise_error("failed to find code path when completing converse!");
847  __p_unwind_to_point(e->context,c,with);
848  }
849  debug(D_SIGNALS,"unblocking CONVERSE\n");
850  __p_unblock(q,e,noReductionErr);
851  }
852  else if (with) _t_free(with);
853  pthread_mutex_unlock(&q->mutex);
854  debug(D_LOCK,"complete UNLOCK\n");
855  }
856  x = cid;
857  }
858  }
859  break;
860  case TRANSCODE_ID:
861  {
862  T *params = _t_detach_by_idx(code,1);
863  if (!params) return signatureMismatchReductionErr;
864  T *to = _t_detach_by_idx(params,1);
865  if (!to) return signatureMismatchReductionErr;
866  _t_free(params);
867 
868  Symbol to_sym = *(Symbol *)_t_surface(to);
869  _t_free(to);
870  Structure to_s = _sem_get_symbol_structure(sem,to_sym);
871 
872  T *items = _t_child(code,1);
873  if (!items) return signatureMismatchReductionErr;
874  T *t = __t_newr(0,PARAMS,true); //holder for the transcoding children
875  T *src;
876  while ((src = _t_detach_by_idx(items,1))) {
877  int e = _p_transcode(sem,src,to_sym,to_s,&x);
878  if (e != noReductionErr) {
879  if (e != redoReduction) return e;
880  else err = e;
881  }
882  _t_add(t,x);
883  }
884  if (_t_children(t) == 1) {
885  x = _t_detach_by_idx(t,1);
886  _t_free(t);
887  }
888  else {
889  x = __t_newr(0,DISSOLVE,true);
890  _t_add(x,t);
891  err = redoReduction;
892  }
893  }
894  break;
895  case DISSOLVE_ID:
896  // dissolve can't be the root process!
897  if (!_t_parent(code)) return structureMismatchReductionErr;
898 
899  // if the param has children, then they are to be inserted into the
900  // parent's children at this instruction's spot.
901  x = _t_detach_by_idx(code,1);
902  dissolve = _t_children(x) > 0;
903  break;
904  case MATCH_ID:
905  {
906  T *pattern = _t_detach_by_idx(code,1);
907  T *t = _t_detach_by_idx(code,1);
908  bool matchr = false;
909  if (_t_children(code)) {
910  T *t = _t_detach_by_idx(code,1);
911  matchr = *(int*)_t_surface(t);
912  _t_free(t);
913  }
914  T *results;
915  bool match;
916  if (matchr) {
917  match =_t_matchr(pattern,t,&results);
918  if (match) {
919  x = _t_rclone(results);
920  _t_free(results);
921  }
922 
923  else return incompatibleTypeReductionErr; //x = __t_newi(0,BOOLEAN,0,true);
924  }
925  else {
926  match = _t_match(pattern,t);
927  x = __t_newi(0,BOOLEAN,match,true);
928  }
929  _t_free(pattern);
930  _t_free(t);
931  }
932  break;
933  case FILL_ID:
934  {
935  x = _t_detach_by_idx(code,1);
936  T *sem_map = _t_detach_by_idx(code,1);
937  _t_fill_template(x,sem_map);
938  _t_free(sem_map);
939  }
940  break;
941  case FILL_FROM_MATCH_ID:
942  match_results = _t_child(code,2);
943  match_tree = _t_child(code,3);
944  x = _t_detach_by_idx(code,1);
946  _p_fill_from_match(sem,x,match_results,match_tree);
947  break;
948  case RAISE_ID:
949  return raiseReductionErr;
950  break;
951  case STREAM_READ_ID:
952  {
953  // get the stream param
954  T *s = _t_child(code,1);
955  Stream *st = _t_surface(s);
956 
957  //@todo possible another parameter to specify if we should read lines, or specific number of bytes
958  st->callback = 0;
959  if (st->flags & StreamHasData) {
960  _t_detach_by_idx(code,1);
961  _t_free(s);
962  if (st->err) return(st->err);
963  // get the result type to use as the symbol type for the ascii data
964  s = _t_detach_by_idx(code,1);
965  sy = _t_symbol(s);
966  if (semeq(RESULT_SYMBOL,sy)) {
967  sy = *(Symbol *)_t_surface(s);
968  _t_free(s);
969  size_t l = _st_data_size(st);
970  char *c = _st_data(st);
971 
972  Structure to_s = _sem_get_symbol_structure(sem,sy);
973  if (semeq(to_s,CSTRING)) {
974  debug(D_STREAM,"creating CSTRING: %s '%.*s'\n",_sem_get_name(sem,sy),(int)l,c);
975  // @todo fix this to be a flag instruction to __t_new
976  // currently it only works because that value is the newline in the
977  // read buffer.
978  _st_data(st)[l] = 0;
979  x = __t_new(0,sy,c,l+1,1);
980  }
981  else {
982  debug(D_STREAM,"non CSTRING RESULT_SYMBOL so converting to ASCII_CHARS and transcoding to %s \n",_sem_get_name(sem,sy));
983  T *src = __t_newr(0,ASCII_CHARS,true);
984  while (l--) {
985  __t_newc(src,ASCII_CHAR,*c,true);
986  c++;
987  }
988  int e = _p_transcode(sem,src,sy,to_s,&x);
989  if (e != noReductionErr) {
990  if (e != redoReduction) return e;
991  else err = e;
992  }
993  }
994  _st_data_consumed(st);
995  }
996  else {raise_error("expecting RESULT_SYMBOL");}
997  }
998  else if (_st_is_alive(st)) {
999  st->callback = processUnblocker;
1000  st->callback_arg1 = q;
1001  st->callback_arg2 = q->active->id;
1002 
1003  // start up the thread to read the data,
1004  _st_start_read(st);
1005 
1006  // and block this context
1007  return(Block);
1008  }
1009  else {
1010  return deadStreamReadReductionErr;
1011  }
1012  }
1013  break;
1014  case STREAM_WRITE_ID:
1015  {
1016  // get the stream param
1017  T *s = _t_detach_by_idx(code,1);
1018  Stream *st = _t_surface(s);
1019  _t_free(s);
1020  // get the data to write as string
1021  while ((s = _t_detach_by_idx(code,1))) {
1022  int err = _t_write(sem,s,st);
1023  _t_free(s);
1024  if (err == 0) return unixErrnoReductionErr;
1025  }
1027  x = __t_news(0,REDUCTION_ERROR_SYMBOL,NULL_SYMBOL,1);
1028  }
1029  break;
1030  case STREAM_ALIVE_ID:
1031  {
1032  // get the stream param
1033  T *s = _t_detach_by_idx(code,1);
1034  Stream *st = _t_surface(s);
1035  x = __t_newi(0,BOOLEAN,_st_is_alive(st),true);
1036  _t_free(s);
1037  }
1038  break;
1039  case STREAM_CLOSE_ID:
1040  {
1041  // get the stream param
1042  T *s = _t_detach_by_idx(code,1);
1043  Stream *st = _t_surface(s);
1044  _st_kill(st);
1045  void *status;
1046  int rc;
1047 
1048  rc = pthread_join(st->pthread, &status);
1049  if (rc) {
1050  raise_error("ERROR; return code from pthread_join() is %d\n", rc);
1051  }
1052  _st_free(st);
1053  x = __t_newi(0,BOOLEAN,1,true);
1054  _t_free(s);
1055  }
1056  break;
1057  case ITERATE_ID:
1058  // iterate is a special case, we have to check the phase to see what to do
1059  // after the children have been evaluated.
1060  {
1062  bool done = false;
1063  int next_phase;
1064  // get the condition or body results into x
1065  x = _t_detach_by_idx(code,2);
1066  switch(state->phase) {
1067  case EvalCondition: {
1068  // if this is the first time evaluating the cond, figure out what type
1069  // of iteration we are doing based on the semantics
1070  if (state->type == IterateTypeUnknown) {
1071  Symbol c = _t_symbol(x);
1072  if (semeq(c,BOOLEAN)) {
1073  state->type = IterateTypeCond;
1074  state->count = 0;
1075  }
1076  else if (semeq(c,ITERATE_ON_SYMBOL)) {
1077  state->type = IterateTypeOnSymbol;
1078  T *params = _t_child(code,1);
1079  T *list = _t_newr(params,ITERATION_DATA);
1080  _a_get_instances(&q->r->instances,*(Symbol *)_t_surface(x),list);
1081  // if the list has no children the we are already done
1082  done = !_t_children(list);
1083  }
1084  else {
1085  Structure s = _sem_get_symbol_structure(sem,c);
1086  if (semeq(s,INTEGER)) {
1087  state->type = IterateTypeCount;
1088  state->count = *(int *)_t_surface(x);
1089  }
1090  else {
1091  raise_error("unable to determine iteration type! symbol was:%s",_sem_get_name(sem,s));
1092  }
1093  }
1094  }
1095  // evaluate condition based on iteration type
1096  switch(state->type) {
1097  case IterateTypeCond:
1098  done = !*(int *)_t_surface(x);
1099  break;
1100  case IterateTypeCount:
1101  done = (--state->count < 0);
1102  break;
1103  // IterateTypeOnSymbol handled above
1104  }
1105  next_phase = EvalBody;
1106  break;
1107  }
1108  case EvalBody:
1109  // if this is cond iteration we have to alternate between phases because
1110  // the cond has to be re-cloned and evaluated. For other iteration types
1111  // the conditionality not internal so we evaluate it directly
1112  // @todo we should refactor this code because the done checking also appear
1113  // above in the first IterateTypeUnknown case.
1114  next_phase = (state->type == IterateTypeCond) ? EvalCondition : EvalBody;
1115  if (state->type == IterateTypeCount){
1116  if (--state->count < 0) done = true;
1117  }
1118  else {
1119  // we aren't doing count iteration, use the count var just
1120  // to keep track of how many times we've gone through the loop
1121  state->count++;
1122  if (state->type == IterateTypeOnSymbol) {
1123  T *params = _t_child(code,1);
1124  int p = _t_children(params);
1125  T *list = _t_child(params,p); // iterate list should be last child
1126  T *t = _t_detach_by_idx(list,1);
1127  _t_free(t);
1128  if (!_t_children(list)) done = true;
1129  }
1130  //if (state->count > 9) done = true; // temporary infinite loop breaker
1131  }
1132  }
1133  if (done) {
1134  // we are done so free up the iteration state info
1136  _t_free(state->code);
1137  free(state);
1138  code->contents.size = 0;
1139  }
1140  else {
1141  _t_free(x);
1142  // add a copy of the body/condition on as the last child
1143  _t_add(code,_t_rclone(_t_child(state->code,next_phase == EvalBody ? 3 : 2)));
1144  // and reset the current child count so it gets evaluated.
1145  set_rt_cur_child(q->r,code,1); // reset the current child count on the code
1146  state->phase = next_phase;
1147  return Eval;
1148  }
1149 
1150  }
1151 
1152  break;
1153  case LISTEN_ID:
1154  {
1155  t = _t_detach_by_idx(code,1);
1156  if (!t || !semeq(_t_symbol(t),ASPECT_IDENT)) {
1157  raise_error("expected ASPECT_IDENT!");
1158  }
1159  Aspect aspect = *(Aspect *)_t_surface(t);
1160  _t_free(t);
1161 
1162  T *on = _t_detach_by_idx(code,1);
1163  Symbol carrier = *(Symbol *)_t_surface(on);
1164  _t_free(on);
1165  T *match = _t_detach_by_idx(code,1);
1166  T *with = NULL;
1167  T *until = NULL;
1168  T *act = NULL;
1169  int kids = _t_children(code);
1170  if (kids > 3) {
1171  return(tooManyParamsReductionErr);
1172  }
1173  while(kids--) {
1174  T *t = _t_detach_by_idx(code,1);
1175  Symbol sym = _t_symbol(t);
1176  if (semeq(sym,END_CONDITIONS)) {
1177  until = t;
1178  }
1179  else if (semeq(sym,ACTION)) {
1180  act = t;
1181  }
1182  else if (semeq(sym,PARAMS)) {
1183  with = t;
1184  }
1185  }
1186  if (!with) {
1187  with = _t_new_root(PARAMS);
1188  T *s = _t_newr(with,SLOT);
1189  _t_news(s,USAGE,NULL_SYMBOL);
1190  }
1191 
1192  T *cid = context && context->conversation ? _t_clone(context->conversation->cid) : NULL;
1193  // @todo add SEMANTIC_MAP into LISTEN
1194  if (act) {
1195  _r_add_expectation(q->r,aspect,carrier,match,act,with,until,NULL,cid);
1196  x = __t_news(0,REDUCTION_ERROR_SYMBOL,NULL_SYMBOL,1);
1197  debug(D_LISTEN,"adding expectation\n");
1198  }
1199  else {
1200  act = __p_build_wakeup_info(code,context->id);
1201  if (!until) {
1202  until = _t_new_root(END_CONDITIONS);
1203  _t_newi(until,COUNT,1);
1204  }
1205  _r_add_expectation(q->r,aspect,carrier,match,act,with,until,NULL,cid);
1206  debug(D_LISTEN,"adding expectation and blocking at %d,%s\n",context->id,_td(q->r,code));
1207  return Block;
1208  }
1209  }
1210  break;
1211  case INITIATE_PROTOCOL_ID:
1212  {
1213  T *protocol = _t_detach_by_idx(code,1);
1214  T *interaction = _t_detach_by_idx(code,1);
1215  T *bindings = _t_detach_by_idx(code,1);
1216  T *sem_map;
1217  x = __o_initiate(q->r,*(SemanticID *)_t_surface(protocol),*(SemanticID *)_t_surface(interaction),bindings,&sem_map);
1218  _t_free(protocol);
1219  _t_free(interaction);
1220  err = redoReduction;
1221  }
1222  break;
1223  case SELF_ADDR_ID:
1224  {
1225  x = _t_detach_by_idx(code,1);
1226  Symbol sym = *(Symbol *)_t_surface(x);
1227  _t_free(x);
1228  ReceptorAddress addr = __r_get_self_address(q->r);
1229  x = ___r_make_addr(0,sym,addr,true);
1230  }
1231  break;
1232  case GET_LABEL_ID:
1233  {
1234  x = _t_detach_by_idx(code,1);
1235  Symbol sym = *(Symbol *)_t_surface(x);
1236  _t_free(x);
1237  x = _t_detach_by_idx(code,1);
1238  Symbol type = *(Symbol *)_t_surface(x);
1239  _t_free(x);
1240  x = _t_detach_by_idx(code,1);
1241  Symbol as;
1242  if (x) {
1243  as = *(Symbol *)_t_surface(x);
1244  _t_free(x);
1245  }
1246  else as = type;
1247  T *l = _sem_get_label(sem,sym,type);
1248  if (!l) raise_error("label not found for symbol");
1249  x = __t_new_str(0,as,_t_surface(l),true);
1250  }
1251  break;
1252  case MAGIC_ID:
1253  {switch(*(int *)_t_surface(code)) {
1254  case MagicReceptors:
1255  if (G_vm) {
1256  char *s = malloc(10000);
1257  int i;
1258  int l = 0;
1259  for (i=0;i<G_vm->receptor_count;i++) {
1260  Receptor *r = G_vm->routing_table[i].r;
1261  if (r) {
1262  char *n = _sem_get_name(r->sem,G_vm->routing_table[i].s);
1263  if (!n) n= "??";
1264  int nl = strlen(n);
1265  memcpy(&s[l],n,nl);
1266  l+= nl;
1267  sprintf(&s[l],":%d ",r->addr.addr);
1268  l += strlen(&s[l]);
1269  }
1270  }
1271  s[l]=0;
1272  x = __t_new_str(0,LINE,s,1);
1273  free(s);
1274  }
1275  break;
1276  case MagicDebug:
1277  if (!debugging(D_SIGNALS)) {
1278  debug_enable(D_SIGNALS);
1279  x = __t_new_str(0,LINE,"debugging enabled",1);
1280  }
1281  else {
1282  debug_disable(D_SIGNALS);
1283  x = __t_new_str(0,LINE,"debugging disabled",1);
1284  }
1285  break;
1286  case MagicQuit:
1287  if (G_vm) {
1288  __r_kill(G_vm->r);
1289  }
1290  default:
1291  x = __t_new_str(0,TEST_STR_SYMBOL,"blorp!",1);
1292  }
1293  }
1294  break;
1295  default:
1296  raise_error("unknown sys-process id: %d",s.id);
1297  }
1298 
1299  if (debugging(D_STEP)) {
1300  if (err==redoReduction) debug(D_STEP," redoing reduction\n");
1301  }
1302 
1303  if (!dissolve) {
1304  // in the normal case we just replace code with the value of x, so:
1305  // any remaining children of 'code' are the parameters which have all now been "used up"
1306  // so we can call the low-level __t_free the clean them up and then replace the contents of
1307  // the 'code' node with the contents of the 'x' node that was either detached or produced
1308  // by the the process that just ran
1309  __t_free(code);
1310  code->structure.child_count = x->structure.child_count;
1311  code->structure.children = x->structure.children;
1312  code->contents = x->contents;
1313  code->context = x->context;
1314  // we do have to fixe the parent value of all the children
1315  DO_KIDS(code,_t_child(code,i)->structure.parent = code);
1316  free(x);
1317  debug(D_STEP," to %s\n",_t2s(sem,code));
1318  }
1319  else {
1320  // in the dissolve case we have to insert x's children into the spot where code was, so:
1321  // first build a path
1322  int path[2] = {0,TREE_PATH_TERMINATOR};
1323  path[0] = _t_node_index(code);
1324  T *parent = _t_parent(code);
1325  // then free the code node
1326  _t_detach_by_ptr(parent,code);
1327  _t_free(code);
1328  // then loop through x's children inserting them in the parent
1329  T *c;
1330  while ((c = _t_detach_by_idx(x,1))) {
1331  _t_insert_at(parent,path,c);
1332  path[0]++;
1333  }
1334  _t_free(x); // and free the decapitated root!
1335  debug(D_STEP," dissolving to %s\n",_t2s(sem,parent));
1336 
1337  //@todo, I think this might cause those children to be evaluated twice which may be a mistake...
1338  }
1339  return err;
1340 }
1341 
1345 R *__p_make_context(T *run_tree,R *caller,int process_id,T *sem_map) {
1346  R *context = malloc(sizeof(R));
1347  context->id = process_id;
1348  context->state = Eval;
1349  context->err = 0;
1350  context->run_tree = run_tree;
1351  // start with the node_pointer at the first child of the run_tree
1352  context->node_pointer = _t_child(run_tree,1);
1353  context->parent = run_tree;
1354  context->idx = 1;
1355  context->caller = caller;
1356  context->sem_map = sem_map;
1357  // copy in the callers conversation context too.
1358  context->conversation = caller ? caller->conversation : NULL;
1359  if (caller) caller->callee = context;
1360  return context;
1361 }
1362 
1363 #ifdef CEPTR_DEBUG
1364 void pq(Qe *qe) {
1365  while(qe) {
1366  printf("%p(<-%p) -> ",qe,qe->prev);
1367  qe = qe->next;
1368  }
1369  printf("NULL\n");
1370 }
1371 #endif
1372 
1373 // unlink the queue element from the list rejoining the
1374 // previous with the next
1375 #define __p_dequeue(list,qe) \
1376  if (qe->next) {qe->next->prev = qe->prev;} \
1377  if (!qe->prev) { \
1378  list = qe->next; \
1379  } \
1380  else { \
1381  qe->prev->next = qe->next; \
1382  }
1383 
1384 
1385 // add the queue element onto the head of the list
1386 #define __p_enqueue(list,qe) { \
1387  Qe *d = list; \
1388  qe->next = d; \
1389  if (d) d->prev = qe; \
1390  list = qe; \
1391  }
1392 // add the queue element onto the tail of the list
1393 #define __p_append(list,qe) { \
1394  qe->next = NULL; \
1395  if (!list) {list=qe;} \
1396  else { \
1397  Qe *d = list; \
1398  while(d->next) { \
1399  d = d->next; \
1400  } \
1401  qe->prev = d; \
1402  d->next = qe; \
1403  } \
1404  }
1405 
1406 void _p_enqueue(Qe **listP,Qe *e) {
1407  __p_enqueue(*listP,e);
1408 }
1409 
1410 Qe *__p_find_context(Qe *e,int process_id) {
1411  while (e && e->id != process_id) e = e->next;
1412  return e;
1413 }
1414 
1415 // low level unblock. Should be called only when q mutex is locked
1416 void __p_unblock(Q *q,Qe *e,Error err) {
1417  __p_dequeue(q->blocked,e);
1418  __p_enqueue(q->active,e);
1419  q->contexts_count++;
1420  e->context->state = err ? err : Eval;
1421 }
1422 
1426 Error _p_unblock(Q *q,int id) {
1427  // find the context in the queue
1428  int err = 0;
1429  debug(D_LOCK,"unblock LOCK\n");
1430  pthread_mutex_lock(&q->mutex);
1431  Qe *e = __p_find_context(q->blocked,id);
1432  if (e) {
1433  __p_unblock(q,e,noReductionErr);
1434  }
1435  else {
1436  // if the process has been completed then return err 2 otherwise 1
1437  e = __p_find_context(q->completed,id);
1438  if (e) err = 2;
1439  else {
1440  e = __p_find_context(q->active,id);
1441  err = e ? 1 : 3;
1442  }
1443  }
1444  pthread_mutex_unlock(&q->mutex);
1445  debug(D_LOCK,"unblock UNLOCK\n");
1446  return err;
1447 }
1448 
1449 
1458 void _p_wakeup(Q *q,T *wakeup, T *with,Error err) {
1459  int process_id = *(int *)_t_surface(_t_child(wakeup,WakeupReferenceProcessIdentIdx));
1460  //int *code_path = (int *)_t_surface(_t_child(wakeup,WakeupReferenceCodePathIdx));
1461 
1462  debug(D_LOCK,"wakeup LOCK\n");
1463  pthread_mutex_lock(&q->mutex);
1464  Qe *e = __p_find_context(q->blocked,process_id);
1465  if (e) {
1466  // code_path is something I thought I needed to restart execution at the right place
1467  // I currently think that was a mistake, because a blocked process should really only
1468  // be blocked at ONE place, wherever the node_pointer is.
1469  /* T *t = _t_get(e->context->run_tree,code_path); */
1470  /* if (!t) raise_error("failed to find code path when waking up expectation!"); */
1471  /* if (t != e->context->node_pointer) */
1472  /* raise_error("wakeup ref and node_pointer don't match"); */
1473  if (with) {
1474  if (!(with->context.flags & TFLAG_RUN_NODE)) {
1475  T *w = _t_rclone(with);
1476  _t_free(with);
1477  with = w;
1478  }
1479  T *t = e->context->node_pointer;
1480  T *p = _t_parent(t);
1481  _t_replace(p,_t_node_index(t), with);
1482  e->context->node_pointer = with;
1483  }
1484  __p_unblock(q,e,err);
1485  }
1486  else { if (with) _t_free(with);}
1487  pthread_mutex_unlock(&q->mutex);
1488  debug(D_LOCK,"wakeup UNLOCK\n");
1489 }
1490 
1506 Error _p_reduce(SemTable *sem,T *rt) {
1507  T *run_tree = rt;
1508  R *context = __p_make_context(run_tree,0,0,NULL);
1509  Error e;
1510 
1511  // build a fake Receptor and Q on the stack so _p_step will work
1512  Receptor r;
1513  Q q;
1514  r.root = NULL;
1515  r.sem = sem;
1516  r.q = &q;
1517  q.r = &r;
1518 
1519  while(_p_step(&q, &context) != Done);
1520  e = context->err;
1521  free(context);
1522  return e;
1523 }
1524 
1525 T * __p_buildErr(R *context) {
1526  Symbol se;
1527  T *extra = NULL;
1528  //@todo: fix this so we don't actually use an error value that
1529  // then has to be translated into a symbol, but rather so that we
1530  // can programatically calculate the symbol.
1531  // or, perhaps, the Error type should actually be the REDUCTION_ERROR
1532  // symbol that we generate when we want to return an error during the
1533  // reduction process (rather than here). That way we actually have the
1534  // symbol whether or not a we have an error handler for context.
1535 
1536  switch(context->state) {
1537  case tooFewParamsReductionErr: se=TOO_FEW_PARAMS_ERR;break;
1538  case tooManyParamsReductionErr: se=TOO_MANY_PARAMS_ERR;break;
1539  case signatureMismatchReductionErr: se=SIGNATURE_MISMATCH_ERR;break;
1540  case notProcessReductionError: se=NOT_A_PROCESS_ERR;break;
1541  case notInSignalContextReductionError: se=NOT_IN_SIGNAL_CONTEXT_ERR;
1542  case divideByZeroReductionErr: se=ZERO_DIVIDE_ERR;break;
1543  case incompatibleTypeReductionErr: se=INCOMPATIBLE_TYPE_ERR;break;
1544  case deadStreamReadReductionErr: se=DEAD_STREAM_READ_ERR;break;
1545  case missingSemanticMapReductionErr: se=MISSING_SEMANTIC_MAP_ERR;break;
1546  case mismatchSemanticMapReductionErr: se=MISMATCH_SEMANTIC_MAP_ERR;break;
1547  case structureMismatchReductionErr: se=STRUCTURE_MISMATCH_ERR;break;
1548  // case conversatonCompletedReductionErr: se=CONVERSATION_COMPLETED_ERR;break;
1549  case unixErrnoReductionErr:
1550  se=UNIX_ERRNO_ERR;
1551  extra = _t_new_str(0,TEST_STR_SYMBOL,strerror(errno));
1552  break;
1553  case raiseReductionErr:
1554  se = *(Symbol *)_t_surface(_t_child(context->node_pointer,1));
1555  break;
1556  default: raise_error("unknown reduction error: %d",context->err);
1557  }
1558  T *err = __t_newr(0,se,true);
1559  int *path = _t_get_path(context->node_pointer);
1560  _t_new(err,ERROR_LOCATION,path,sizeof(int)*(_t_path_depth(path)+1));
1561  free(path);
1562  if (extra) {
1563  _t_add(err,extra);
1564  }
1565  return err;
1566 }
1567 
1578 Error _p_step(Q *q, R **contextP) {
1579  R *context = *contextP;
1580  SemTable *sem = q->r->sem;
1581 
1582  switch(context->state) {
1583  case noReductionErr:
1584  case Block:
1585  raise_error("whoa, virtual states can't be executed!"); // shouldn't be calling step if Done or noErr or Block
1586  break;
1587  case Pop:
1588  // if this was the successful reduction by an error handler
1589  // move the value to the 1st child
1590  if (context->err) {
1591  T *t = _t_detach_by_idx(context->run_tree,RunTreeErrorCodeIdx);
1592  if (t) {
1593  _t_replace(context->run_tree,1,t);
1594 
1595  context->err = noReductionErr;
1596  }
1597  }
1598 
1599  // if this is top caller on the stack then we are completely done
1600  if (!context->caller) {
1601  if (debugging(D_REDUCE) && (context->err)) {
1602  // T *err = __p_buildErr(context);
1603  debug(D_REDUCE,"finishing reduction with unhandled error: %d\n",context->err);
1604  // _t_free(err);
1605  }
1606  context->state = Done;
1607  break;
1608  }
1609  else {
1610  // otherwise pop the context
1611  R *ctx = context;
1612  context = context->caller; // set the new context
1613 
1614  if (!ctx->err) {
1615  // get results of the run_tree
1616  T *np = _t_detach_by_idx(ctx->run_tree,1);
1617  _t_replace(context->parent,context->idx,np); // replace the process call node with the result
1618  set_rt_cur_child(q->r,np,RUN_TREE_EVALUATED);
1619  context->node_pointer = np;
1620  context->state = Eval; // or possible ascend??
1621  }
1622  else context->state = ctx->err;
1623  // cleanup
1624  _t_free(ctx->run_tree);
1625  free(ctx);
1626  context->callee = 0;
1627  *contextP = context;
1628  }
1629 
1630  break;
1631  case Eval:
1632  {
1633  T *np = context->node_pointer;
1634  if (!np) {
1635  raise_error("Whoa! Null node pointer");
1636  }
1637  Process s = _t_symbol(np);
1638 
1639  if (semeq(s,PARAMETER)) {
1640  T *ref = _t_child(_t_child(np,ParameterReferenceIdx),1);
1641  Symbol ref_sym = _t_symbol(ref);
1642  T *param;
1643  if (semeq(ref_sym,PARAM_PATH)) {
1644  param = _t_get(context->run_tree,(int *)_t_surface(ref));
1645  }
1646  else if (semeq(ref_sym,PARAM_LABEL)) {
1647  raise_error("reference by PARAM_LABEL not implemented");
1648  }
1649  else raise_error("unknown reference type %s",_sem_get_name(sem,ref_sym));
1650 
1651  T *result = _t_child(_t_child(np,ParameterResultIdx),1);
1652  Symbol result_sym = _t_symbol(result);
1653  if (semeq(result_sym,RESULT_VALUE)) {
1654  np = _t_rclone(param);
1655  }
1656  else if (semeq(result_sym,RESULT_SYMBOL)) {
1657  np = __t_news(0,*(Symbol *)_t_surface(result),_t_symbol(param),true);
1658  }
1659  else if (semeq(result_sym,RESULT_LABEL)) {
1660  Symbol label_sym = *(Symbol *)_t_surface(result);
1661  Symbol param_sym = _t_symbol(param);
1662  T *l = _sem_get_label(sem,param_sym,label_sym);
1663  if (!l) raise_error("label not found for param symbol");
1664  np = _t_rclone(l);
1665  }
1666  else raise_error("unknown result type %s",_sem_get_name(sem,result_sym));
1667 
1668  context->node_pointer = np;
1669  _t_replace(context->parent, context->idx,np);
1670  s = _t_symbol(np);
1671  }
1672  else if (semeq(s,PARAM_REF)) {
1673  int *path = (int *)_t_surface(np);
1674  T *param = _t_get(context->run_tree,path);
1675  if (!param) {
1676  char buf[255];
1677  _t_sprint_path(path,buf);
1678  raise_error("request for non-existent param %s in %s",buf,_t2s(sem,context->run_tree));
1679  }
1680  context->node_pointer = np = _t_rclone(param);
1681  _t_replace(context->parent, context->idx,np);
1682  s = _t_symbol(np);
1683  }
1684  else if (semeq(s,SIGNAL_REF)) {
1685  T *sig = _t_parent(context->run_tree);
1686  if (!sig) {
1687  raise_error("not in signal context!");
1688  // return context->state = context->err = notInSignalContextReductionError;
1689  }
1690  T *param = _t_get(sig,(int *)_t_surface(np));
1691  if (!param) {
1692  char buf[1000];
1693 
1694  raise_error("request for non-existent signal portion. signal was: %s \n path was:%s\n",t2s(sig),_t_sprint_path((int *)_t_surface(np),buf));
1695  }
1696  context->node_pointer = np = _t_rclone(param);
1697  _t_replace(context->parent, context->idx,np);
1698  s = _t_symbol(np);
1699  }
1701 
1702  int count = _t_children(np);
1703  if (!is_process(s)) {
1704 
1705  // if this node is not a process, i.e. it's data, then either we
1706  // are done descending and the current items will be the result so ascend
1707  // or if we are doing deep param_ref searching, then search the entire tree
1708  // @todo increase efficiency by adding some instruction to allow the coder choose, see #39
1709 #ifndef RUN_TREE_SHALLOW_PARAM_REF_SEARCH
1710  int node_cur_child = get_rt_cur_child(q->r,np);
1711  if ((node_cur_child != RUN_TREE_EVALUATED) && count && (count != node_cur_child))
1712  context->state = Descend;
1713  else
1714 #endif
1715  context->state = Ascend;
1716  } else {
1717  if (semeq(s,ITERATE)) {
1718  // if first time we are hitting this iteration
1719  // then we need to set up the state data to track the iteration
1720  if (_t_size(np) == 0) {
1721  // sanity check
1722  if (_t_children(np) != 3) {raise_error("ITERATE must have 3 params");}
1723  // create a copy of the code and stick it in the iteration state struct
1724  IterationState *state = malloc(sizeof(IterationState));
1725  state->phase = EvalCondition;
1726  state->code = _t_rclone(np);
1727  state->type = IterateTypeUnknown;
1728  *((IterationState **)&np->contents.surface) = state;
1729  np->contents.size = sizeof(IterationState *);
1730 
1731  // we start in condition phase so throw away the code copy
1732  T *x = _t_detach_by_idx(np,3);
1733  _t_free(x);
1734  }
1735  }
1736  else if (semeq(s,COND)) {
1737  // if first time we are hitting the cond
1738  // the we need to set up the state data to track flow control
1739  if (_t_size(np) == 0) {
1740  CondState *state = malloc(sizeof(CondState));
1741  // remove the conditions and store them in state
1742  T *c = state->conditions = _t_detach_by_idx(np,1);
1743  c = _t_child(c,1);
1744  // we add the first child of the COND_PAIR or the COND_ELSE
1745  // to the code for reduction and set the phase appropriately
1746  _t_add(np,_t_detach_by_idx(c,1));
1747  if (semeq(_t_symbol(c),COND_PAIR)) {
1748  state->phase = EvalCondCondtions;
1749  }
1750  else {
1751  state->phase = EvalCondResult;
1752  }
1753  *((CondState **)&np->contents.surface) = state;
1754  np->contents.size = sizeof(CondState *);
1755  }
1756  }
1757  else if (semeq(s,CONVERSE)) {
1758  // if first time we are hitting the CONVERSE instruction
1759  // in the tree (i.e. on the way down) we need to register
1760  // the conversation IDs and make the tree
1761  if (_t_size(np) == 0) {
1762  UUIDt cuuid = __uuid_gen();
1763  T *until,*wait = NULL; //@todo wait set but not used... what was I doing here?
1764  //@todo get these value semantically i.e _t_get_siganture_child(np,"until");
1765  until =_t_child(np,2);
1766  if (until) {
1767  if (semeq(_t_symbol(until),BOOLEAN)) {
1768  wait = until;
1769  until = NULL;
1770  }
1771  else wait = _t_child(np,3);
1772  }
1773 
1774  UUIDt *parent_u;
1775  if (context->conversation) {
1776  parent_u = __cid_getUUID(context->conversation->cid);
1777  }
1778  else {
1779  parent_u = NULL;
1780  }
1781 
1782  T *c = _r_add_conversation(q->r,parent_u,&cuuid,until?_t_clone(until):NULL,
1783  __p_build_wakeup_info(np,context->id)
1784  );
1785 
1786  ConversationState *state = malloc(sizeof(ConversationState));
1787  state->converse_pointer = np; // save the node pointer for later COMPLETEs
1788  state->cid = _t_child(c,ConversationIdentIdx);
1789  *((ConversationState **)&np->contents.surface) = state;
1790  np->contents.size = sizeof(ConversationState *);
1791  np->context.flags |= TFLAG_ALLOCATED;
1792 
1793  // register the conversation with the context linking an existing conversation
1794  // to the new one if it exists
1795  state->next = context->conversation;
1796  context->conversation = state;
1797  }
1798  }
1799  if (count == get_rt_cur_child(q->r,np) || semeq(s,QUOTE)) {
1800  // if the current child == the child count this means
1801  // all the children have been processed, so we can evaluate this process
1802  // if the process is QUOTE that's a special case and we evaluate it
1803  // immediately without descending.
1804  if (!is_sys_process(s)) {
1805  debug(D_STEP,"Stepping into %s\n",_sem_get_name(sem,s));
1806  // if it's user defined process then we check the signature and then make
1807  // a new run-tree run that process
1808 
1809  Error e = __p_check_signature(sem,s,np,context->sem_map);
1810  if (e) {
1811  context->state = e;
1812  }
1813  else {
1814  T *run_tree = _p_make_run_tree(sem,s,np,context->sem_map);
1815  context->state = Pushed;
1816  // @todo for now we just are just passing the semantic map from one
1817  // context to the next, but I'm pretty sure we're going to need a way
1818  // for folks to modify this on the fly as processes are called
1819  *contextP = __p_make_context(run_tree,context,context->id,context->sem_map);
1820  debug(D_REDUCE,"New context for %s: %s\n\n",_sem_get_name(sem,s),_t2s(sem,run_tree));
1821  }
1822  }
1823  else {
1824  // if it's a sys process we can just reduce it in and then ascend
1825  // or move to the error handling state
1826 
1827  //Error e = __p_check_signature(sem,s,np,context->sem_map);
1828  //if (e) raise_error("SIG FAILURE on %s\n",_t2s(sem,np));
1829 
1830  Error e = __p_reduce_sys_proc(context,s,np,q);
1831  if (e == redoReduction) {
1832  // reset the node_pointer
1833  np = context->node_pointer = _t_child(context->parent,context->idx);
1834  // there are two reasons to redoReduction, one because the call to reduce_sys_proc
1835  // added more code to the runtree that just still needs to be reduced
1836  // or because it added a new run-tree, which needs to be treated as a function
1837  // call and thus adding a new context
1838  if (semeq(RUN_TREE,_t_symbol(np))) {
1839  context->state = Pushed;
1840  // swap out the RUN_TREE for a dummy proc
1841  // @todo really the error returned by reduce_sys_proc should be a struct
1842  // with the RUN_TREE in it so we don't have store it in the actual tree
1843  int i = _t_node_index(np);
1844  T *p = _t_parent(np);
1845  T *dummy = __t_newr(0,NOOP,true);
1846  p->structure.children[i-1] = dummy;
1847  dummy->structure.parent = p;
1848  np->structure.parent = NULL;
1849  *contextP = __p_make_context(np,context,context->id,context->sem_map);
1850  debug(D_REDUCE,"Redoing with a new context for: %s\n\n",_t2s(sem,np));
1851  }
1852  else {
1853  context->state = Eval;
1854  set_rt_cur_child(q->r,np,RUN_TREE_NOT_EVAULATED); // reset the current child count on the code
1855  }
1856  }
1857  else context->state = e ? e : Ascend;
1858  }
1859 
1860  }
1861  else if(count) {
1862  //descend and increment the current child we're working on!
1863  context->state = Descend;
1864  }
1865  else {
1866  raise_error("whoa! brain fart! on %d,%s",count,_t2s(sem,np));
1867  }
1868  }
1869  }
1870  break;
1871  case Ascend:
1872  set_rt_cur_child(q->r,context->node_pointer,RUN_TREE_EVALUATED);
1873  context->node_pointer = context->parent;
1874  context->parent = _t_parent(context->node_pointer);
1875  if (!context->parent || context->parent == context->run_tree || (context->node_pointer == context->run_tree)) {
1876  context->idx = 1;
1877  }
1878  else {
1879  context->idx = get_rt_cur_child(q->r,context->parent);
1880  }
1881  if (context->node_pointer == context->run_tree)
1882  context->state = Pop;
1883  else
1884  context->state = Eval;
1885  break;
1886  case Descend:
1887  context->parent = context->node_pointer;
1888  rt_check(q->r,context->node_pointer);
1889  context->idx = ++rt_cur_child(context->node_pointer);
1890  context->node_pointer = _t_child(context->node_pointer,context->idx);
1891  context->state = Eval;
1892  break;
1893  default:
1894  context->err = context->state;
1895  if (_t_children(context->run_tree) <= 2) {
1896  // no error handler so just return the error
1897  if (debugging(D_STEP)) {
1898  T *err = __p_buildErr(context);
1899  debug(D_STEP,"Reduction Err (no handler): %s\n",_t2s(sem,err));
1900  debug(D_STEP," node_pointer @ err: %s\n",_t2s(sem,context->node_pointer));
1901  _t_free(err);
1902  }
1903  context->state = Pop;
1904  }
1905  else {
1906  // the first parameter to the error code is always a reduction error
1907  // which gets added on as the 4th child of the run tree when the
1908  // error happens.
1909  T *ps = _t_newr(context->run_tree,PARAMS);
1910 
1911  T *err = __p_buildErr(context);
1912  debug(D_STEP,"In Error Handler with %s\n",_t2s(sem,err));
1913  debug(D_STEP," node_pointer @ err: %s\n",_t2s(sem,context->node_pointer));
1914  _t_add(ps,err);
1915 
1916  // switch the node_pointer to the top of the error handling routine
1917  context->node_pointer = _t_child(context->run_tree,RunTreeErrorCodeIdx);
1918  context->idx = RunTreeErrorCodeIdx;
1919  context->parent = context->run_tree;
1920 
1921  context->state = Eval;
1922  }
1923  }
1924  return context->state;
1925 }
1926 
1927 
1931 T* __p_build_wakeup_info(T *code_point,int process_id) {
1932  T *wakeup = _t_new_root(WAKEUP_REFERENCE);
1933  _t_newi(wakeup,PROCESS_IDENT,process_id);
1934  int *path = _t_get_path(code_point);
1935  _t_new(wakeup,CODE_PATH,path,sizeof(int)*(_t_path_depth(path)+1));
1936  free(path);
1937  return wakeup;
1938 }
1939 
1943 T *__p_build_run_tree_va(T* code,int num_params,va_list params) {
1944  T *t = _t_new_root(RUN_TREE);
1945  T *c = _t_rclone(code);
1946  _t_add(t,c);
1947  T *ps = _t_newr(t,PARAMS);
1948  int i;
1949  for(i=1;i<=num_params;i++) {
1950  _t_add(ps,_t_clone(va_arg(params,T *)));
1951  }
1952  return t;
1953 }
1954 
1955 T *__p_build_run_tree(T* code,int num_params,...) {
1956  va_list params;
1957  va_start(params,num_params);
1958  T *t = __p_build_run_tree_va(code,num_params,params);
1959  va_end(params);
1960  return t;
1961 }
1962 
1974 T *_p_make_run_tree(SemTable *sem,Process p,T *params,T *sem_map) {
1975  if (!is_process(p)) {
1976  raise_error("not a Process!");
1977  }
1978  T *processes = _sem_get_defs(sem,p);
1979  T *code_def = _d_get_process_code(processes,p);
1980  T *t = _t_new_root(RUN_TREE);
1981  T *ps;
1982 
1983  T *code = _t_child(code_def,ProcessDefCodeIdx);
1984 
1985  // if this is a system process the code will be NULL_PROCESS so
1986  // we'll just add the params right onto the process node
1987  // and leave the run tree params empty
1988  if (semeq(_t_symbol(code),NULL_PROCESS)) {
1989  ps = __t_new(t,p,0,0,true);
1990  _t_newr(t,PARAMS);
1991  }
1992  else {
1993  // otherwise we clone the code of the process
1994  T *c = _t_rclone(code);
1995  _t_add(t,c);
1996  ps = _t_newr(t,PARAMS);
1997  }
1998  int i,num_params = _t_children(params);
1999  for(i=1;i<=num_params;i++) {
2000  _t_add(ps,_t_detach_by_idx(params,1));
2001  }
2002 
2003  if (sem_map) {
2004  __t_fill_template(t,sem_map,true);
2005  }
2006  return t;
2007 }
2008 
2016  Q *q = malloc(sizeof(Q));
2017  q->r = r;
2018  q->contexts_count = 0;
2019  q->active = NULL;
2020  q->completed = NULL;
2021  q->blocked = NULL;
2022  pthread_mutex_init(&(q->mutex), NULL);
2023  return q;
2024 }
2025 
2026 // clean up a context including its run-trees
2027 void _p_free_context(R *c) {
2028  while(c) {
2029  // free any run_trees that are roots, i.e. assume
2030  // that a tree in a context that's part of another tree
2031  // will get freed elsewhere.
2032  if (!_t_parent(c->run_tree))
2033  _t_free(c->run_tree);
2034  R *n = c->caller;
2035  free(c);
2036  c = n;
2037  }
2038 }
2039 
2040 // clean up a queue element
2041 void _p_free_elements(Qe *e) {
2042  while(e) {
2043  _p_free_context(e->context);
2044  Qe *n = e->next;
2045  free(e);
2046  e = n;
2047  }
2048 }
2049 
2055 void _p_freeq(Q *q) {
2056  _p_free_elements(q->active);
2057  _p_free_elements(q->completed);
2058  _p_free_elements(q->blocked);
2059  free(q);
2060 }
2061 
2062 int G_next_process_id = 0;
2068 Qe *__p_addrt2q(Q *q,T *run_tree,T *sem_map) {
2069  Qe *n = malloc(sizeof(Qe));
2070  n->id = ++G_next_process_id;
2071  n->prev = NULL;
2072  n->context = __p_make_context(run_tree,0,n->id,sem_map);
2073  n->accounts.elapsed_time = 0;
2074  debug(D_LOCK,"addrt2q LOCK\n");
2075  pthread_mutex_lock(&q->mutex);
2076  __p_append(q->active,n);
2077  q->contexts_count++;
2078  pthread_mutex_unlock(&q->mutex);
2079  debug(D_LOCK,"addrt2q UNLOCK\n");
2080 
2081  return n;
2082 }
2083 
2089 void *_p_reduceq_thread(void *arg){
2090  int err;
2091  err = _p_reduceq((Q *)arg);
2092  pthread_exit(NULL);
2093 }
2094 
2095 #ifdef CEPTR_DEBUG
2096 void debug_np(int type,T *np) {
2097  int *path = _t_get_path(np);
2098  char pp[255];
2099  _t_sprint_path(path,pp);
2100  debug(type,"Node Pointer:%s\n",pp);
2101  free(path);
2102 }
2103 #endif
2104 
2105 
2106 #ifdef CEPTR_DEBUG
2107 char *sn[]={"Done","Ascend","Descend","Pushed","Pop","Eval","Block"};
2108 char *__debug_state_str(R *context) {
2109  int s = context->state;
2110 
2111  char *result;
2112  if (s <= 0) result = sn[-s];
2113  else {
2114  T *e = __p_buildErr(context);
2115  result = t2s(e);
2116  _t_free(e);
2117  }
2118  return result;
2119 }
2120 #endif
2121 
2126 Error _p_reduceq(Q *q) {
2127  debug(D_REDUCE+D_REDUCEV,"Starting reduce:\n");
2128 
2129  Qe *qe = q->active;
2130  Error next_state;
2131  struct timespec start, end;
2132 
2133  while (q->contexts_count) {
2134 #ifdef CEPTR_DEBUG
2135  if (debugging(D_REDUCEV)) {
2136  R *context = qe->context;
2137  char *s = __debug_state_str(context);
2138  debug(D_REDUCEV,"ID:%d -- State %s(%d)\n",qe->id,s,context->state);
2139  debug(D_REDUCEV," idx:%d\n",context->idx);
2140  debug(D_REDUCEV,"%s\n",_t2s(q->r->sem,context->run_tree));
2141  if (context) {
2142  if (context->node_pointer == 0) {
2143  debug(D_REDUCEV,"Node Pointer: NULL!\n");
2144  }
2145  else {
2146  debug(D_REDUCEV,"rt_cur_child:%d\n",rt_cur_child(context->node_pointer));
2147  debug_np(D_REDUCEV,context->node_pointer);
2148  }
2149  }
2150  }
2151  int prev_state;
2152  if (debugging(D_REDUCEV+D_REDUCE)) {
2153  prev_state = qe->context->state;
2154  }
2155 #endif
2156 
2157  clock_gettime(CLOCK_MONOTONIC, &start);
2158  next_state = _p_step(q, &qe->context); // next state is set in directly in the context
2159  clock_gettime(CLOCK_MONOTONIC, &end);
2160  qe->accounts.elapsed_time += diff_micro(&start, &end);
2161 
2162 #ifdef CEPTR_DEBUG
2163  debug(D_REDUCEV,"result state:%s\n\n",__debug_state_str(qe->context));
2164  if (debugging(D_REDUCE) && prev_state == Eval) {
2165  debug_np(D_REDUCE,qe->context->node_pointer);
2166  debug(D_REDUCE,"Eval: %s\n\n",_t2s(q->r->sem,qe->context->run_tree));
2167  }
2168 #endif
2169  debug(D_LOCK,"reduce LOCK\n");
2170  pthread_mutex_lock(&q->mutex);
2171  Qe *next = qe->next;
2172  if (next_state == Done) {
2173  // remove from the round-robin
2174  __p_dequeue(q->active,qe);
2175 
2176  debug(D_REDUCEV,"Just completed:%d\n",qe->id);
2177 
2178  // add to the completed list
2179  __p_enqueue(q->completed,qe);
2180  q->contexts_count--;
2181  }
2182  else if (next_state == Block) {
2183  // remove from the round-robin
2184  __p_dequeue(q->active,qe);
2185 
2186  // add to the blocked list
2187  __p_enqueue(q->blocked,qe);
2188  q->contexts_count--;
2189  }
2190  qe = next ? next : q->active; // next in round robin or wrap back to first
2191  pthread_mutex_unlock(&q->mutex);
2192  debug(D_LOCK,"reduce UNLOCK\n");
2193  };
2194 
2196  // one process ended ok, but one did not. What's the error? Probably
2197  // the errors here would be at a different level, and the caller would be
2198  // expected to inspect the errors of the reduced processes.
2199  debug(D_REDUCE+D_REDUCEV,"Ending reduce\n");
2200  return 0;
2201 }
2202 
2208 void _p_cleanup(Q *q) {
2209  debug(D_LOCK,"cleanup LOCK\n");
2210  pthread_mutex_lock(&q->mutex);
2211  Qe *e = q->completed;
2212  while (e) {
2213  T *ett = _t_child(_t_child(q->r->root,ReceptorInstanceStateIdx),ReceptorElapsedTimeIdx);
2214  int *et = (int *)_t_surface(ett);
2215  (*et) += e->accounts.elapsed_time;
2216  e = e->next;
2217  }
2218  _p_free_elements(q->completed);
2219  q->completed = NULL;
2220  pthread_mutex_unlock(&q->mutex);
2221  debug(D_LOCK,"cleanup UNLOCK\n");
2222 }
2223 
2235 T *__p_make_form(Symbol sym,char *output_label,Symbol output_type,SemanticID output_sem,...){
2236  va_list params;
2237  va_start(params,output_sem);
2238  char *label;
2239  Symbol type,value;
2240  int optional;
2241  T *signature = _t_new_root(sym);
2242  T *o = _t_newr(signature,OUTPUT_SIGNATURE);
2243  T *l = _t_newr(o,SIGNATURE_LABEL);
2244  _t_new_str(l,ENGLISH_LABEL,output_label);
2245  if (semeq(output_type,SIGNATURE_PASSTHRU)) {
2246  _t_newr(o,output_type);
2247  }
2248  else {
2249  _t_news(o,output_type,output_sem);
2250  }
2251  while ((label = va_arg(params,char*))) {
2252  type = va_arg(params,Symbol);
2253  if (semeq(type,SIGNATURE_OPTIONAL)) {
2254  optional = 1;
2255  type = va_arg(params,Symbol);
2256  }
2257  else {
2258  optional = 0;
2259  }
2260  value = va_arg(params,Symbol);
2261  T *i = _t_newr(signature,INPUT_SIGNATURE);
2262  l = _t_newr(i,SIGNATURE_LABEL);
2263  _t_new_str(l,ENGLISH_LABEL,label);
2264  _t_news(i,type,value);
2265  if (optional) _t_newr(i,SIGNATURE_OPTIONAL);
2266  }
2267  va_end(params);
2268  return signature;
2269 }
2270 
T * __p_make_form(Symbol sym, char *output_label, Symbol output_type, SemanticID output_sem,...)
Definition: process.c:2235
T * _t_new_root(Symbol symbol)
Definition: tree.c:160
Receptor * r
back-pointer to receptor in which this Q is running (for defs and more)
Definition: ceptr_types.h:207
char * _sem_get_name(SemTable *sem, SemanticID s)
Definition: semtable.c:85
Definition: ceptr_types.h:114
T * parent
node_pointer's parent (cached here for efficiency)
Definition: ceptr_types.h:174
Error _p_step(Q *q, R **contextP)
Definition: process.c:1578
Definition: stream.h:30
Definition: ceptr_types.h:206
T * __t_newi(T *parent, Symbol symbol, int surface, bool is_run_node)
Definition: tree.c:97
T * _t_get(T *t, int *p)
Definition: tree.c:1441
int _t_write(SemTable *sem, T *t, Stream *stream)
Definition: tree.c:2066
bool __t_fill_template(T *template, T *sem_map, bool as_run_node)
Definition: tree.c:1049
T * _t_detach_by_idx(T *t, int i)
Definition: tree.c:278
header file for symbol and structure definition functions
void _p_freeq(Q *q)
Definition: process.c:2055
int _t_path_depth(int *p)
Definition: tree.c:1365
int _t_node_index(T *t)
Definition: tree.c:1284
T * converse_pointer
pointer to the CONVERSE instruction in the run tree
Definition: ceptr_types.h:160
R * caller
a pointer to the context that invoked this run-tree/context
Definition: ceptr_types.h:176
Semantic tree regular expression header file.
T * sem_map
semantic map in effect for this context
Definition: ceptr_types.h:178
void _p_wakeup(Q *q, T *wakeup, T *with, Error err)
Definition: process.c:1458
void * _p_reduceq_thread(void *arg)
Definition: process.c:2089
void _st_data_consumed(Stream *st)
Definition: stream.c:470
TreeHash _t_hash(SemTable *sem, T *t)
Definition: tree.c:1614
int * _t_get_path(T *t)
Definition: tree.c:1384
T * __p_build_wakeup_info(T *code_point, int process_id)
Definition: process.c:1931
T * _t_clone(T *t)
Definition: tree.c:589
Error _p_reduceq(Q *q)
Definition: process.c:2126
int state
process state machine state
Definition: ceptr_types.h:171
T * __t_newr(T *parent, Symbol symbol, bool is_run_node)
Definition: tree.c:171
T * __t_new(T *parent, Symbol symbol, void *surface, size_t size, bool is_run_node)
Definition: tree.c:59
Symbol _t_symbol(T *t)
Definition: tree.c:1228
T * _t_child(T *t, int i)
Definition: tree.c:1251
protocol helpers header file
T * makeASCIITree(char *c)
Definition: semtrex.c:1206
int err
process error value
Definition: ceptr_types.h:170
void _t_insert_at(T *t, int *path, T *i)
Definition: tree.c:438
T * cid
pointer to CONVERSATION_IDENT in receptors CONVERSATIONS tree
Definition: ceptr_types.h:161
Context context
the context this receptor's definition creates
Definition: ceptr_types.h:240
T * _sem_get_label(SemTable *sem, SemanticID s, Symbol label_type)
Definition: semtable.c:124
ReceptorAddress addr
the address by which to get messages to this receptor instance
Definition: ceptr_types.h:241
Error __p_check_signature(SemTable *sem, Process p, T *code, T *sem_map)
Definition: process.c:291
Instances instances
the instances store
Definition: ceptr_types.h:249
void _st_kill(Stream *st)
Definition: stream.c:481
receptor implementation header file
void _r_add_expectation(Receptor *r, Aspect aspect, Symbol carrier, T *pattern, T *action, T *with, T *until, T *using, T *cid)
Adds an expectation to a receptor's aspect.
Definition: receptor.c:164
Q * q
process queue
Definition: ceptr_types.h:250
int _t_match(T *semtrex, T *t)
Definition: semtrex.c:809
void * _t_surface(T *t)
Definition: tree.c:1215
T * __t_new_str(T *parent, Symbol symbol, char *surface, bool is_run_node)
Definition: tree.c:150
Error _p_unblock(Q *q, int id)
Definition: process.c:1426
void _st_start_read(Stream *st)
Definition: stream.c:455
T * __p_build_run_tree_va(T *code, int num_params, va_list params)
Definition: process.c:1943
Qe * active
active processes
Definition: ceptr_types.h:209
T * _r_send(Receptor *r, T *signal)
Definition: receptor.c:556
int id
the process id this context exists in
Definition: ceptr_types.h:169
Error __p_reduce_sys_proc(R *context, Symbol s, T *code, Q *q)
Definition: process.c:395
SState * state(StateType type, int *statesP, int level)
Definition: semtrex.c:103
R * callee
a pointer to the context we've invoked
Definition: ceptr_types.h:177
int contexts_count
number of active processes
Definition: ceptr_types.h:208
T * _t_parent(T *t)
Definition: tree.c:1262
void _st_free(Stream *st)
Definition: stream.c:507
processing header files
int _t_matchr(T *semtrex, T *t, T **rP)
Definition: semtrex.c:798
Qe * __p_addrt2q(Q *q, T *run_tree, T *sem_map)
Definition: process.c:2068
T * node_pointer
pointer to the tree node to execute next
Definition: ceptr_types.h:173
T * __r_make_signal(ReceptorAddress from, ReceptorAddress to, Aspect aspect, Symbol carrier, T *signal_contents, UUIDt *in_response_to, T *until, T *cid)
Definition: receptor.c:515
T * root
RECEPTOR_INSTANCE semantic tree.
Definition: ceptr_types.h:238
void _p_fill_from_match(SemTable *sem, T *t, T *match_results, T *match_tree)
Definition: process.c:76
Qe * blocked
blocked processes
Definition: ceptr_types.h:211
SemTable * sem
pointer back to the genotype table for this receptor's vmhost instance
Definition: ceptr_types.h:242
Error _p_reduce(SemTable *sem, T *rt)
Definition: process.c:1506
T * _r_get_instance(Receptor *r, Xaddr x)
Definition: receptor.c:379
Q * _p_newq(Receptor *r)
Definition: process.c:2015
Receptor * r
Receptor data for this vm host.
Definition: vmhost.h:43
ConversationState * conversation
record of the conversation state active in this context frame
Definition: ceptr_types.h:179
SemanticID __d_define_receptor(SemTable *sem, T *def, Context c)
Definition: def.c:478
int idx
node pointers child index (cached here for efficiency)
Definition: ceptr_types.h:175
Qe * completed
completed processes (pending cleanup)
Definition: ceptr_types.h:210
T * run_tree
pointer to the root of the run_tree
Definition: ceptr_types.h:172
void _t_add(T *t, T *c)
Definition: tree.c:261
R * __p_make_context(T *run_tree, R *caller, int process_id, T *sem_map)
Definition: process.c:1345
void _p_cleanup(Q *q)
Definition: process.c:2208
T * _stx_results2sem_map(SemTable *sem, T *match_results, T *match_tree)
Definition: semtrex.c:2029
Definition: ceptr_types.h:168
int _t_children(T *t)
Definition: tree.c:1205
header file for the accumulator
void _t_replace(T *t, int i, T *r)
Definition: tree.c:372
void _t_detach_by_ptr(T *t, T *c)
Definition: tree.c:291
void _t_free(T *t)
Definition: tree.c:526
T * __o_initiate(Receptor *r, SemanticID protocol, SemanticID interaction, T *bindings, T **sem_mapP)
Definition: protocol.c:473
T * _r_delete_instance(Receptor *r, Xaddr x)
Definition: receptor.c:407
T * __t_news(T *parent, Symbol symbol, SemanticID surface, bool is_run_node)
Definition: tree.c:121
T * _r_request(Receptor *r, T *signal, Symbol response_carrier, T *code_point, int process_id, T *cid)
Definition: receptor.c:575
T * _t_getv(T *t,...)
Definition: tree.c:1470
size_t _t_size(T *t)
Definition: tree.c:1238
T * _p_make_run_tree(SemTable *sem, Process p, T *params, T *sem_map)
Definition: process.c:1974
T * __t_newc(T *parent, Symbol symbol, char surface, bool is_run_node)
Definition: tree.c:85
Xaddr _r_new_instance(Receptor *r, T *t)
Definition: receptor.c:365
char * _t_sprint_path(int *fp, char *buf)
Definition: tree.c:1508