Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 175 additions & 109 deletions lib/upipe-modules/upipe_sync.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/*
* Copyright (C) 2017 Open Broadcast Systems Ltd
* Copyright (C) 2017-2021 Open Broadcast Systems Ltd
*
* Authors: Rafaël Carré
* Kieran Kunhya
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
Expand Down Expand Up @@ -96,6 +97,9 @@ struct upipe_sync {
/** ntsc */
uint8_t frame_idx;

/* framesync */
int frame_sync;

/** public upipe structure */
struct upipe upipe;
};
Expand Down Expand Up @@ -469,7 +473,7 @@ static bool sync_channel(struct upipe *upipe)
} else {
float f = (float)((int64_t)pts - (int64_t)video_pts) * 1000 / UCLOCK_FREQ;
upipe_notice_va(upipe_sync_sub_to_upipe(upipe_sync_sub),
"DROP %.2f, duration in CLOCK %" PRIu64 "", f, duration);
"DROP %.2f, duration in CLOCK %" PRIu64 " samples %zu", f, duration, samples);
ulist_delete(uchain_uref);
uref_free(uref);
upipe_sync_sub->samples -= samples;
Expand Down Expand Up @@ -713,7 +717,8 @@ static void output_sound(struct upipe *upipe, const struct urational *fps,

src_samples -= uref_samples;
samples -= uref_samples;
upipe_sync_sub->samples -= uref_samples;
if (upipe_sync_sub->samples >= uref_samples)
upipe_sync_sub->samples -= uref_samples;
//upipe_notice_va(upipe_sub, "pop, samples %" PRIu64, upipe_sync_sub->samples);

if (src_samples == 0) {
Expand Down Expand Up @@ -747,44 +752,52 @@ static void cb(struct upump *upump)
struct upipe_sync *upipe_sync = upipe_sync_from_upipe(upipe);

uint64_t now = uclock_now(upipe_sync->uclock);
if (now - upipe_sync->ticks_per_frame > upipe_sync->pts)
upipe_dbg_va(upipe, "cb after %" PRId64 "ms",
(int64_t)((int64_t)now - (int64_t)upipe_sync->pts) / 27000);

if (upipe_sync->frame_sync) {
if (now - upipe_sync->ticks_per_frame > upipe_sync->pts)
upipe_dbg_va(upipe, "cb after %" PRId64 "ms",
(int64_t)((int64_t)now - (int64_t)upipe_sync->pts) / 27000);
}

now = upipe_sync->pts; // the upump was scheduled for now
struct uchain *uchain = NULL;
for (;;) {
uchain = ulist_peek(&upipe_sync->urefs);
upipe_throw(upipe, UPROBE_SYNC_PICTURE, UPIPE_SYNC_SIGNATURE, !!uchain);
if (!uchain)
break;

struct uref *uref = uref_from_uchain(uchain);
uint64_t pts = 0;
uref_clock_get_pts_sys(uref, &pts);
pts += upipe_sync->latency;
if (upipe_sync->frame_sync) {
for (;;) {
uchain = ulist_peek(&upipe_sync->urefs);
upipe_throw(upipe, UPROBE_SYNC_PICTURE, UPIPE_SYNC_SIGNATURE, !!uchain);
if (!uchain)
break;

struct uref *uref = uref_from_uchain(uchain);
uint64_t pts = 0;
uref_clock_get_pts_sys(uref, &pts);
pts += upipe_sync->latency;

/* frame duration */
const uint64_t ticks = upipe_sync->ticks_per_frame;

if (pts < now - ticks / 2) {
/* frame pts too much in the past */
upipe_warn_va(upipe, "too late");
} else if (pts > now + ticks / 2) {
upipe_warn_va(upipe, "video too early: %.2f > %.2f",
pts_to_time(pts), pts_to_time(now + ticks / 2)
);
uchain = NULL; /* do not drop */
break;
} else {
break; // ok
}

/* frame duration */
const uint64_t ticks = upipe_sync->ticks_per_frame;

if (pts < now - ticks / 2) {
/* frame pts too much in the past */
upipe_warn_va(upipe, "too late");
} else if (pts > now + ticks / 2) {
upipe_warn_va(upipe, "video too early: %.2f > %.2f",
pts_to_time(pts), pts_to_time(now + ticks / 2)
);
uchain = NULL; /* do not drop */
break;
} else {
break; // ok
ulist_pop(&upipe_sync->urefs);
uref_free(uref);
upipe_sync->buffered_frames--;
int64_t u = pts - now;
upipe_err_va(upipe, "Drop pic (pts-now == %" PRId64 "ms)", u / 27000);
}

ulist_pop(&upipe_sync->urefs);
uref_free(uref);
upipe_sync->buffered_frames--;
int64_t u = pts - now;
upipe_err_va(upipe, "Drop pic (pts-now == %" PRId64 "ms)", u / 27000);
}
else {
upipe_dbg_va(upipe, "queued %zu", ulist_depth(&upipe_sync->urefs));
}

/* sync audio */
Expand All @@ -795,56 +808,65 @@ static void cb(struct upump *upump)
/* output audio */
output_sound(upipe_sync_to_upipe(upipe_sync), &upipe_sync->fps, NULL);

/* output pic */
if (uchain) {
ulist_pop(&upipe_sync->urefs);
/* buffer picture */
uref_free(upipe_sync->uref);
upipe_sync->buffered_frames--;
upipe_sync->uref = uref_from_uchain(uchain);
} else {
upipe_dbg_va(upipe, "no picture, repeating last one");
}

struct uref *uref = NULL;
if (upipe_sync->uref) {
uref = uref_dup(upipe_sync->uref);
uref_clock_set_pts_sys(uref, upipe_sync->pts - upipe_sync->latency);
}
if (upipe_sync->frame_sync) {
/* output pic */
if (uchain) {
ulist_pop(&upipe_sync->urefs);
/* buffer picture */
uref_free(upipe_sync->uref);
upipe_sync->buffered_frames--;
upipe_sync->uref = uref_from_uchain(uchain);
} else {
upipe_dbg_va(upipe, "no picture, repeating last one");
}

if (0) {
now = uclock_now(upipe_sync->uclock);
upipe_notice_va(upipe,
"output %.2f now %.2f latency %" PRIu64,
pts_to_time(upipe_sync->pts - upipe_sync->latency),
pts_to_time(now),
upipe_sync->latency / 27000
);
if (upipe_sync->uref) {
uref = uref_dup(upipe_sync->uref);
uref_clock_set_pts_sys(uref, upipe_sync->pts - upipe_sync->latency);
}

if (0) {
now = uclock_now(upipe_sync->uclock);
upipe_notice_va(upipe,
"output %.2f now %.2f latency %" PRIu64,
pts_to_time(upipe_sync->pts - upipe_sync->latency),
pts_to_time(now),
upipe_sync->latency / 27000
);
}
}
else {
uchain = ulist_pop(&upipe_sync->urefs);
uref = uref_from_uchain(uchain);
}

if (uref)
upipe_sync_output(upipe, uref, NULL);

/* increment pts */
upipe_sync->pts += upipe_sync->ticks_per_frame;

/* schedule next pic */
now = uclock_now(upipe_sync->uclock);
if (now != UINT64_MAX && now > upipe_sync->pts) {
uint64_t diff = now - upipe_sync->pts;
diff += upipe_sync->ticks_per_frame - 1;
diff /= upipe_sync->ticks_per_frame;
upipe_err_va(upipe, "skipping %"PRIu64" beats", diff);
upipe_sync->pts += diff * upipe_sync->ticks_per_frame;
}
/* In non framesync mode scheduling is based on when video frame arrives */
if (upipe_sync->frame_sync) {
/* increment pts */
upipe_sync->pts += upipe_sync->ticks_per_frame;

uint64_t wait;
if (now == UINT64_MAX)
wait = upipe_sync->ticks_per_frame;
else
wait = upipe_sync->pts - now;
/* schedule next pic */
now = uclock_now(upipe_sync->uclock);
if (now != UINT64_MAX && now > upipe_sync->pts) {
uint64_t diff = now - upipe_sync->pts;
diff += upipe_sync->ticks_per_frame - 1;
diff /= upipe_sync->ticks_per_frame;
upipe_err_va(upipe, "skipping %"PRIu64" beats", diff);
upipe_sync->pts += diff * upipe_sync->ticks_per_frame;
}

uint64_t wait;
if (now == UINT64_MAX)
wait = upipe_sync->ticks_per_frame;
else
wait = upipe_sync->pts - now;

upipe_sync_wait_upump(upipe, wait, cb);
upipe_sync_wait_upump(upipe, wait, cb);
}
}

/** @internal @This receives data.
Expand Down Expand Up @@ -883,7 +905,7 @@ static void upipe_sync_sub_input(struct upipe *upipe, struct uref *uref,
size_t samples = 0;
uref_sound_size(uref, &samples, NULL);
upipe_sync_sub->samples += samples;
//upipe_notice_va(upipe, "push, samples %" PRIu64, upipe_sync_sub->samples);
upipe_notice_va(upipe, "push in samples %zu, queued samples %" PRIu64, samples, upipe_sync_sub->samples);

ulist_add(&upipe_sync_sub->urefs, uref_to_uchain(uref));

Expand Down Expand Up @@ -928,46 +950,63 @@ static void upipe_sync_input(struct upipe *upipe, struct uref *uref,
return;
}
pts += upipe_sync->latency;

uint64_t now = uclock_now(upipe_sync->uclock);

/* reject late pics */
if (now != UINT64_MAX && now > pts) {
uint64_t cr = 0;
uref_clock_get_cr_sys(uref, &cr);
upipe_err_va(upipe, "%s() picture too late by %" PRIu64 "ms, drop pic, recept %" PRIu64 "",
__func__, (now - pts) / 27000, (now - cr) / 27000);
uref_free(uref);
return;
}
uint64_t wait;
if (upipe_sync->frame_sync) {
/* reject late pics */
if (now != UINT64_MAX && now > pts) {
uint64_t cr = 0;
uref_clock_get_cr_sys(uref, &cr);
upipe_err_va(upipe, "%s() picture too late by %" PRIu64 "ms, drop pic, recept %" PRIu64 "",
__func__, (now - pts) / 27000, (now - cr) / 27000);
uref_free(uref);
return;
}

//upipe_dbg_va(upipe, "push PTS in %" PRIu64 " ms", (pts - now) / 27000);
//upipe_dbg_va(upipe, "push PTS in %" PRIu64 " ms", (pts - now) / 27000);

/* buffer pic */
ulist_add(&upipe_sync->urefs, uref_to_uchain(uref));
upipe_sync->buffered_frames++;
/* buffer pic */
ulist_add(&upipe_sync->urefs, uref_to_uchain(uref));
upipe_sync->buffered_frames++;

/* limit buffered frames */
if (unlikely(upipe_sync->buffered_frames >= MAX_VIDEO_FRAMES)) {
ulist_uref_flush(&upipe_sync->urefs);
upipe_sync->buffered_frames = 0;
}
/* limit buffered frames */
if (unlikely(upipe_sync->buffered_frames >= MAX_VIDEO_FRAMES)) {
ulist_uref_flush(&upipe_sync->urefs);
upipe_sync->buffered_frames = 0;
}

/* timer already active */
if (upipe_sync->upump)
return;
/* timer already active */
if (upipe_sync->upump)
return;

/* need upump mgr */
if (!ubase_check(upipe_sync_check_upump_mgr(upipe_sync_to_upipe(upipe_sync))))
return;
/* need upump mgr */
if (!ubase_check(upipe_sync_check_upump_mgr(upipe_sync_to_upipe(upipe_sync))))
return;

/* start timer */
uint64_t wait;
if (now == UINT64_MAX)
wait = upipe_sync->ticks_per_frame;
else
/* start timer */
if (now == UINT64_MAX)
wait = upipe_sync->ticks_per_frame;
else
wait = pts - now;
}
else {
wait = pts - now;

/* too old frames */
if (now > pts + upipe_sync->ticks_per_frame) {
uref_free(uref);
return;
}

/* buffer pic */
ulist_add(&upipe_sync->urefs, uref_to_uchain(uref));

/* need upump mgr */
if (!ubase_check(upipe_sync_check_upump_mgr(upipe_sync_to_upipe(upipe_sync))))
return;
}

upipe_sync->pts = pts;
upipe_sync_wait_upump(upipe_sync_to_upipe(upipe_sync), wait, cb);
}
Expand Down Expand Up @@ -997,6 +1036,7 @@ static struct upipe *upipe_sync_alloc(struct upipe_mgr *mgr,
upipe_sync->buffered_frames = 0;
upipe_sync->uref = NULL;
ulist_init(&upipe_sync->urefs);
upipe_sync->frame_sync = 1; /* Old frame sync behaviour by default */

upipe_sync_init_urefcount(upipe);
upipe_sync_init_uclock(upipe);
Expand Down Expand Up @@ -1059,6 +1099,28 @@ static int upipe_sync_set_flow_def(struct upipe *upipe, struct uref *flow_def)
return UBASE_ERR_NONE;
}

/** @internal @This sets the content of an sync option.
*
* @param upipe description structure of the pipe
* @param option name of the option
* @param content content of the option
* @return an error code
*/
static int upipe_sync_set_option(struct upipe *upipe,
const char *option, const char *content)
{
struct upipe_sync *upipe_sync = upipe_sync_from_upipe(upipe);
if (!option || !content)
return UBASE_ERR_INVALID;

if (!strcmp(option, "frame-sync"))
upipe_sync->frame_sync = atoi(content);
else
return UBASE_ERR_INVALID;

return UBASE_ERR_NONE;
}

/** @internal @This processes control commands.
*
* @param upipe description structure of the pipe
Expand All @@ -1081,7 +1143,11 @@ static int upipe_sync_control(struct upipe *upipe, int command, va_list args)
return UBASE_ERR_NONE;
case UPIPE_ATTACH_UPUMP_MGR:
return upipe_sync_attach_upump_mgr(upipe);

case UPIPE_SET_OPTION: {
const char *option = va_arg(args, const char *);
const char *content = va_arg(args, const char *);
return upipe_sync_set_option(upipe, option, content);
}
default:
return UBASE_ERR_UNHANDLED;
}
Expand Down