277 changes: 276 additions & 1 deletion plugins/obs-webrtc/whip-output.cpp
@@ -1,6 +1,7 @@
#include "whip-output.h"
#include "whip-utils.h"

#include <regex>
#include <obs.hpp>

/*
@@ -34,6 +35,17 @@ WHIPOutput::WHIPOutput(obs_data_t *, obs_output_t *output)
endpoint_url(),
bearer_token(),
resource_url(),
ice_gathering_mutex(),
ice_gathering_cv(),
ice_gathering_complete(false),
has_first_candidate(false),
offer_sent(false),
has_ice_servers(false),
ice_ufrag(),
ice_pwd(),
pending_candidates(),
pending_candidates_mutex(),
post_response_gather_started(false),
running(false),
start_stop_mutex(),
start_stop_thread(),
@@ -266,6 +278,67 @@ bool WHIPOutput::Init()
return true;
}

/**
* @brief Fetch ICE servers via OPTIONS request to WHIP endpoint.
*
* Per WHIP spec, the endpoint may provide STUN/TURN servers via Link headers
* in response to an OPTIONS request. This allows ICE gathering to begin
* before the offer is sent, so reflexive/relay candidates are available
* earlier and NAT traversal is more reliable.
*
* @param iceServers Vector to populate with discovered ICE servers
* @return bool True if request succeeded (even if no ICE servers found)
*/
bool WHIPOutput::FetchIceServersViaOptions(std::vector<rtc::IceServer> &iceServers)
{
struct curl_slist *headers = nullptr;
headers = curl_slist_append(headers, "Accept: application/sdp");
headers = curl_slist_append(headers, user_agent.c_str());

if (!bearer_token.empty()) {
auto bearer_token_header = std::string("Authorization: Bearer ") + bearer_token;
headers = curl_slist_append(headers, bearer_token_header.c_str());
}

std::vector<std::string> http_headers;

CURL *c = curl_easy_init();
curl_easy_setopt(c, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(c, CURLOPT_URL, endpoint_url.c_str());
curl_easy_setopt(c, CURLOPT_CUSTOMREQUEST, "OPTIONS");
curl_easy_setopt(c, CURLOPT_NOBODY, 1L);
curl_easy_setopt(c, CURLOPT_TIMEOUT, 5L);
curl_easy_setopt(c, CURLOPT_HEADERFUNCTION, curl_header_function);
curl_easy_setopt(c, CURLOPT_HEADERDATA, (void *)&http_headers);

CURLcode res = curl_easy_perform(c);
curl_easy_cleanup(c);
curl_slist_free_all(headers);

if (res != CURLE_OK) {
do_log(LOG_DEBUG, "OPTIONS request failed: %s (will proceed without pre-configured ICE servers)",
curl_easy_strerror(res));
return false;
}

for (auto &http_header : http_headers) {
auto value = value_for_header("link", http_header);
if (value.empty())
continue;

for (auto end = value.find(","); end != std::string::npos; end = value.find(",")) {
this->ParseLinkHeader(value.substr(0, end), iceServers);
value = value.substr(end + 1);
}
this->ParseLinkHeader(value, iceServers);
}

if (!iceServers.empty()) {
do_log(LOG_INFO, "Discovered %zu ICE server(s) via OPTIONS request", iceServers.size());
}

return true;
}
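// Example (illustrative values): a WHIP endpoint that supports pre-offer ICE
// configuration answers the OPTIONS request with Link headers such as:
//
//   Link: <stun:stun.example.net>; rel="ice-server"
//   Link: <turn:turn.example.net?transport=udp>; rel="ice-server";
//         username="user"; credential="pass"; credential-type="password"
//
// Each comma-separated entry is passed to ParseLinkHeader(), which converts it
// into an rtc::IceServer entry for the rtc::Configuration used in Setup().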

/**
* @brief Set up the PeerConnection and media tracks.
*
@@ -275,12 +348,82 @@ bool WHIPOutput::Setup()
{
rtc::Configuration cfg;

// Fetch ICE servers via OPTIONS request (per WHIP spec section 4.4)
std::vector<rtc::IceServer> iceServers;
FetchIceServersViaOptions(iceServers);
has_ice_servers = !iceServers.empty();
if (has_ice_servers) {
cfg.iceServers = iceServers;
}

#if RTC_VERSION_MAJOR == 0 && RTC_VERSION_MINOR > 20 || RTC_VERSION_MAJOR > 0
cfg.disableAutoGathering = true;
// Enable auto-gathering if we have ICE servers from OPTIONS
cfg.disableAutoGathering = iceServers.empty();
#endif

ice_gathering_complete = false;
has_first_candidate = false;
offer_sent = false;
post_response_gather_started = false;
ice_ufrag.clear();
ice_pwd.clear();
first_mid.clear();
{
std::lock_guard<std::mutex> lock(pending_candidates_mutex);
pending_candidates.clear();
}

peer_connection = std::make_shared<rtc::PeerConnection>(cfg);

// Track when we receive our first ICE candidate
peer_connection->onLocalCandidate([this](rtc::Candidate candidate) {
{
std::lock_guard<std::mutex> lock(ice_gathering_mutex);
if (!has_first_candidate) {
has_first_candidate = true;
first_mid = candidate.mid(); // Saved for end-of-candidates signal
ice_gathering_cv.notify_one();
}
}
// If offer already sent, trickle this candidate immediately
// Otherwise queue it - it will be sent after POST completes
// Only trickle if we have ICE servers (trickle ICE not needed for host-only)
if (has_ice_servers) {
bool should_send = false;
// Lock to synchronize with the flush in Connect()
// This prevents candidates from being lost between setting
// offer_sent and flushing the queue
{
std::lock_guard<std::mutex> lock(pending_candidates_mutex);
if (offer_sent) {
should_send = true;
} else {
pending_candidates.push_back(candidate);
}
}
if (should_send) {
SendTrickleCandidate(candidate);
}
}
});

// Set up async ICE gathering completion notification
peer_connection->onGatheringStateChange([this](rtc::PeerConnection::GatheringState state) {
if (state == rtc::PeerConnection::GatheringState::Complete) {
{
std::lock_guard<std::mutex> lock(ice_gathering_mutex);
ice_gathering_complete = true;
ice_gathering_cv.notify_one();
}
// Only send end-of-candidates after the final (post-response) gather
// completes, not after the pre-offer OPTIONS gather. This ensures
// candidates from POST response ICE servers aren't ignored (RFC 8840).
if (has_ice_servers && post_response_gather_started) {
SendEndOfCandidates();
}
}
});

peer_connection->onStateChange([this](rtc::PeerConnection::State state) {
switch (state) {
case rtc::PeerConnection::State::New:
@@ -400,8 +543,34 @@ bool WHIPOutput::Connect()
std::string read_buffer;
std::vector<std::string> http_headers;

#if RTC_VERSION_MAJOR == 0 && RTC_VERSION_MINOR > 20 || RTC_VERSION_MAJOR > 0
// Smart waiting: if we have ICE servers, wait for first candidate OR 150ms.
// This gets us at least host candidates quickly, and likely some STUN
// candidates too. Any candidates gathered after offer is sent will be
// trickled via PATCH.
if (has_ice_servers) {
std::unique_lock<std::mutex> lock(ice_gathering_mutex);
if (!has_first_candidate) {
// 150ms balances latency vs. candidate coverage; typically enough for host + STUN
auto timeout = std::chrono::milliseconds(150);
ice_gathering_cv.wait_for(lock, timeout, [this] { return has_first_candidate.load(); });
}
}
#endif

auto offer_sdp = std::string(peer_connection->localDescription().value());

// Extract ICE credentials for trickle PATCH requests
std::regex re_ufrag("a=ice-ufrag:([^\\r\\n]+)");
std::regex re_pwd("a=ice-pwd:([^\\r\\n]+)");
std::smatch match;
if (std::regex_search(offer_sdp, match, re_ufrag)) {
ice_ufrag = match[1];
}
if (std::regex_search(offer_sdp, match, re_pwd)) {
ice_pwd = match[1];
}
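// The attributes matched above look like the following in the offer SDP
// (illustrative values):
//   a=ice-ufrag:EsAw
//   a=ice-pwd:P2uYro0UCOQ4zxjKXaWCBui1
// These credentials are reused in every trickle PATCH body so the server can
// associate trickled candidates with this ICE session.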

#ifdef DEBUG_SDP
do_log(LOG_DEBUG, "Offer SDP:\n%s", offer_sdp.c_str());
#endif
@@ -531,6 +700,23 @@ bool WHIPOutput::Connect()
do_log(LOG_DEBUG, "WHIP Resource URL is: %s", resource_url.c_str());
curl_url_cleanup(url_builder);

// Flush any candidates that arrived during the POST request (trickle ICE only)
// Set offer_sent inside the lock to prevent race with onLocalCandidate callback
if (has_ice_servers) {
std::vector<rtc::Candidate> candidates_to_send;
{
std::lock_guard<std::mutex> lock(pending_candidates_mutex);
offer_sent = true;
candidates_to_send = std::move(pending_candidates);
pending_candidates.clear();
}
for (const auto &candidate : candidates_to_send) {
SendTrickleCandidate(candidate);
}
} else {
offer_sent = true;
}

#ifdef DEBUG_SDP
do_log(LOG_DEBUG, "Answer SDP:\n%s", read_buffer.c_str());
#endif
@@ -574,6 +760,11 @@ bool WHIPOutput::Connect()
doCleanup(false);

#if RTC_VERSION_MAJOR == 0 && RTC_VERSION_MINOR > 20 || RTC_VERSION_MAJOR > 0
// Always gather with POST response servers to:
// 1. Get host candidates even if no ICE servers provided
// 2. Incorporate any TURN servers/credentials from the POST response
// Mark that this is the final gather - end-of-candidates will be sent when complete
post_response_gather_started = true;
peer_connection->gatherLocalCandidates(iceServers);
#endif

@@ -718,6 +909,90 @@ void WHIPOutput::Send(void *data, uintptr_t size, uint64_t duration, std::shared
}
}

void WHIPOutput::SendTrickleIcePatch(const std::string &sdp_frag)
{
struct curl_slist *headers = nullptr;
headers = curl_slist_append(headers, "Content-Type: application/trickle-ice-sdpfrag");
if (!bearer_token.empty()) {
auto bearer_token_header = std::string("Authorization: Bearer ") + bearer_token;
headers = curl_slist_append(headers, bearer_token_header.c_str());
}
headers = curl_slist_append(headers, user_agent.c_str());

char error_buffer[CURL_ERROR_SIZE] = {};

CURL *c = curl_easy_init();
curl_easy_setopt(c, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(c, CURLOPT_URL, resource_url.c_str());
curl_easy_setopt(c, CURLOPT_CUSTOMREQUEST, "PATCH");
curl_easy_setopt(c, CURLOPT_COPYPOSTFIELDS, sdp_frag.c_str());
curl_easy_setopt(c, CURLOPT_TIMEOUT, 8L);
curl_easy_setopt(c, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(c, CURLOPT_UNRESTRICTED_AUTH, 1L);
curl_easy_setopt(c, CURLOPT_ERRORBUFFER, error_buffer);

CURLcode res = curl_easy_perform(c);
if (res != CURLE_OK) {
do_log(LOG_WARNING, "Trickle ICE PATCH failed: %s",
error_buffer[0] ? error_buffer : curl_easy_strerror(res));
} else {
long response_code = 0;
curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &response_code);
if (response_code < 200 || response_code >= 300) {
do_log(LOG_WARNING, "Trickle ICE PATCH returned HTTP %ld", response_code);
}
}

curl_easy_cleanup(c);
curl_slist_free_all(headers);
}

void WHIPOutput::SendTrickleCandidate(const rtc::Candidate &candidate)
{
// Guard: credentials not yet extracted from offer SDP
if (resource_url.empty() || ice_ufrag.empty() || ice_pwd.empty()) {
return;
}

// Build SDP fragment with single candidate (RFC 8840 compliant)
std::string sdp_frag;
sdp_frag.append("a=ice-ufrag:" + ice_ufrag + "\r\n");
sdp_frag.append("a=ice-pwd:" + ice_pwd + "\r\n");
std::string mid = candidate.mid();
if (!mid.empty()) {
sdp_frag.append("a=mid:" + mid + "\r\n");
}
sdp_frag.append("a=" + candidate.candidate() + "\r\n");

do_log(LOG_DEBUG, "Trickle ICE candidate (mid=%s): %s", mid.c_str(), candidate.candidate().c_str());
SendTrickleIcePatch(sdp_frag);
}
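// Example PATCH body built above for a single host candidate
// (illustrative values):
//
//   a=ice-ufrag:EsAw
//   a=ice-pwd:P2uYro0UCOQ4zxjKXaWCBui1
//   a=mid:0
//   a=candidate:1 1 UDP 2130706431 198.51.100.7 51556 typ host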

void WHIPOutput::SendEndOfCandidates()
{
// Guard: credentials not yet extracted from offer SDP
if (resource_url.empty() || ice_ufrag.empty() || ice_pwd.empty()) {
return;
}

// Build SDP fragment with end-of-candidates marker (RFC 8840 compliant)
std::string sdp_frag;
sdp_frag.append("a=ice-ufrag:" + ice_ufrag + "\r\n");
sdp_frag.append("a=ice-pwd:" + ice_pwd + "\r\n");
std::string mid;
{
std::lock_guard<std::mutex> lock(ice_gathering_mutex);
mid = first_mid;
}
if (!mid.empty()) {
sdp_frag.append("a=mid:" + mid + "\r\n");
}
sdp_frag.append("a=end-of-candidates\r\n");

do_log(LOG_DEBUG, "Sending end-of-candidates");
SendTrickleIcePatch(sdp_frag);
}
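// Example PATCH body built above to signal that gathering has finished
// (illustrative values, same fragment format as for candidates):
//
//   a=ice-ufrag:EsAw
//   a=ice-pwd:P2uYro0UCOQ4zxjKXaWCBui1
//   a=mid:0
//   a=end-of-candidates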

void register_whip_output()
{
const uint32_t base_flags = OBS_OUTPUT_ENCODED | OBS_OUTPUT_SERVICE | OBS_OUTPUT_MULTI_TRACK_AV;
23 changes: 23 additions & 0 deletions plugins/obs-webrtc/whip-output.h
@@ -11,8 +11,12 @@
#include <mutex>
#include <thread>
#include <algorithm>
#include <chrono>
#include <condition_variable>

#include <rtc/rtc.hpp>
#include <vector>
#include <rtc/candidate.hpp>

struct videoLayerState {
uint16_t sequenceNumber;
@@ -45,15 +49,34 @@ class WHIPOutput {
void SendDelete();
void StopThread(bool signal);
void ParseLinkHeader(std::string linkHeader, std::vector<rtc::IceServer> &iceServers);
bool FetchIceServersViaOptions(std::vector<rtc::IceServer> &iceServers);
void Send(void *data, uintptr_t size, uint64_t duration, std::shared_ptr<rtc::Track> track,
std::shared_ptr<rtc::RtcpSrReporter> rtcp_sr_reporter);
void SendTrickleCandidate(const rtc::Candidate &candidate);
void SendEndOfCandidates();
void SendTrickleIcePatch(const std::string &sdp_frag);

obs_output_t *output;

std::string endpoint_url;
std::string bearer_token;
std::string resource_url;

std::mutex ice_gathering_mutex;
std::condition_variable ice_gathering_cv;
std::atomic<bool> ice_gathering_complete;
std::atomic<bool> has_first_candidate;
std::atomic<bool> offer_sent;
bool has_ice_servers;

// Trickle ICE support (RFC 8840)
std::string ice_ufrag;
std::string ice_pwd;
std::string first_mid;
std::vector<rtc::Candidate> pending_candidates; // Queued until POST completes
std::mutex pending_candidates_mutex;
std::atomic<bool> post_response_gather_started; // Distinguishes pre-offer vs final gather

std::atomic<bool> running;

std::mutex start_stop_mutex;