#include "sx.h"
void _sx_process_read(sx_t s, sx_buf_t buf) {
sx_error_t sxe;
nad_t nad;
char *errstring;
int i;
int ns, elem;
if(XML_Parse(s->expat, buf->data, buf->len, 0) == 0) {
if(!s->fail) {
errstring = (char *) XML_ErrorString(XML_GetErrorCode(s->expat));
_sx_gen_error(sxe, SX_ERR_XML_PARSE, "XML parse error", errstring);
_sx_event(s, event_ERROR, (void *) &sxe);
_sx_error(s, stream_err_XML_NOT_WELL_FORMED, errstring);
_sx_close(s);
_sx_buffer_free(buf);
return;
}
_sx_buffer_free(buf);
return;
}
_sx_buffer_free(buf);
if(s->state >= state_STREAM)
while((nad = jqueue_pull(s->rnadq)) != NULL) {
int plugin_error;
#ifdef SX_DEBUG
char *out; int len;
nad_print(nad, 0, &out, &len);
_sx_debug(ZONE, "completed nad: %.*s", len, out);
#endif
if (s->max_stanza_bytes > 0) {
if (len > s->max_stanza_bytes) {
_sx_gen_error(sxe, SX_ERR_STREAM, "Stream error", "Stanza too large");
_sx_event(s, event_ERROR, (void *) &sxe);
nad_free(nad);
return;
}
}
if ((s->max_message_bytes > 0) &&
(NAD_ENAME_L(nad, 0) == 7 && strncmp("message", NAD_ENAME(nad, 0), 7) == 0))
{
for (i = 1; NAD_ENAME_L(nad, i) > 0; i++) {
if((NAD_ENAME_L(nad, i) == 4) &&
(nad->elems[i].iname != NULL) &&
(strncmp("body", NAD_ENAME(nad, i), 4) == 0) &&
(NAD_CDATA_L(nad, i) > s->max_message_bytes)) {
_sx_gen_error(sxe, SX_ERR_STREAM, "Stream error", "Message body too large");
_sx_event(s, event_ERROR, (void *) &sxe);
nad_free(nad);
return;
}
}
}
if(NAD_ENS(nad, 0) >= 0 && NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_STREAMS) && strncmp(NAD_NURI(nad, NAD_ENS(nad, 0)), uri_STREAMS, strlen(uri_STREAMS)) == 0 && NAD_ENAME_L(nad, 0) == 5 && strncmp(NAD_ENAME(nad, 0), "error", 5) == 0) {
errstring = NULL;
if((ns = nad_find_scoped_namespace(nad, uri_STREAM_ERR, NULL)) >= 0)
if((elem = nad_find_elem(nad, 0, ns, "text", 1)) >= 0)
if(NAD_CDATA_L(nad, elem) > 0) {
errstring = (char *) malloc(sizeof(char) * (NAD_CDATA_L(nad, elem) + 1));
sprintf(errstring, "%.*s", NAD_CDATA_L(nad, elem), NAD_CDATA(nad, elem));
}
if (errstring == NULL && NAD_CDATA_L(nad, 0) > 0) {
errstring = (char *) malloc(sizeof(char) * (NAD_CDATA_L(nad, 0) + 1));
sprintf(errstring, "%.*s", NAD_CDATA_L(nad, 0), NAD_CDATA(nad, 0));
}
if(s->state < state_CLOSING) {
_sx_gen_error(sxe, SX_ERR_STREAM, "Stream error", errstring);
_sx_event(s, event_ERROR, (void *) &sxe);
_sx_state(s, state_CLOSING);
}
if(errstring != NULL) free(errstring);
nad_free(nad);
break;
}
if(_sx_chain_nad_read(s, nad) == 0)
return;
plugin_error = 0;
if(s->env != NULL)
for(i = 0; i < s->env->nplugins; i++)
if(s->env->plugins[i]->process != NULL) {
int plugin_ret;
plugin_ret = (s->env->plugins[i]->process)(s, s->env->plugins[i], nad);
if(plugin_ret == 0) {
plugin_error ++;
break;
}
}
if ((plugin_error == 0) && (s->state < state_CLOSING))
_sx_event(s, event_PACKET, (void *) nad);
}
if(s->fail) {
_sx_close(s);
return;
}
if(s->depth < 0 && s->state < state_CLOSING) {
if(s->state >= state_STREAM_SENT) {
jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0);
s->want_write = 1;
}
_sx_state(s, state_CLOSING);
return;
}
}
int sx_can_read(sx_t s) {
sx_buf_t in, out;
int read, ret;
assert((int) s);
if(!s->want_read && s->state < state_CLOSING)
return 0;
_sx_debug(ZONE, "%d ready for reading", s->tag);
in = _sx_buffer_new(NULL, 1024, NULL, NULL);
read = _sx_event(s, event_READ, (void *) in);
if(read < 0) {
_sx_buffer_free(in);
s->want_read = 0;
s->want_write = 0;
return 0;
}
if(read == 0)
_sx_state(s, state_CLOSING);
else {
_sx_debug(ZONE, "passed %d read bytes", in->len);
out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg);
ret = _sx_chain_io_read(s, out);
if(ret <= 0) {
if(ret < 0) {
s->want_read = s->want_write = 0;
}
_sx_buffer_free(in);
_sx_buffer_free(out);
if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL);
return s->want_read;
}
_sx_buffer_free(in);
_sx_debug(ZONE, "decoded read data (%d bytes): %.*s", out->len, out->len, out->data);
_sx_process_read(s, out);
}
if(s->want_write == 0 && s->state == state_CLOSING) {
_sx_state(s, state_CLOSED);
_sx_event(s, event_CLOSED, NULL);
return 0;
}
if(s->state == state_CLOSED)
return 0;
if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL);
return s->want_read;
}
static int _sx_get_pending_write(sx_t s) {
sx_buf_t in, out;
int ret;
assert(s != NULL);
if (s->wbufpending != NULL) {
return 0;
}
in = jqueue_pull(s->wbufq);
if(in == NULL) {
in = _sx_buffer_new(NULL, 0, NULL, NULL);
}
s->want_write = jqueue_size(s->wbufq);
out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg);
_sx_debug(ZONE, "encoding %d bytes for writing: %.*s", in->len, in->len, in->data);
ret = _sx_chain_io_write(s, out);
if(ret <= 0) {
if(ret == -1) {
jqueue_push(s->wbufq, in, (s->wbufq->front != NULL) ? s->wbufq->front->priority : 0);
s->want_write = 1;
} else if(ret == -2) {
s->want_read = s->want_write = 0;
return -1;
}
return 0;
}
_sx_buffer_free(in);
if (out->len == 0)
_sx_buffer_free(out);
else
s->wbufpending = out;
return 0;
}
int sx_can_write(sx_t s) {
sx_buf_t out;
int ret, written;
assert((int) s);
if(!s->want_write && s->state < state_CLOSING)
return 0;
_sx_debug(ZONE, "%d ready for writing", s->tag);
ret = _sx_get_pending_write(s);
if (ret < 0) {
return 0;
}
if(s->wbufpending == NULL) {
if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
return s->want_write;
}
out = s->wbufpending;
s->wbufpending = NULL;
_sx_debug(ZONE, "handing app %d bytes to write", out->len);
written = _sx_event(s, event_WRITE, (void *) out);
if(written < 0) {
_sx_buffer_free(out);
s->want_read = 0;
s->want_write = 0;
return 0;
} else if(written < out->len) {
out->len -= written;
out->data += written;
s->wbufpending = out;
s->want_write ++;
} else {
if(out->notify != NULL)
(out->notify)(s, out->notify_arg);
_sx_buffer_free(out);
}
if(s->want_write == 0 && s->state == state_CLOSING) {
_sx_state(s, state_CLOSED);
_sx_event(s, event_CLOSED, NULL);
return 0;
}
if(s->state == state_CLOSED)
return 0;
if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
return s->want_write;
}
int _sx_nad_write(sx_t s, nad_t nad, int elem) {
char *out;
int len;
if(s->state >= state_CLOSING) {
log_debug(ZONE, "stream closed, dropping outgoing packet");
nad_free(nad);
return 1;
}
if(_sx_chain_nad_write(s, nad, elem) == 0)
return 1;
nad_print(nad, elem, &out, &len);
_sx_debug(ZONE, "queueing for write: %.*s", len, out);
jqueue_push(s->wbufq, _sx_buffer_new(out, len, NULL, NULL), 0);
nad_free(nad);
s->want_write = 1;
return 0;
}
void sx_nad_write_elem(sx_t s, nad_t nad, int elem) {
assert((int) s);
assert((int) nad);
if(_sx_nad_write(s, nad, elem) == 1)
return;
s->want_write = 1;
_sx_event(s, event_WANT_WRITE, NULL);
if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
}
int _sx_raw_write(sx_t s, char *buf, int len) {
if(s->state >= state_CLOSING) {
log_debug(ZONE, "stream closed, dropping outgoing raw data");
return 1;
}
_sx_debug(ZONE, "queuing for write: %.*s", len, buf);
jqueue_push(s->wbufq, _sx_buffer_new(buf, len, NULL, NULL), 0);
s->want_write = 1;
return 0;
}
void sx_raw_write(sx_t s, char *buf, int len) {
assert((int) s);
assert((int) buf);
assert(len);
if(_sx_raw_write(s, buf, len) == 1)
return;
s->want_write = 1;
_sx_event(s, event_WANT_WRITE, NULL);
if(s->want_read) _sx_event(s, event_WANT_READ, NULL);
}
void _sx_close(sx_t s) {
if(s->state >= state_STREAM_SENT) {
jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0);
s->want_write = 1;
}
_sx_state(s, state_CLOSING);
}
void sx_close(sx_t s) {
assert((int) s);
if(s->state >= state_CLOSING)
return;
if(s->state >= state_STREAM_SENT && s->state < state_CLOSING) {
_sx_close(s);
_sx_event(s, event_WANT_WRITE, NULL);
} else {
_sx_state(s, state_CLOSED);
_sx_event(s, event_CLOSED, NULL);
}
}
void sx_kill(sx_t s) {
assert((int) s);
_sx_state(s, state_CLOSED);
_sx_event(s, event_CLOSED, NULL);
}