/* webdav_requestqueue.c */
#include "webdavd.h"
#include "LogMessage.h"
#include <sys/syslog.h>
#include <err.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include "webdav_requestqueue.h"
#include "webdav_network.h"
/*
 * One unit of work waiting in the request queue.
 *
 * Elements are chained through `next` into a singly linked list. `type`
 * holds one of the WEBDAV_*_TYPE constants and selects which member of the
 * `element` union the worker thread reads when it services the entry.
 */
typedef struct webdav_requestqueue_element_tag
{
/* following element in the queue (NULL/0 when there is none) */
struct webdav_requestqueue_element_tag *next;
/* WEBDAV_REQUEST_TYPE, WEBDAV_DOWNLOAD_TYPE, WEBDAV_SERVER_PING_TYPE or WEBDAV_SEQWRITE_MANAGER_TYPE */
int type;
union
{
/* WEBDAV_REQUEST_TYPE: socket handed to handle_filesystem_request() */
struct request
{
int socket;
} request;
/* WEBDAV_DOWNLOAD_TYPE: arguments for network_finish_download() */
struct download
{
struct node_entry *node;
struct ReadStreamRec *readStreamRecPtr;
} download;
/* WEBDAV_SERVER_PING_TYPE: argument for network_server_ping() */
struct serverping
{
u_int32_t delay;
} serverping;
/* WEBDAV_SEQWRITE_MANAGER_TYPE: argument for network_seqwrite_manager() */
struct seqwrite_read_rsp
{
struct stream_put_ctx *ctx;
} seqwrite_read_rsp;
} element;
} webdav_requestqueue_element_t;
/*
 * Head of the queue of pending work elements. Worker threads remove from
 * item_head; filesystem requests are appended at item_tail while the other
 * element types are pushed at item_head.
 */
typedef struct
{
/* first element to be serviced, or NULL when the queue is empty */
webdav_requestqueue_element_t *item_head;
/* last element in the queue, or NULL when the queue is empty */
webdav_requestqueue_element_t *item_tail;
/* number of elements currently linked into the queue */
int request_count;
} webdav_requestqueue_header_t;
/* Values stored in webdav_requestqueue_element_t.type */
#define WEBDAV_REQUEST_TYPE 1
#define WEBDAV_DOWNLOAD_TYPE 2
#define WEBDAV_SERVER_PING_TYPE 3
#define WEBDAV_SEQWRITE_MANAGER_TYPE 4
/* How long (added to time(NULL), i.e. seconds) an idle worker thread waits for work before re-checking and exiting */
#define WEBDAV_MAX_IDLE_TIME 10
/* connectionstate is read and written with connectionstate_lock held */
static pthread_mutex_t connectionstate_lock;
static int connectionstate;
/* requests_lock protects waiting_requests and the two thread counters below; requests_condvar wakes idle workers */
static pthread_mutex_t requests_lock;
static pthread_cond_t requests_condvar;
static webdav_requestqueue_header_t waiting_requests;
/* pulse_lock/pulse_condvar drive pulse_thread; purge_cache_files asks it to drop unopened cache files on its next pass */
static pthread_mutex_t pulse_lock;
static pthread_cond_t pulse_condvar;
static int purge_cache_files;
static int handle_request_thread(void *arg);
/* number of worker threads created and not yet exited */
static int gCurrThreadCount = 0;
/* number of worker threads currently blocked waiting on requests_condvar */
static int gIdleThreadCount = 0;
/* attributes (detached) used for every worker thread; set up in requestqueue_init() */
static pthread_attr_t gRequest_thread_attr;
/*
 * get_connectionstate
 *
 * Returns the current value of connectionstate, read while holding
 * connectionstate_lock. If the lock cannot be taken the function returns 1.
 */
int get_connectionstate(void)
{
	int current_state = 1;	/* value reported when the lock cannot be acquired */
	int error;

	error = pthread_mutex_lock(&connectionstate_lock);
	require_noerr(error, pthread_mutex_lock);

	current_state = connectionstate;

	error = pthread_mutex_unlock(&connectionstate_lock);
	require_noerr(error, pthread_mutex_unlock);

pthread_mutex_unlock:
pthread_mutex_lock:

	return current_state;
}
/*
 * set_connectionstate
 *
 * Records a transition of the server connection state. Only real
 * transitions (UP -> DOWN or DOWN -> UP) are acted on and logged; any other
 * combination of requested and current state leaves connectionstate alone.
 * Going DOWN also queues a server ping with zero delay.
 */
void set_connectionstate(int state)
{
	int error;

	error = pthread_mutex_lock(&connectionstate_lock);
	require_noerr(error, pthread_mutex_lock);

	if ( (state == WEBDAV_CONNECTION_DOWN) && (connectionstate == WEBDAV_CONNECTION_UP) )
	{
		/* transition from up to down */
		syslog(LOG_ERR, "WebDAV server is no longer responding, will keep retrying...");
		connectionstate = WEBDAV_CONNECTION_DOWN;
		requestqueue_enqueue_server_ping(0);
	}
	else if ( (state == WEBDAV_CONNECTION_UP) && (connectionstate == WEBDAV_CONNECTION_DOWN) )
	{
		/* transition from down to up */
		syslog(LOG_ERR, "WebDAV server is now responding normally");
		connectionstate = WEBDAV_CONNECTION_UP;
	}

	error = pthread_mutex_unlock(&connectionstate_lock);
	require_noerr(error, pthread_mutex_unlock);

pthread_mutex_unlock:
pthread_mutex_lock:

	return;
}
/*
 * get_request
 *
 * Reads one request message from socket `so`. The first sizeof(int) bytes
 * are stored in *operation; the remainder is stored in the `klen`-byte
 * buffer `key` and is NUL-terminated.
 *
 * Returns 0 on success, the errno value if recvmsg() fails, or EINVAL if the
 * message is shorter than an operation code plus a struct webdav_cred, or
 * if it fills `key` completely so that no byte is left for the terminator.
 */
static int get_request(int so, int *operation, void *key, size_t klen)
{
	int error;
	struct iovec iov[2];
	struct msghdr msg;
	ssize_t n;
	size_t key_len;

	/* scatter the message into the operation code followed by the key */
	iov[0].iov_base = (caddr_t)operation;
	iov[0].iov_len = sizeof(int);
	iov[1].iov_base = key;
	iov[1].iov_len = klen;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = iov;
	msg.msg_iovlen = 2;

	n = recvmsg(so, &msg, 0);
	if ( n < 0 )
	{
		/* capture errno before any other call can change it */
		error = errno;
		LogMessage(kError, "get_request recvmsg failed error %d\n", error);
	}
	else if ( (size_t)n < (sizeof(int) + sizeof(struct webdav_cred)) )
	{
		error = EINVAL;
		LogMessage(kError, "get_request got short message\n");
	}
	else
	{
		/* number of bytes that landed in key */
		key_len = (size_t)n - sizeof(int);
		if ( key_len < klen )
		{
			/* terminate the string just in case */
			((char *)key)[key_len] = '\0';
			error = 0;
		}
		else
		{
			/* key is completely full: writing the terminator would overrun it */
			error = EINVAL;
			LogMessage(kError, "get_request message too long for buffer\n");
		}
	}

	return ( error );
}
/*
 * send_reply
 *
 * Writes a reply to socket `so`: first the int error code, then (only when
 * `size` is non-zero) `size` bytes from `data`. When the connection state
 * is WEBDAV_CONNECTION_DOWN the error code sent has
 * WEBDAV_CONNECTION_DOWN_MASK or'ed into it. A sendmsg() failure is logged
 * and otherwise ignored.
 */
static void send_reply(int so, void *data, size_t size, int error)
{
	struct msghdr msg;
	struct iovec iov[2];
	int send_error = error;
	int n;

	if ( get_connectionstate() == WEBDAV_CONNECTION_DOWN )
	{
		send_error |= WEBDAV_CONNECTION_DOWN_MASK;
	}

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;

	/* the error code always goes first */
	iov[0].iov_base = (caddr_t)&send_error;
	iov[0].iov_len = sizeof(send_error);

	/* the payload is appended only when there is one */
	if ( size != 0 )
	{
		iov[1].iov_base = (caddr_t)data;
		iov[1].iov_len = size;
		msg.msg_iovlen = 2;
	}

	n = sendmsg(so, &msg, 0);
	if (n < 0)
	{
		LogMessage(kError, "send_reply sendmsg failed\n");
	}
}
#if DEBUG
/*
 * Returns a printable name for a WEBDAV_* operation code, or "???" when the
 * code is not recognized. Used only by the debug logging below.
 */
static const char *request_operation_name(int operation)
{
	return (
		(operation==WEBDAV_LOOKUP) ? "LOOKUP" :
		(operation==WEBDAV_CREATE) ? "CREATE" :
		(operation==WEBDAV_OPEN) ? "OPEN" :
		(operation==WEBDAV_CLOSE) ? "CLOSE" :
		(operation==WEBDAV_GETATTR) ? "GETATTR" :
		(operation==WEBDAV_SETATTR) ? "SETATTR" :
		(operation==WEBDAV_READ) ? "READ" :
		(operation==WEBDAV_WRITE) ? "WRITE" :
		(operation==WEBDAV_FSYNC) ? "FSYNC" :
		(operation==WEBDAV_REMOVE) ? "REMOVE" :
		(operation==WEBDAV_RENAME) ? "RENAME" :
		(operation==WEBDAV_MKDIR) ? "MKDIR" :
		(operation==WEBDAV_RMDIR) ? "RMDIR" :
		(operation==WEBDAV_READDIR) ? "READDIR" :
		(operation==WEBDAV_STATFS) ? "STATFS" :
		(operation==WEBDAV_UNMOUNT) ? "UNMOUNT" :
		(operation==WEBDAV_INVALCACHES) ? "INVALCACHES" :
		(operation==WEBDAV_WRITESEQ) ? "WRITESEQ" :
		"???" );
}
#endif

/*
 * handle_filesystem_request
 *
 * Services one request arriving on socket `so`: reads the request with
 * get_request(), dispatches it to the matching filesystem_* routine, sends
 * exactly one reply with send_reply(), and closes the socket.
 *
 * While the connection is down every operation except WEBDAV_UNMOUNT and
 * WEBDAV_INVALCACHES is answered with ETIMEDOUT without being attempted.
 * An unrecognized operation code is answered with ENOTSUP.
 */
static void handle_filesystem_request(int so)
{
	int error;
	int operation;
	char key[(NAME_MAX + 1) + sizeof(union webdav_request)];
	size_t num_bytes;
	char *bytes;
	union webdav_reply reply;

	/* get the request from the socket */
	error = get_request(so, &operation, key, sizeof(key));
	if ( !error ) {
#if DEBUG
		LogMessage(kTrace, "handle_filesystem_request: %s(%d)\n",
				request_operation_name(operation),
				operation
				);
#endif

		bzero((void *)&reply, sizeof(union webdav_reply));

		if ( (get_connectionstate() == WEBDAV_CONNECTION_DOWN) && (operation != WEBDAV_UNMOUNT) &&
			 (operation != WEBDAV_INVALCACHES) )
		{
			/* server is unreachable: fail fast instead of attempting the operation */
			error = ETIMEDOUT;
			send_reply(so, (void *)&reply, sizeof(union webdav_reply), error);
		}
		else
		{
			/* call the function to handle the request */
			switch ( operation )
			{
				case WEBDAV_LOOKUP:
					error = filesystem_lookup((struct webdav_request_lookup *)key,
						(struct webdav_reply_lookup *)&reply);
					send_reply(so, (void *)&reply, sizeof(struct webdav_reply_lookup), error);
					break;

				case WEBDAV_CREATE:
					error = filesystem_create((struct webdav_request_create *)key,
						(struct webdav_reply_create *)&reply);
					send_reply(so, (void *)&reply, sizeof(struct webdav_reply_create), error);
					break;

				case WEBDAV_OPEN:
					error = filesystem_open((struct webdav_request_open *)key,
						(struct webdav_reply_open *)&reply);
					send_reply(so, (void *)&reply, sizeof(struct webdav_reply_open), error);
					break;

				case WEBDAV_CLOSE:
					error = filesystem_close((struct webdav_request_close *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_GETATTR:
					error = filesystem_getattr((struct webdav_request_getattr *)key,
						(struct webdav_reply_getattr *)&reply);
					send_reply(so, (void *)&reply, sizeof(struct webdav_reply_getattr), error);
					break;

				case WEBDAV_READ:
					bytes = NULL;
					num_bytes = 0;
					error = filesystem_read((struct webdav_request_read *)key,
						&bytes, &num_bytes);
					/* pass the size_t through untouched; narrowing to int could corrupt large sizes */
					send_reply(so, (void *)bytes, num_bytes, error);
					free(bytes);
					break;

				case WEBDAV_FSYNC:
					error = filesystem_fsync((struct webdav_request_fsync *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_REMOVE:
					error = filesystem_remove((struct webdav_request_remove *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_RENAME:
					error = filesystem_rename((struct webdav_request_rename *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_MKDIR:
					error = filesystem_mkdir((struct webdav_request_mkdir *)key,
						(struct webdav_reply_mkdir *)&reply);
					send_reply(so, (void *)&reply, sizeof(struct webdav_reply_mkdir), error);
					break;

				case WEBDAV_RMDIR:
					error = filesystem_rmdir((struct webdav_request_rmdir *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_READDIR:
					error = filesystem_readdir((struct webdav_request_readdir *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_STATFS:
					error = filesystem_statfs((struct webdav_request_statfs *)key,
						(struct webdav_reply_statfs *)&reply);
					send_reply(so, (void *)&reply, sizeof(struct webdav_reply_statfs), error);
					break;

				case WEBDAV_UNMOUNT:
					webdav_kill(-2);	/* tell the main select loop to exit */
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_INVALCACHES:
					error = filesystem_invalidate_caches((struct webdav_request_invalcaches *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				case WEBDAV_WRITESEQ:
					error = filesystem_write_seq((struct webdav_request_writeseq *)key);
					send_reply(so, (void *)0, 0, error);
					break;

				default:
					/* unknown operation: still answer, so the requester gets ENOTSUP rather than a bare close */
					error = ENOTSUP;
					send_reply(so, (void *)0, 0, error);
					break;
			}
		}

#if DEBUG
		LogMessage(kError, "handle_filesystem_request: error %d, %s(%d)\n", error,
				request_operation_name(operation),
				operation
				);
#endif
	}
	else {
		LogMessage(kError, "handle_filesystem_request: get_request failed %d\n", error);
		send_reply(so, NULL, 0, error);
	}

	close(so);
}
/*
 * pulse_thread
 *
 * Body of the housekeeping thread started by requestqueue_init(). Each pass,
 * with pulse_lock held, it walks the file cache nodes: open files that are
 * not deleted get filesystem_lock() called on them, and files that are not
 * open have their cache file removed when the node is deleted, its cache is
 * invalid, or a purge was requested. It then frees nodes, clears the purge
 * request and sleeps on pulse_condvar until signalled or until half of
 * gtimeout_val has elapsed. Any locking failure ends the loop and calls
 * webdav_kill(-1).
 */
static void pulse_thread(void *arg)
{
	#pragma unused(arg)
	struct node_entry *node;
	int error = 0;

	for ( ;; )
	{
		struct timespec pulsetime;

		error = pthread_mutex_lock(&pulse_lock);
		require_noerr(error, pthread_mutex_lock);

		LogMessage(kTrace, "pulse_thread running\n");

		for ( node = nodecache_get_next_file_cache_node(TRUE);
			  node != NULL;
			  node = nodecache_get_next_file_cache_node(FALSE) )
		{
			if ( !NODE_FILE_IS_OPEN(node) )
			{
				/* not open: drop the cache file if it is no longer useful */
				if ( NODE_IS_DELETED(node) || NODE_FILE_CACHE_INVALID(node) || purge_cache_files )
				{
					nodecache_remove_file_cache(node);
				}
			}
			else if ( !NODE_IS_DELETED(node) )
			{
				/* open and still present: result deliberately ignored */
				(void) filesystem_lock(node);
			}
		}

		nodecache_free_nodes();

		/* the purge request (if any) has been honored by the pass above */
		purge_cache_files = FALSE;

		/* sleep until signalled or until half the timeout has gone by */
		pulsetime.tv_nsec = 0;
		pulsetime.tv_sec = time(NULL) + (gtimeout_val / 2);
		error = pthread_cond_timedwait(&pulse_condvar, &pulse_lock, &pulsetime);
		require((error == ETIMEDOUT || error == 0), pthread_cond_timedwait);

		error = pthread_mutex_unlock(&pulse_lock);
		require_noerr(error, pthread_mutex_unlock);
	}

pthread_mutex_lock:
pthread_cond_timedwait:
pthread_mutex_unlock:

	if ( error )
	{
		webdav_kill(-1);
	}
}
/*
 * handle_request_thread
 *
 * Body of every worker thread. In a loop it takes requests_lock and either
 * removes the element at the head of waiting_requests and services it
 * according to its type, or, when the queue is empty, waits on
 * requests_condvar for up to WEBDAV_MAX_IDLE_TIME. A thread whose wait timed
 * out and that still finds the queue empty on its next pass exits.
 *
 * If a lock/wait call fails the loop is left and webdav_kill(-1) is called;
 * the failing error code is returned.
 */
static int handle_request_thread(void *arg)
{
#pragma unused(arg)
int error;
webdav_requestqueue_element_t * myrequest;
struct timespec timeout;
/* set to 1 after a timed-out wait; a second empty check then ends the thread */
int idleRecheck = 0;
while (TRUE) {
error = pthread_mutex_lock(&requests_lock);
require_noerr(error, pthread_mutex_lock);
if (waiting_requests.request_count > 0) {
/* there is work: unlink the head element while the lock is held */
idleRecheck = 0;
myrequest = waiting_requests.item_head;
--(waiting_requests.request_count);
if (waiting_requests.request_count > 0) {
waiting_requests.item_head = myrequest->next;
}
else {
/* that was the last element, so the queue is now empty */
waiting_requests.item_head = waiting_requests.item_tail = 0;
}
/* release the lock before doing the (possibly slow) work */
error = pthread_mutex_unlock(&requests_lock);
require_noerr(error, pthread_mutex_unlock);
switch (myrequest->type) {
case WEBDAV_REQUEST_TYPE:
handle_filesystem_request(myrequest->element.request.socket);
break;
case WEBDAV_DOWNLOAD_TYPE:
/* finish the download, then record the outcome on the node: */
/* failure sets UF_APPEND on the cache file and marks it aborted, */
/* success clears the file flags and marks it finished */
error = network_finish_download(myrequest->element.download.node, myrequest->element.download.readStreamRecPtr);
if (error) {
verify_noerr(fchflags(myrequest->element.download.node->file_fd, UF_APPEND));
myrequest->element.download.node->file_status = WEBDAV_DOWNLOAD_ABORTED;
}
else {
verify_noerr(fchflags(myrequest->element.download.node->file_fd, 0));
myrequest->element.download.node->file_status = WEBDAV_DOWNLOAD_FINISHED;
}
/* a download failure is recorded on the node, not treated as a thread error */
error = 0;
break;
case WEBDAV_SERVER_PING_TYPE:
network_server_ping(myrequest->element.serverping.delay);
break;
case WEBDAV_SEQWRITE_MANAGER_TYPE:
network_seqwrite_manager(myrequest->element.seqwrite_read_rsp.ctx);
break;
default:
/* unknown element types are discarded */
break;
}
/* the element was allocated by the enqueue routine; this thread owns it now */
free(myrequest);
}
else {
/* nothing queued */
if (idleRecheck == 1) {
/* the previous wait timed out and there is still no work: retire this thread */
/* (the unlock result is stored but not checked before exiting) */
gCurrThreadCount -= 1;
error = pthread_mutex_unlock(&requests_lock);
pthread_exit(NULL);
}
timeout.tv_sec = time(NULL) + WEBDAV_MAX_IDLE_TIME;
timeout.tv_nsec = 0;
/* count this thread as idle for the duration of the wait so enqueuers signal instead of spawning */
gIdleThreadCount += 1;
error = pthread_cond_timedwait(&requests_condvar, &requests_lock, &timeout);
gIdleThreadCount -= 1;
require((error == ETIMEDOUT || error == 0), pthread_cond_wait);
if (error == ETIMEDOUT) {
/* look at the queue one more time before exiting */
idleRecheck = 1;
}
error = pthread_mutex_unlock(&requests_lock);
require_noerr(error, pthread_mutex_unlock);
}
}
pthread_cond_wait:
pthread_mutex_unlock:
pthread_mutex_lock:
/* reached only through a failed require above */
if ( error ) {
webdav_kill(-1);
}
return error;
}
int requestqueue_init()
{
int error;
pthread_mutexattr_t mutexattr;
pthread_t the_pulse_thread;
pthread_attr_t the_pulse_thread_attr;
connectionstate = WEBDAV_CONNECTION_UP;
error = pthread_mutexattr_init(&mutexattr);
require_noerr(error, pthread_mutexattr_init);
error = pthread_mutex_init(&connectionstate_lock, &mutexattr);
require_noerr(error, pthread_mutex_init);
bzero(&waiting_requests, sizeof(waiting_requests));
error = pthread_cond_init(&requests_condvar, NULL);
require_noerr(error, pthread_cond_init);
error = pthread_mutexattr_init(&mutexattr);
require_noerr(error, pthread_mutexattr_init);
error = pthread_mutex_init(&requests_lock, &mutexattr);
require_noerr(error, pthread_mutex_init);
error = pthread_attr_init(&gRequest_thread_attr);
require_noerr(error, pthread_attr_init);
error = pthread_attr_setdetachstate(&gRequest_thread_attr, PTHREAD_CREATE_DETACHED);
require_noerr(error, pthread_attr_setdetachstate);
purge_cache_files = FALSE;
error = pthread_mutexattr_init(&mutexattr);
require_noerr(error, pthread_mutexattr_init);
error = pthread_mutex_init(&pulse_lock, &mutexattr);
require_noerr(error, pthread_mutex_init);
error = pthread_cond_init(&pulse_condvar, NULL);
require_noerr(error, pthread_cond_init);
error = pthread_attr_init(&the_pulse_thread_attr);
require_noerr(error, pthread_attr_init);
error = pthread_attr_setdetachstate(&the_pulse_thread_attr, PTHREAD_CREATE_DETACHED);
require_noerr(error, pthread_attr_setdetachstate);
error = pthread_create(&the_pulse_thread, &the_pulse_thread_attr, (void *)pulse_thread, (void *)NULL);
require_noerr(error, pthread_create);
pthread_create:
pthread_attr_setdetachstate:
pthread_attr_init:
pthread_cond_init:
pthread_mutex_init:
pthread_mutexattr_init:
return ( error );
}
/*
 * requestqueue_enqueue_request
 *
 * Appends a WEBDAV_REQUEST_TYPE element carrying `socket` to the tail of the
 * request queue, then wakes an idle worker thread or, if none is idle and
 * fewer than WEBDAV_REQUEST_THREADS exist, creates a new one.
 *
 * Returns 0 on success, ENOMEM if the element cannot be allocated, or the
 * error from the failing pthread call.
 */
int requestqueue_enqueue_request(int socket)
{
	int error, unlock_error;
	webdav_requestqueue_element_t * request_element_ptr;
	pthread_t request_thread;

	error = pthread_mutex_lock(&requests_lock);
	require_noerr(error, pthread_mutex_lock);

	request_element_ptr = malloc(sizeof(*request_element_ptr));
	require_action(request_element_ptr != NULL, malloc_request_element_ptr, error = ENOMEM);

	/* fill in the new element */
	request_element_ptr->next = 0;
	request_element_ptr->type = WEBDAV_REQUEST_TYPE;
	request_element_ptr->element.request.socket = socket;

	/* link it in after the current tail (or as the only element) */
	if (waiting_requests.item_tail) {
		waiting_requests.item_tail->next = request_element_ptr;
	}
	else {
		waiting_requests.item_head = request_element_ptr;
	}
	waiting_requests.item_tail = request_element_ptr;
	waiting_requests.request_count += 1;

	if (gIdleThreadCount > 0) {
		/* somebody is waiting for work: wake one thread */
		error = pthread_cond_signal(&requests_condvar);
		require_noerr(error, pthread_cond_signal);
	}
	else if (gCurrThreadCount < WEBDAV_REQUEST_THREADS) {
		/* everybody is busy and there is room for another worker */
		error = pthread_create(&request_thread, &gRequest_thread_attr, (void *) handle_request_thread, (void *) NULL);
		require_noerr(error, pthread_create_signal);
		gCurrThreadCount += 1;
	}

pthread_create_signal:
pthread_cond_signal:
malloc_request_element_ptr:

	/* keep the first error if the unlock fails as well */
	unlock_error = pthread_mutex_unlock(&requests_lock);
	require_noerr_action(unlock_error, pthread_mutex_unlock, error = (error == 0) ? unlock_error : error);

pthread_mutex_unlock:
pthread_mutex_lock:

	return (error);
}
/*
 * requestqueue_enqueue_download
 *
 * Pushes a WEBDAV_DOWNLOAD_TYPE element for `node` / `readStreamRecPtr` onto
 * the head of the request queue, then wakes an idle worker thread or, if
 * none is idle and fewer than WEBDAV_REQUEST_THREADS exist, creates one.
 *
 * Returns 0 on success, EIO if the element cannot be allocated, or the error
 * from the failing pthread call. A failure to lock or unlock requests_lock
 * also calls webdav_kill(-1).
 */
int requestqueue_enqueue_download(struct node_entry *node, struct ReadStreamRec *readStreamRecPtr)
{
	int error, error2;
	webdav_requestqueue_element_t * request_element_ptr;
	pthread_t request_thread;

	error = pthread_mutex_lock(&requests_lock);
	require_noerr_action(error, pthread_mutex_lock, webdav_kill(-1));

	request_element_ptr = malloc(sizeof(*request_element_ptr));
	require_action(request_element_ptr != NULL, malloc_request_element_ptr, error = EIO);

	/* fill in the new element */
	request_element_ptr->type = WEBDAV_DOWNLOAD_TYPE;
	request_element_ptr->element.download.node = node;
	request_element_ptr->element.download.readStreamRecPtr = readStreamRecPtr;

	/* insert at the head; an empty queue also gets this element as its tail */
	request_element_ptr->next = waiting_requests.item_head;
	if ( waiting_requests.item_head == NULL ) {
		waiting_requests.item_tail = request_element_ptr;
	}
	waiting_requests.item_head = request_element_ptr;
	waiting_requests.request_count += 1;

	if (gIdleThreadCount > 0) {
		/* somebody is waiting for work: wake one thread */
		error = pthread_cond_signal(&requests_condvar);
		require_noerr(error, pthread_cond_signal);
	}
	else if (gCurrThreadCount < WEBDAV_REQUEST_THREADS) {
		/* everybody is busy and there is room for another worker */
		error = pthread_create(&request_thread, &gRequest_thread_attr, (void *) handle_request_thread, (void *) NULL);
		require_noerr(error, pthread_create_signal);
		gCurrThreadCount += 1;
	}

pthread_create_signal:
pthread_cond_signal:
malloc_request_element_ptr:

	/* keep the first error if the unlock fails as well */
	error2 = pthread_mutex_unlock(&requests_lock);
	require_noerr_action(error2, pthread_mutex_unlock, error = (error == 0) ? error2 : error; webdav_kill(-1));

pthread_mutex_unlock:
pthread_mutex_lock:

	return (error);
}
/*
 * requestqueue_enqueue_server_ping
 *
 * Pushes a WEBDAV_SERVER_PING_TYPE element carrying `delay` onto the head of
 * the request queue, then wakes an idle worker thread or, if none is idle
 * and fewer than WEBDAV_REQUEST_THREADS exist, creates one.
 *
 * Returns 0 on success, EIO if the element cannot be allocated, or the error
 * from the failing pthread call. A failure to lock or unlock requests_lock
 * also calls webdav_kill(-1).
 */
int requestqueue_enqueue_server_ping(u_int32_t delay)
{
	int error, error2;
	webdav_requestqueue_element_t * request_element_ptr;
	pthread_t request_thread;

	error = pthread_mutex_lock(&requests_lock);
	require_noerr_action(error, pthread_mutex_lock, webdav_kill(-1));

	request_element_ptr = malloc(sizeof(*request_element_ptr));
	require_action(request_element_ptr != NULL, malloc_request_element_ptr, error = EIO);

	/* fill in the new element */
	request_element_ptr->type = WEBDAV_SERVER_PING_TYPE;
	request_element_ptr->element.serverping.delay = delay;

	/* insert at the head; an empty queue also gets this element as its tail */
	request_element_ptr->next = waiting_requests.item_head;
	if ( waiting_requests.item_head == NULL ) {
		waiting_requests.item_tail = request_element_ptr;
	}
	waiting_requests.item_head = request_element_ptr;
	waiting_requests.request_count += 1;

	if (gIdleThreadCount > 0) {
		/* somebody is waiting for work: wake one thread */
		error = pthread_cond_signal(&requests_condvar);
		require_noerr(error, pthread_cond_signal);
	}
	else if (gCurrThreadCount < WEBDAV_REQUEST_THREADS) {
		/* everybody is busy and there is room for another worker */
		error = pthread_create(&request_thread, &gRequest_thread_attr, (void *) handle_request_thread, (void *) NULL);
		require_noerr(error, pthread_create_signal);
		gCurrThreadCount += 1;
	}

pthread_create_signal:
pthread_cond_signal:
malloc_request_element_ptr:

	/* keep the first error if the unlock fails as well */
	error2 = pthread_mutex_unlock(&requests_lock);
	require_noerr_action(error2, pthread_mutex_unlock, error = (error == 0) ? error2 : error; webdav_kill(-1));

pthread_mutex_unlock:
pthread_mutex_lock:

	return (error);
}
/*
 * requestqueue_enqueue_seqwrite_manager
 *
 * Pushes a WEBDAV_SEQWRITE_MANAGER_TYPE element carrying `ctx` onto the head
 * of the request queue, then wakes an idle worker thread or, if none is idle
 * and fewer than WEBDAV_REQUEST_THREADS exist, creates one.
 *
 * Returns 0 on success, EIO if the element cannot be allocated, or the error
 * from the failing pthread call. A failure to lock or unlock requests_lock
 * also calls webdav_kill(-1).
 */
int requestqueue_enqueue_seqwrite_manager(struct stream_put_ctx *ctx)
{
	int error, error2;
	webdav_requestqueue_element_t * request_element_ptr;
	pthread_t request_thread;

	error = pthread_mutex_lock(&requests_lock);
	require_noerr_action(error, pthread_mutex_lock, webdav_kill(-1));

	request_element_ptr = malloc(sizeof(*request_element_ptr));
	require_action(request_element_ptr != NULL, malloc_request_element_ptr, error = EIO);

	/* fill in the new element */
	request_element_ptr->type = WEBDAV_SEQWRITE_MANAGER_TYPE;
	request_element_ptr->element.seqwrite_read_rsp.ctx = ctx;

	/* insert at the head; an empty queue also gets this element as its tail */
	request_element_ptr->next = waiting_requests.item_head;
	if ( waiting_requests.item_head == NULL ) {
		waiting_requests.item_tail = request_element_ptr;
	}
	waiting_requests.item_head = request_element_ptr;
	waiting_requests.request_count += 1;

	if (gIdleThreadCount > 0) {
		/* somebody is waiting for work: wake one thread */
		error = pthread_cond_signal(&requests_condvar);
		require_noerr(error, pthread_cond_signal);
	}
	else if (gCurrThreadCount < WEBDAV_REQUEST_THREADS) {
		/* everybody is busy and there is room for another worker */
		error = pthread_create(&request_thread, &gRequest_thread_attr, (void *) handle_request_thread, (void *) NULL);
		require_noerr(error, pthread_create_signal);
		gCurrThreadCount += 1;
	}

pthread_create_signal:
pthread_cond_signal:
malloc_request_element_ptr:

	/* keep the first error if the unlock fails as well */
	error2 = pthread_mutex_unlock(&requests_lock);
	require_noerr_action(error2, pthread_mutex_unlock, error = (error == 0) ? error2 : error; webdav_kill(-1));

pthread_mutex_unlock:
pthread_mutex_lock:

	return (error);
}
/*
 * requestqueue_purge_cache_files
 *
 * Asks the pulse thread to purge cache files: sets purge_cache_files under
 * pulse_lock and signals pulse_condvar so the pulse thread runs a pass
 * without waiting for its timeout.
 *
 * Returns 0 on success, otherwise the first error encountered. A failure of
 * pthread_cond_signal() is reported even when the subsequent unlock
 * succeeds; the unlock error is returned only if nothing failed before it.
 */
int requestqueue_purge_cache_files(void)
{
	int error, unlock_error;

	error = pthread_mutex_lock(&pulse_lock);
	require_noerr(error, pthread_mutex_lock);

	/* set up request, then wake the pulse thread */
	purge_cache_files = TRUE;
	error = pthread_cond_signal(&pulse_condvar);
	require_noerr(error, pthread_cond_signal);

pthread_cond_signal:

	/* always release the lock, but do not let a clean unlock hide an earlier failure */
	unlock_error = pthread_mutex_unlock(&pulse_lock);
	if ( error == 0 )
	{
		error = unlock_error;
	}

pthread_mutex_lock:

	return ( error );
}