Skip to content

Commit

Permalink
MT#55283 support kernel-based media player
Browse files Browse the repository at this point in the history
Change-Id: I5958015d819ca280273f80245dfa748531577b6a
  • Loading branch information
dfxdj authored and rfuchs committed May 11, 2024
1 parent 5e8d553 commit 38b17eb
Show file tree
Hide file tree
Showing 13 changed files with 2,003 additions and 42 deletions.
102 changes: 102 additions & 0 deletions daemon/kernel.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ bool kernel_init_table(void) {
[REMG_ADD_STREAM] = sizeof(struct rtpengine_command_add_stream),
[REMG_DEL_STREAM] = sizeof(struct rtpengine_command_del_stream),
[REMG_PACKET] = sizeof(struct rtpengine_command_packet),
[REMG_INIT_PLAY_STREAMS] = sizeof(struct rtpengine_command_init_play_streams),
[REMG_GET_PACKET_STREAM] = sizeof(struct rtpengine_command_get_packet_stream),
[REMG_PLAY_STREAM_PACKET] = sizeof(struct rtpengine_command_play_stream_packet),
[REMG_PLAY_STREAM] = sizeof(struct rtpengine_command_play_stream),
[REMG_STOP_STREAM] = sizeof(struct rtpengine_command_stop_stream),
[REMG_FREE_PACKET_STREAM] = sizeof(struct rtpengine_command_free_packet_stream),
},
.rtpe_stats = rtpe_stats,
};
Expand Down Expand Up @@ -252,3 +258,99 @@ unsigned int kernel_add_intercept_stream(unsigned int call_idx, const char *id)
return UNINIT_IDX;
return cmd.stream.idx.stream_idx;
}

bool kernel_init_player(int num_media, int num_sessions) {
if (num_media <= 0 || num_sessions <= 0)
return false;
if (!kernel.is_open)
return false;

struct rtpengine_command_init_play_streams ips = {
.cmd = REMG_INIT_PLAY_STREAMS,
.num_packet_streams = num_media,
.num_play_streams = num_sessions,
};
ssize_t ret = write(kernel.fd, &ips, sizeof(ips));
if (ret != sizeof(ips))
return false;

kernel.use_player = true;

return true;
}

unsigned int kernel_get_packet_stream(void) {
if (!kernel.use_player)
return -1;

struct rtpengine_command_get_packet_stream gps = { .cmd = REMG_GET_PACKET_STREAM };
ssize_t ret = read(kernel.fd, &gps, sizeof(gps));
if (ret != sizeof(gps))
return -1;
return gps.packet_stream_idx;
}

bool kernel_add_stream_packet(unsigned int idx, const char *buf, size_t len, unsigned long delay_ms,
uint32_t ts, uint32_t dur)
{
if (!kernel.use_player)
return false;

size_t total_len = len + sizeof(struct rtpengine_command_play_stream_packet);
struct rtpengine_command_play_stream_packet *cmd = alloca(total_len);

cmd->cmd = REMG_PLAY_STREAM_PACKET;
cmd->play_stream_packet.packet_stream_idx = idx;
cmd->play_stream_packet.delay_ms = delay_ms;
cmd->play_stream_packet.delay_ts = ts;
cmd->play_stream_packet.duration_ts = dur;

memcpy(&cmd->play_stream_packet.data, buf, len);

ssize_t ret = write(kernel.fd, cmd, total_len);
if (ret != total_len)
return false;
return true;
}

unsigned int kernel_start_stream_player(struct rtpengine_play_stream_info *info) {
if (!kernel.use_player)
return -1;

struct rtpengine_command_play_stream ps = {
.cmd = REMG_PLAY_STREAM,
.info = *info,
};
ssize_t ret = read(kernel.fd, &ps, sizeof(ps));
if (ret == sizeof(ps))
return ps.play_idx;
return -1;
}

bool kernel_stop_stream_player(unsigned int idx) {
if (!kernel.use_player)
return false;

struct rtpengine_command_stop_stream ss = {
.cmd = REMG_STOP_STREAM,
.play_idx = idx,
};
ssize_t ret = write(kernel.fd, &ss, sizeof(ss));
if (ret == sizeof(ss))
return true;
return false;
}

bool kernel_free_packet_stream(unsigned int idx) {
if (!kernel.use_player)
return false;

struct rtpengine_command_free_packet_stream fps = {
.cmd = REMG_FREE_PACKET_STREAM,
.packet_stream_idx = idx,
};
ssize_t ret = write(kernel.fd, &fps, sizeof(fps));
if (ret == sizeof(fps))
return true;
return false;
}
10 changes: 9 additions & 1 deletion daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ struct rtpengine_config rtpe_config = {
},
},
.max_recv_iters = MAX_RECV_ITERS,
.kernel_player_media = 128,
};

static void sighandler(gpointer x) {
Expand Down Expand Up @@ -633,6 +634,8 @@ static void options(int *argc, char ***argv) {
{ "silence-detect",0,0, G_OPTION_ARG_DOUBLE, &silence_detect, "Audio level threshold in percent for silence detection","FLOAT"},
{ "cn-payload",0,0, G_OPTION_ARG_STRING_ARRAY,&cn_payload, "Comfort noise parameters to replace silence with","INT INT INT ..."},
{ "player-cache",0,0, G_OPTION_ARG_NONE, &rtpe_config.player_cache,"Cache media files for playback in memory",NULL},
{ "kernel-player",0,0, G_OPTION_ARG_INT, &rtpe_config.kernel_player,"Max number of kernel media player streams","INT"},
{ "kernel-player-media",0,0,G_OPTION_ARG_INT, &rtpe_config.kernel_player_media,"Max number of kernel media files","INT"},
{ "audio-buffer-length",0,0, G_OPTION_ARG_INT,&rtpe_config.audio_buffer_length,"Length in milliseconds of audio buffer","INT"},
{ "audio-buffer-delay",0,0, G_OPTION_ARG_INT,&rtpe_config.audio_buffer_delay,"Initial delay in milliseconds for buffered audio","INT"},
{ "audio-player",0,0, G_OPTION_ARG_STRING, &use_audio_player, "When to enable the internal audio player","on-demand|play-media|transcoding|always"},
Expand Down Expand Up @@ -1245,6 +1248,12 @@ static void kernel_setup(void) {
#endif
if (!kernel_setup_table(rtpe_config.kernel_table) && rtpe_config.no_fallback)
die("Userspace fallback disallowed - exiting");

if (rtpe_config.player_cache && rtpe_config.kernel_player > 0 && rtpe_config.kernel_player_media > 0) {
if (!kernel_init_player(rtpe_config.kernel_player_media, rtpe_config.kernel_player))
die("Failed to initialise kernel media player");
}

return;

fallback:
Expand Down Expand Up @@ -1297,7 +1306,6 @@ static void init_everything(void) {
die("Kernel module version mismatch or other fatal error");
}


static void create_everything(void) {
struct timeval tmp_tv;

Expand Down
126 changes: 119 additions & 7 deletions daemon/media_player.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@ struct media_player_cache_index {
struct media_player_content_index index;
rtp_payload_type dst_pt;
};
TYPED_GHASHTABLE(media_player_ht, struct media_player, struct media_player, g_direct_hash, g_direct_equal, NULL, NULL) // XXX ref counting players
struct media_player_cache_entry {
bool finished;
volatile bool finished;
// "unfinished" elements, only used while decoding is active:
mutex_t lock;
cond_t cond; // to wait for more data to be decoded

cache_packet_arr *packets; // read-only except for decoder thread, which uses finished flags and locks
unsigned long duration; // cumulative in ms, summed up while decoding
unsigned int kernel_idx; // -1 if not in use
media_player_ht wait_queue; // players waiting on decoder to finish

struct codec_scheduler csch;
struct media_player_coder coder; // de/encoder data
Expand All @@ -64,7 +68,7 @@ struct media_player_cache_packet {
char *buf;
str s;
long long pts;
long long duration;
long long duration; // us
long long duration_ts;
};

Expand Down Expand Up @@ -130,12 +134,22 @@ static void media_player_shutdown(struct media_player *mp) {
mp->media = NULL;
media_player_coder_shutdown(&mp->coder);

if (mp->kernel_idx != -1)
kernel_stop_stream_player(mp->kernel_idx);
else if (mp->cache_entry) {
mutex_lock(&mp->cache_entry->lock);
if (t_hash_table_is_set(mp->cache_entry->wait_queue))
t_hash_table_remove(mp->cache_entry->wait_queue, mp);
mutex_unlock(&mp->cache_entry->lock);
}

mp->cache_index.type = MP_OTHER;
if (mp->cache_index.file.s)
g_free(mp->cache_index.file.s);
mp->cache_index.file = STR_NULL;// coverity[missing_lock : FALSE]
mp->cache_entry = NULL; // coverity[missing_lock : FALSE]
mp->cache_read_idx = 0;
mp->kernel_idx = -1;
}
#endif

Expand Down Expand Up @@ -185,6 +199,7 @@ void media_player_new(struct media_player **mpp, struct call_monologue *ml) {

mp->tt_obj.tt = &media_player_thread;
mutex_init(&mp->lock);
mp->kernel_idx = -1;

mp->run_func = media_player_read_packet; // default
mp->call = obj_get(ml->call);
Expand Down Expand Up @@ -499,8 +514,70 @@ retry:;
return false;
}

static void media_player_cached_reader_start(struct media_player *mp, const rtp_payload_type *dst_pt) {
static void media_player_kernel_player_start_now(struct media_player *mp) {
struct media_player_cache_entry *entry = mp->cache_entry;
if (!entry)
return;
const rtp_payload_type *dst_pt = &entry->coder.handler->dest_pt;

ilog(LOG_DEBUG, "Starting kernel media player index %i (PT %i)", entry->kernel_idx, dst_pt->payload_type);

struct rtpengine_play_stream_info info = {
.packet_stream_idx = entry->kernel_idx,
.pt = dst_pt->payload_type,
.seq = mp->seq,
.ts = mp->buffer_ts,
.ssrc = mp->ssrc_out->parent->h.ssrc,
.repeat = mp->opts.repeat,
.stats = mp->sink->stats_out,
.iface_stats = mp->sink->selected_sfd->local_intf->stats,
.ssrc_stats = mp->ssrc_out->stats,
};
mp->sink->endpoint.address.family->endpoint2kernel(&info.dst_addr, &mp->sink->endpoint); // XXX unify with __re_address_translate_ep
mp->sink->selected_sfd->socket.local.address.family->endpoint2kernel(&info.src_addr, &mp->sink->selected_sfd->socket.local); // XXX unify with __re_address_translate_ep
mp->crypt_handler->out->kernel(&info.encrypt, mp->sink);

unsigned int idx = kernel_start_stream_player(&info);
if (idx == -1)
ilog(LOG_ERR, "Failed to start kernel media player (index %i): %s", info.packet_stream_idx, strerror(errno));
else
mp->kernel_idx = idx;
}

static void media_player_kernel_player_start(struct media_player *mp) {
struct media_player_cache_entry *entry = mp->cache_entry;
if (!entry)
return;

// decoder finished yet? try unlocked read first
bool finished = entry->finished;

if (!finished) {
mutex_lock(&entry->lock);
// check flag again in case it happened just now
if (!entry->finished) {
// add us to wait list
ilog(LOG_DEBUG, "Decoder not finished yet, waiting to start kernel player index %i",
entry->kernel_idx);
t_hash_table_insert(entry->wait_queue, mp, mp); // XXX reference needed?
mutex_unlock(&entry->lock);
return;
}
// finished now, drop down below
mutex_unlock(&entry->lock);
}

media_player_kernel_player_start_now(mp);
}

static void media_player_cached_reader_start(struct media_player *mp) {
struct media_player_cache_entry *entry = mp->cache_entry;
const rtp_payload_type *dst_pt = &entry->coder.handler->dest_pt;

if (entry->kernel_idx != -1) {
media_player_kernel_player_start(mp);
return;
}

// create dummy codec handler and start timer

Expand Down Expand Up @@ -551,7 +628,7 @@ static bool media_player_cache_get_entry(struct media_player *mp,

bool ret = true; // entry exists, use cached data
if (entry) {
media_player_cached_reader_start(mp, dst_pt);
media_player_cached_reader_start(mp);
goto out;
}

Expand All @@ -563,10 +640,12 @@ static bool media_player_cache_get_entry(struct media_player *mp,
*ins_key = lookup;
str_init_dup_str(&ins_key->index.file, &lookup.index.file);
codec_init_payload_type(&ins_key->dst_pt, MT_UNKNOWN); // duplicate contents

entry = mp->cache_entry = g_slice_alloc0(sizeof(*entry));
mutex_init(&entry->lock);
cond_init(&entry->cond);
entry->packets = cache_packet_arr_new_sized(64);
entry->wait_queue = media_player_ht_new();

switch (lookup.index.type) {
case MP_DB:
Expand All @@ -584,6 +663,15 @@ static bool media_player_cache_get_entry(struct media_player *mp,

g_hash_table_insert(media_player_cache, ins_key, entry);

entry->kernel_idx = -1;
if (kernel.use_player) {
entry->kernel_idx = kernel_get_packet_stream();
if (entry->kernel_idx == -1)
ilog(LOG_ERR, "Failed to get kernel packet stream entry (%s)", strerror(errno));
else
ilog(LOG_DEBUG, "Using kernel packet stream index %i", entry->kernel_idx);
}

out:
mutex_unlock(&media_player_cache_lock);

Expand Down Expand Up @@ -631,12 +719,23 @@ static void media_player_cache_entry_decoder_thread(void *p) {
av_packet_unref(entry->coder.pkt);
}

ilog(LOG_DEBUG, "Decoder thread for %s finished", entry->info_str);

mutex_lock(&entry->lock);
entry->finished = true;
cond_broadcast(&entry->cond);
mutex_unlock(&entry->lock);

ilog(LOG_DEBUG, "Decoder thread for %s finished", entry->info_str);
media_player_ht_iter iter;
t_hash_table_iter_init(&iter, entry->wait_queue);
struct media_player *mp;
while (t_hash_table_iter_next(&iter, &mp, NULL)) {
if (mp->media)
media_player_kernel_player_start_now(mp);
}
t_hash_table_destroy(entry->wait_queue); // not needed any more
entry->wait_queue = media_player_ht_null();

mutex_unlock(&entry->lock);
}

static void packet_encoded_cache(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp,
Expand All @@ -658,6 +757,16 @@ static void packet_encoded_cache(AVPacket *pkt, struct codec_ssrc_handler *ch, s
mutex_lock(&entry->lock);
t_ptr_array_add(entry->packets, ep);

if (entry->kernel_idx != -1) {
ilog(LOG_DEBUG, "Adding media packet (length %zu, TS %" PRIu64 ", delay %lu ms) to kernel packet stream %i",
s->len, pkt->pts, entry->duration, entry->kernel_idx);
if (!kernel_add_stream_packet(entry->kernel_idx, s->s, s->len, entry->duration, pkt->pts,
pkt->duration))
ilog(LOG_ERR, "Failed to add packet to kernel player (%s)", strerror(errno));
}

entry->duration += ep->duration / 1000;

cond_broadcast(&entry->cond);
mutex_unlock(&entry->lock);
}
Expand Down Expand Up @@ -693,7 +802,7 @@ static bool media_player_cache_entry_init(struct media_player *mp, const rtp_pay
// use low priority (10 nice)
thread_create_detach_prio(media_player_cache_entry_decoder_thread, entry, NULL, 10, "mp decoder");

media_player_cached_reader_start(mp, dst_pt);
media_player_cached_reader_start(mp);

return true;
}
Expand Down Expand Up @@ -1382,8 +1491,11 @@ static void media_player_cache_entry_free(void *p) {
t_ptr_array_free(e->packets, true);
mutex_destroy(&e->lock);
g_free(e->info_str);
if (t_hash_table_is_set(e->wait_queue))
t_hash_table_destroy(e->wait_queue); // XXX release references?
media_player_coder_shutdown(&e->coder);
av_packet_free(&e->coder.pkt);
kernel_free_packet_stream(e->kernel_idx);
g_slice_free1(sizeof(*e), e);
}
#endif
Expand Down
Loading

0 comments on commit 38b17eb

Please sign in to comment.