Spaces:
Runtime error
Runtime error
/* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy | |
* of this software and associated documentation files (the "Software"), to deal | |
* in the Software without restriction, including without limitation the rights | |
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
* copies of the Software, and to permit persons to whom the Software is | |
* furnished to do so, subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in | |
* all copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL | |
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
* THE SOFTWARE. | |
*/ | |
/** | |
* Thread message API test | |
*/ | |
struct sender_data { | |
int id; | |
pthread_t tid; | |
int workload; | |
AVThreadMessageQueue *queue; | |
}; | |
/* same as sender_data but shuffled for testing purpose */ | |
struct receiver_data { | |
pthread_t tid; | |
int workload; | |
int id; | |
AVThreadMessageQueue *queue; | |
}; | |
struct message { | |
AVFrame *frame; | |
// we add some junk in the message to make sure the message size is > | |
// sizeof(void*) | |
int magic; | |
}; | |
static void free_frame(void *arg) | |
{ | |
struct message *msg = arg; | |
av_assert0(msg->magic == MAGIC); | |
av_frame_free(&msg->frame); | |
} | |
static void *sender_thread(void *arg) | |
{ | |
int i, ret = 0; | |
struct sender_data *wd = arg; | |
av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload); | |
for (i = 0; i < wd->workload; i++) { | |
if (rand() % wd->workload < wd->workload / 10) { | |
av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id); | |
av_thread_message_flush(wd->queue); | |
} else { | |
char *val; | |
AVDictionary *meta = NULL; | |
struct message msg = { | |
.magic = MAGIC, | |
.frame = av_frame_alloc(), | |
}; | |
if (!msg.frame) { | |
ret = AVERROR(ENOMEM); | |
break; | |
} | |
/* we add some metadata to identify the frames */ | |
val = av_asprintf("frame %d/%d from sender %d", | |
i + 1, wd->workload, wd->id); | |
if (!val) { | |
av_frame_free(&msg.frame); | |
ret = AVERROR(ENOMEM); | |
break; | |
} | |
ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); | |
if (ret < 0) { | |
av_frame_free(&msg.frame); | |
break; | |
} | |
msg.frame->metadata = meta; | |
/* allocate a real frame in order to simulate "real" work */ | |
msg.frame->format = AV_PIX_FMT_RGBA; | |
msg.frame->width = 320; | |
msg.frame->height = 240; | |
ret = av_frame_get_buffer(msg.frame, 0); | |
if (ret < 0) { | |
av_frame_free(&msg.frame); | |
break; | |
} | |
/* push the frame in the common queue */ | |
av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n", | |
wd->id, i + 1, wd->workload, msg.frame); | |
ret = av_thread_message_queue_send(wd->queue, &msg, 0); | |
if (ret < 0) { | |
av_frame_free(&msg.frame); | |
break; | |
} | |
} | |
} | |
av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n", | |
wd->id, av_err2str(ret)); | |
av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF); | |
return NULL; | |
} | |
static void *receiver_thread(void *arg) | |
{ | |
int i, ret = 0; | |
struct receiver_data *rd = arg; | |
for (i = 0; i < rd->workload; i++) { | |
if (rand() % rd->workload < rd->workload / 10) { | |
av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, " | |
"discarding %d message(s)\n", rd->id, | |
av_thread_message_queue_nb_elems(rd->queue)); | |
av_thread_message_flush(rd->queue); | |
} else { | |
struct message msg; | |
AVDictionary *meta; | |
AVDictionaryEntry *e; | |
ret = av_thread_message_queue_recv(rd->queue, &msg, 0); | |
if (ret < 0) | |
break; | |
av_assert0(msg.magic == MAGIC); | |
meta = msg.frame->metadata; | |
e = av_dict_get(meta, "sig", NULL, 0); | |
av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame); | |
av_frame_free(&msg.frame); | |
} | |
} | |
av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); | |
av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF); | |
return NULL; | |
} | |
static int get_workload(int minv, int maxv) | |
{ | |
return maxv == minv ? maxv : rand() % (maxv - minv) + minv; | |
} | |
int main(int ac, char **av) | |
{ | |
int i, ret = 0; | |
int max_queue_size; | |
int nb_senders, sender_min_load, sender_max_load; | |
int nb_receivers, receiver_min_load, receiver_max_load; | |
struct sender_data *senders; | |
struct receiver_data *receivers; | |
AVThreadMessageQueue *queue = NULL; | |
if (ac != 8) { | |
av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> " | |
"<nb_senders> <sender_min_send> <sender_max_send> " | |
"<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]); | |
return 1; | |
} | |
max_queue_size = atoi(av[1]); | |
nb_senders = atoi(av[2]); | |
sender_min_load = atoi(av[3]); | |
sender_max_load = atoi(av[4]); | |
nb_receivers = atoi(av[5]); | |
receiver_min_load = atoi(av[6]); | |
receiver_max_load = atoi(av[7]); | |
if (max_queue_size <= 0 || | |
nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 || | |
nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) { | |
av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n"); | |
return 1; | |
} | |
av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / " | |
"%d receivers receiving [%d-%d]\n", max_queue_size, | |
nb_senders, sender_min_load, sender_max_load, | |
nb_receivers, receiver_min_load, receiver_max_load); | |
senders = av_calloc(nb_senders, sizeof(*senders)); | |
receivers = av_calloc(nb_receivers, sizeof(*receivers)); | |
if (!senders || !receivers) { | |
ret = AVERROR(ENOMEM); | |
goto end; | |
} | |
ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message)); | |
if (ret < 0) | |
goto end; | |
av_thread_message_queue_set_free_func(queue, free_frame); | |
SPAWN_THREADS(receiver); | |
SPAWN_THREADS(sender); | |
WAIT_THREADS(sender); | |
WAIT_THREADS(receiver); | |
end: | |
av_thread_message_queue_free(&queue); | |
av_freep(&senders); | |
av_freep(&receivers); | |
if (ret < 0 && ret != AVERROR_EOF) { | |
av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); | |
return 1; | |
} | |
return 0; | |
} | |