ceptr
 All Data Structures Files Functions Variables Typedefs Macros Modules Pages
vmhost.c
Go to the documentation of this file.
1 
11 #include "vmhost.h"
12 #include "tree.h"
13 #include "accumulator.h"
14 #include "debug.h"
15 /****************** create and destroy virtual machine */
16 
17 
18 /* set up the c structures for a vmhost*/
19 VMHost *__v_init(Receptor *r,SemTable *sem) {
20  VMHost *v = malloc(sizeof(VMHost));
21  v->r = r;
22  v->active_receptor_count = 0;
23  v->receptor_count = 0;
24  v->installed_receptors = _s_new(RECEPTOR_IDENTIFIER,RECEPTOR_SURFACE);
25  v->vm_thread.state = 0;
26  v->clock_thread.state = 0;
27  v->sem = sem;
28  return v;
29 }
30 
42  SemTable *sem = _sem_new();
43 
44  base_contexts(sem);
45  base_defs(sem);
46 
47  Receptor *r = _r_new(sem,SYS_RECEPTOR);
48  VMHost *v = __v_init(r,sem);
49 
50  load_contexts(sem);
51 
52  r = _r_new(sem,COMPOSITORY);
53  _v_new_receptor(v,v->r,COMPOSITORY,r);
54 
55  r = _r_new(sem,DEV_COMPOSITORY);
56  _v_new_receptor(v,v->r,DEV_COMPOSITORY,r);
57 
58  r = _r_new(sem,TEST_RECEPTOR);
59  _v_new_receptor(v,v->r,TEST_RECEPTOR,r);
60 
61  _r_defineClockReceptor(sem);
62 
63  return v;
64 }
65 
71 void _v_free(VMHost *v) {
72  _r_free(v->r);
73  _s_free(v->installed_receptors);
74  _t_free(_t_root(v->sem->stores[0].definitions));
75  _sem_free(v->sem);
76  free(v);
77 }
78 
91  Xaddr x;
92  raise_error("not implemented");
93  // x = _r_new_instance(v->c,p);
94  return x;
95 }
96 
109 Xaddr _v_install_r(VMHost *v,Xaddr package,T *bindings,char *label) {
110  raise_error("not implemented");
111  T *p;// = _r_get_instance(v->c,package);
112  T *id = _t_child(p,2);
113  TreeHash h = _t_hash(v->r->sem,id);
114 
115  // make sure we aren't re-installing an already installed receptor
116  Xaddr x = _s_get(v->installed_receptors,h);
117  if (!(is_null_xaddr(x))) return G_null_xaddr;
118  _s_add(v->installed_receptors,h,package);
119 
120  // confirm that the bindings match the manifest
122  if (bindings) {
123  T *m = _t_child(p,1);
124  int c = _t_children(m);
125  if (c%2) {raise_error("manifest must have even number of children!");}
126  int i;
127  for(i=1;i<=c;i++) {
128  T *mp = _t_child(m,i);
129  T *s = _t_child(mp,2);
130  T *bp = _t_child(bindings,i);
131  if (!bp) {
132  raise_error("missing binding for %s",(char *)_t_surface(_t_child(mp,1)));
133  }
134  T *vb = _t_child(bp,2);
135  Symbol spec = *(Symbol *)_t_surface(s);
136  if (semeq(_t_symbol(vb),spec)) {
137  T *symbols = _t_child(p,3);
138  raise_error("bindings symbol %s doesn't match spec %s",_sem_get_name(v->r->sem,_t_symbol(vb)),_sem_get_name(v->r->sem,spec));
139  }
140  }
141  }
142 
143  Symbol s = _r_define_symbol(v->r,RECEPTOR,label);
144 
145  raise_error("fix semtable");
146  Receptor *r = _r_new_receptor_from_package(NULL,s,p,bindings);
147  return _v_new_receptor(v,v->r,s,r);
148 }
149 
150 Xaddr _v_new_receptor(VMHost *v,Receptor *parent,Symbol s, Receptor *r) {
151  T *t = _t_new_receptor(NULL,s,r);
152  if (v->receptor_count+1 >= MAX_RECEPTORS) {
153  raise_error("too many receptors");
154  }
155  int c = v->receptor_count++;
156  v->routing_table[c].r=r;
157  v->routing_table[c].s=s;
158  r->addr.addr = c;
159 
160  //@todo what ever else is needed at the vmhost level to add the receptor's
161  // process queue to the process tables etc...
162  return _r_new_instance(parent,t);
163 }
164 
172 void _v_activate(VMHost *v, Xaddr x) {
173  if (v->active_receptor_count+1 >= MAX_ACTIVE_RECEPTORS) {
174  raise_error("too many active receptors");
175  }
176  T *t = _r_get_instance(v->r,x);
177  Receptor *r = __r_get_receptor(t);
178  int c = v->active_receptor_count++;
179  v->active_receptors[c].r=r;
180  v->active_receptors[c].x=x;
181 
182  // handle special cases
183  if (semeq(x.symbol,CLOCK_RECEPTOR)) {
184  _v_start_thread(&v->clock_thread,___clock_thread,r);
185  }
186 }
187 
194 void _v_send(VMHost *v,ReceptorAddress from,ReceptorAddress to,Aspect aspect,Symbol carrier,T *contents) {
195  T *s = __r_make_signal(from,to,aspect,carrier,contents,0,0,0);
196  T *x = _r_send(v->r,s);
197  _t_free(x);
198 }
199 
203 void _v_send_signals(VMHost *v,T *signals) {
204  while(_t_children(signals)>0) {
205  T *s = _t_detach_by_idx(signals,1);
206  T *r = _r_send(v->r,s);
207  _t_free(r); //@todo WHAT???? throwing away the rsult??
208  }
209 }
210 
212 // we just loop through all instances searching for a match
213 /* Xaddr __v_get_receptor_xaddr(Instances *instances,Receptor *r) { */
214 /* instances_elem *e = 0; */
215 /* SemanticID sid = {r->parent,SEM_TYPE_RECEPTOR,r->context}; */
216 /* Xaddr result = {sid,0}; */
217 /* HASH_FIND_INT( *instances, &sid, e ); */
218 /* if (e) { */
219 /* Instance *iP = &e->instances; */
220 
221 /* instance_elem *curi,*tmpi; */
222 /* HASH_ITER(hh, *iP, curi, tmpi) { */
223 /* if (__r_get_receptor(curi->instance) == r) { */
224 /* result.addr = curi->addr; */
225 /* return result; */
226 /* } */
227 /* } */
228 /* } */
229 /* return result; */
230 /* } */
231 
235 void _v_deliver_signals(VMHost *v, Receptor *sender) {
236  T *signals = sender->pending_signals;
237 
238  while(_t_children(signals)>0) {
239  T *s = _t_detach_by_idx(signals,1);
240  T *head = _t_getv(s,SignalMessageIdx,MessageHeadIdx,TREE_PATH_TERMINATOR);
241 
242  ReceptorAddress *toP = (ReceptorAddress *)_t_surface(_t_child(_t_child(head,HeadToIdx),1));
243  ReceptorAddress *fromP = (ReceptorAddress *)_t_surface(_t_child(_t_child(head,HeadFromIdx),1));
244 
245  // if the from or to address is "self" (-1) we find the senders self
246  // fix the values in the signal we are about to deliver.
247 
248  if (fromP->addr == SELF_RECEPTOR_ADDR) {
249  *fromP = __r_get_self_address(sender);
250  }
251 
252  Receptor *r;
253  if (toP->addr == SELF_RECEPTOR_ADDR) {
254  *toP = __r_get_self_address(sender);
255  }
256  else {
257  if (toP->addr >= v->receptor_count) {
258  raise_error("to address: %d doesn't exist!",toP->addr);
259  }
260  r = v->routing_table[toP->addr].r;
261  }
262 
263  Error err = _r_deliver(r,s);
264  if (err) {
265  raise_error("delivery error: %d",err);
266  }
267  }
268 }
269 
273 void *__v_process(void *arg) {
274  VMHost *v = (VMHost *) arg;
275  int c,i;
276 
277  while(v->r->state == Alive) {
278  // make sure everybody's doing the right thing...
279  // reallocate threads as necessary...
280  // do edge-receptor type stuff..
281  // what ever other watchdoggy type things are necessary...
282  // printf ("something\n");
283  // sleep(1);
284 
285  // for now we will check all receptors for any active contexts and
286  // we will reduce them here. Really this should be a thread pool manager
287  // where we put allocate receptor's queues for processing according to
288  // priority/etc...
289 
290  for (i=0;v->r->state == Alive && i<v->active_receptor_count;i++) {
291  Receptor *r = v->active_receptors[i].r;
292  if (r->q && r->q->contexts_count > 0) {
293  _p_reduceq(r->q);
294  }
295  // send any signals generated by the reduction
296  _v_deliver_signals(v,r);
297 
298  // cleanup any fully reduced run-trees
299  if (r->q->completed) _p_cleanup(r->q);
300  }
301  }
302 
303  // close down all receptors
304  for (i=0;i<v->active_receptor_count;i++) {
305  Receptor *r = v->active_receptors[i].r;
306  __r_kill(r);
307  // if other receptors have threads associated with them, the possibly we should
308  // be doing a thread_join here, or maybe even inside __r_kill @fixme
309  }
310 
311  int err =0;
312  pthread_exit(&err); //@todo determine if we should use pthread_exit or just return 0
313  return 0;
314 }
315 
316 // fire up the threads that make the vmhost work
317 void _v_start_vmhost(VMHost *v) {
318  _v_start_thread(&v->vm_thread,__v_process,v);
319 }
320 
325  Receptor *r = _r_makeClockReceptor(v->sem);
326  Xaddr clock = _v_new_receptor(v,v->r,CLOCK_RECEPTOR,r);
327  _v_activate(v,clock);
328 }
329 
330 /****************** thread handling */
331 
332 void _v_start_thread(thread *t,void *(*start_routine)(void*), void *arg) {
333  int rc;
334  if (t->state) {
335  raise_error("attempt to double-start a thread");
336  }
337  rc = pthread_create(&t->pthread,0,start_routine,arg);
338  if (rc){
339  raise_error("Error starting thread; return code from pthread_create() is %d\n", rc);
340  }
341  t->state = 1;
342 }
343 
344 void _v_join_thread(thread *t) {
345  if (t->state) { // make sure the thread was started before trying to join it
346  void *status;
347  int rc;
348 
349  rc = pthread_join(t->pthread, &status);
350  if (rc) {
351  raise_error("ERROR; return code from pthread_join() is %d\n", rc);
352  }
353  t->state = 0;
354  }
355 }
356 
char * _sem_get_name(SemTable *sem, SemanticID s)
Definition: semtable.c:85
Definition: ceptr_types.h:114
Scape * _s_new(Symbol key_source, Symbol data_source)
Definition: scape.c:23
int state
state information about the receptor that the vmhost manages
Definition: ceptr_types.h:251
Xaddr _s_get(Scape *s, TreeHash h)
Definition: scape.c:83
void _v_send(VMHost *v, ReceptorAddress from, ReceptorAddress to, Aspect aspect, Symbol carrier, T *contents)
Definition: vmhost.c:194
T * _t_detach_by_idx(T *t, int i)
Definition: tree.c:278
T * _t_new_receptor(T *parent, Symbol symbol, Receptor *r)
Definition: tree.c:204
Xaddr _v_load_receptor_package(VMHost *v, T *p)
Definition: vmhost.c:90
TreeHash _t_hash(SemTable *sem, T *t)
Definition: tree.c:1614
void _v_instantiate_builtins(VMHost *v)
Definition: vmhost.c:324
void _r_free(Receptor *r)
Definition: receptor.c:186
Error _p_reduceq(Q *q)
Definition: process.c:2126
semantic trees header file
void _s_free(Scape *s)
Definition: scape.c:43
T * _t_root(T *t)
Definition: tree.c:1272
Symbol _t_symbol(T *t)
Definition: tree.c:1228
T * _t_child(T *t, int i)
Definition: tree.c:1251
Xaddr _v_install_r(VMHost *v, Xaddr package, T *bindings, char *label)
Definition: vmhost.c:109
Receptor * __r_get_receptor(T *installed_receptor)
Definition: receptor.c:1039
void * __v_process(void *arg)
Definition: vmhost.c:273
vmhost implementation header file
Receptor * _r_new_receptor_from_package(SemTable *sem, Symbol s, T *p, T *bindings)
Creates a new receptor from a receptor package.
Definition: receptor.c:114
void _v_free(VMHost *v)
Definition: vmhost.c:71
ReceptorAddress addr
the address by which to get messages to this receptor instance
Definition: ceptr_types.h:241
Definition: vmhost.h:20
VMHost * _v_new()
Creates a new virtual machine host.
Definition: vmhost.c:41
Q * q
process queue
Definition: ceptr_types.h:250
void _v_send_signals(VMHost *v, T *signals)
Definition: vmhost.c:203
void * _t_surface(T *t)
Definition: tree.c:1215
void _v_activate(VMHost *v, Xaddr x)
Definition: vmhost.c:172
Receptor * _r_new(SemTable *sem, SemanticID r)
Creates a new receptor.
Definition: receptor.c:88
SemTable * sem
Semantic Table for definitions on this host.
Definition: vmhost.h:44
T * _r_send(Receptor *r, T *signal)
Definition: receptor.c:556
int contexts_count
number of active processes
Definition: ceptr_types.h:208
Definition: vmhost.h:40
T * __r_make_signal(ReceptorAddress from, ReceptorAddress to, Aspect aspect, Symbol carrier, T *signal_contents, UUIDt *in_response_to, T *until, T *cid)
Definition: receptor.c:515
SemTable * sem
pointer back to the genotype table for this receptor's vmhost instance
Definition: ceptr_types.h:242
ActiveReceptor active_receptors[MAX_ACTIVE_RECEPTORS]
pointer to array that holds all currently active receptors
Definition: vmhost.h:45
T * _r_get_instance(Receptor *r, Xaddr x)
Definition: receptor.c:379
void * ___clock_thread(void *arg)
Definition: receptor.c:1250
Receptor * r
Receptor data for this vm host.
Definition: vmhost.h:43
Qe * completed
completed processes (pending cleanup)
Definition: ceptr_types.h:210
Symbol _r_define_symbol(Receptor *r, Structure s, char *label)
Definition: receptor.c:218
void _p_cleanup(Q *q)
Definition: process.c:2208
void _v_deliver_signals(VMHost *v, Receptor *sender)
Definition: vmhost.c:235
Error _r_deliver(Receptor *r, T *signal)
Definition: receptor.c:954
int _t_children(T *t)
Definition: tree.c:1205
header file for the accumulator
void _t_free(T *t)
Definition: tree.c:526
void _s_add(Scape *s, TreeHash h, Xaddr x)
Definition: scape.c:57
T * _t_getv(T *t,...)
Definition: tree.c:1470
Xaddr _r_new_instance(Receptor *r, T *t)
Definition: receptor.c:365