obs推流核心流程分析

2023-03-16 21:00:23

 

前置步驟和錄屏是一樣的,見我的上一篇文章

https://www.cnblogs.com/billin/p/17219558.html

在上文提到的 bool obs_output_actual_start(obs_output_t *output) 函數中,如果啟用的是推流直播,函數指標會轉到下面的 rtmp_stream_start:

// Condensed, annotated version by Bilin
/*
 * Start RTMP streaming: flag the stream as connecting and launch the
 * connect thread.
 *
 * data: the struct rtmp_stream to start, passed as void *.
 *
 * Returns true if the connect thread was created, false otherwise.
 */
static bool rtmp_stream_start(void *data)
{
	struct rtmp_stream *stream = data;

	os_atomic_set_bool(&stream->connecting, true);

	/* Run connect_thread() on its own thread, handing it the stream. */
	if (pthread_create(&stream->connect_thread, NULL, connect_thread,
			   stream) != 0) {
		/* No thread was started, so do not leave the stream flagged
		 * as connecting. */
		os_atomic_set_bool(&stream->connecting, false);
		return false;
	}

	return true;
}

推流使用的是obs-outputs外掛

上面的函數就是新開一個執行緒,來執行 static void *connect_thread(void *data) 這個函數,傳入執行緒的引數是 stream。

先看一下rtmp_stream結構

// RTMP stream structure: per-stream state shared by the connect and send
// threads.
// NOTE: this listing is condensed; other functions in this file reference
// members that are not shown here (e.g. silent_reconnect, new_socket_loop,
// active, disconnected, stop_ts).
struct rtmp_stream {
	// obs_output handle
    obs_output_t *output;
    
    // packet information
	struct circlebuf packets;
	// false until the send thread has sent the headers; reset to false
	// when the send thread exits
	bool sent_headers;

	bool got_first_video;
	// set by connect_thread on a silent reconnect from the dts of the
	// next queued packet
	int64_t start_dts_offset;

	// set to true by rtmp_stream_start before the connect thread is
	// created; cleared at the end of connect_thread
	volatile bool connecting;
    // connect thread handle
	pthread_t connect_thread;
    // send thread handle
	pthread_t send_thread;

	// semaphore the send thread's main loop waits on
	os_sem_t *send_sem;
    
    // push URL and stream key
    // e.g. for Bilibili the push URL is rtmp://live-push.bilivideo.com/live-bvc/
	struct dstr path, key;
    // username, password
	struct dstr username, password;
    // encoder name:
    // the author uses FMLE/3.0 (compatible; FMSc/1.0)
	struct dstr encoder_name;
	// address to bind to; when empty or "default", try_connect zeroes
	// rtmp.m_bindIP instead of resolving an address
	struct dstr bind_ip;


	int64_t last_dts_usec;

	uint64_t total_bytes_sent;
	int dropped_frames;

	// dbr_* members: presumably dynamic-bitrate state -- TODO confirm.
	// The send thread records per-frame send timings under dbr_mutex and
	// restores dbr_cur_bitrate to dbr_orig_bitrate when it exits.
	pthread_mutex_t dbr_mutex;
	struct circlebuf dbr_frames;
	size_t dbr_data_size;

	long audio_bitrate;
	long dbr_est_bitrate;
	long dbr_orig_bitrate;
	long dbr_prev_bitrate;
	long dbr_cur_bitrate;
	long dbr_inc_bitrate;
	bool dbr_enabled;
    
    // RTMP structure object
	RTMP rtmp;
    
	// socket thread and write buffer; init_send sets these up only when
	// new_socket_loop is enabled
	pthread_t socket_thread;
	uint8_t *write_buf;
	size_t write_buf_len;
	size_t write_buf_size;
	pthread_mutex_t write_buf_mutex;
	
};

rtmp_stream結構裡儲存了推流的所有關鍵資訊,包括推流地址,key,流的編碼器等引數

還儲存了幾個關鍵的執行緒控制代碼(pthread_t),用於多執行緒之間的協作:有連線執行緒 connect_thread,也有傳送執行緒 send_thread。

 

//rtmp連線執行緒
static void *connect_thread(void *data)
{
	struct rtmp_stream *stream = data;
	int ret;
    //設定執行緒名
	os_set_thread_name("rtmp-stream: connect_thread");
    
    //初始化
	if (!silently_reconnecting(stream)) {
		if (!init_connect(stream)) {
			obs_output_signal_stop(stream->output,
					       OBS_OUTPUT_BAD_PATH);
			os_atomic_set_bool(&stream->silent_reconnect, false);
			return NULL;
		}
	} else {
		struct encoder_packet packet;
		peek_next_packet(stream, &packet);
		stream->start_dts_offset = get_ms_time(&packet, packet.dts);
	}
    
    //連線
	ret = try_connect(stream);

	if (ret != OBS_OUTPUT_SUCCESS) {
		obs_output_signal_stop(stream->output, ret);
		info("Connection to %s failed: %d", stream->path.array, ret);
	}

	if (!stopping(stream))
		pthread_detach(stream->connect_thread);

	os_atomic_set_bool(&stream->silent_reconnect, false);
	os_atomic_set_bool(&stream->connecting, false);
	return NULL;
}

上面的方法是在單獨的執行緒中執行的,主要是做了下面幾件事:

1 設定執行緒名為「rtmp-stream: connect_thread」

2 初始化 init_connect

3 通過呼叫try_connect(stream)執行實際連線邏輯

// RTMP connect
/*
 * Configure the librtmp session held in stream->rtmp and connect to the
 * server.
 *
 * Returns OBS_OUTPUT_BAD_PATH if the URL cannot be set up,
 * OBS_OUTPUT_CONNECT_FAILED if RTMP_Connect() fails,
 * OBS_OUTPUT_INVALID_STREAM if RTMP_ConnectStream() fails, and otherwise
 * the result of init_send().
 */
static int try_connect(struct rtmp_stream *stream)
{
	bool default_bind;

	info("Connecting to RTMP URL %s...", stream->path.array);

	/* Fresh librtmp session pointed at the push URL, in write mode. */
	RTMP_Init(&stream->rtmp);

	if (!RTMP_SetupURL(&stream->rtmp, stream->path.array))
		return OBS_OUTPUT_BAD_PATH;

	RTMP_EnableWrite(&stream->rtmp);

	/* The encoder name is handed to librtmp as the flashVer string. */
	dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)");

	/* Credentials and connect-time link settings. */
	set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username);
	set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password);
	set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name);
	stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl;
	stream->rtmp.Link.customConnectEncode = add_connect_data;

	/* An empty or "default" bind address clears the bind-IP setting;
	 * anything else is resolved into it. */
	default_bind = dstr_is_empty(&stream->bind_ip) ||
		       dstr_cmp(&stream->bind_ip, "default") == 0;

	if (default_bind) {
		memset(&stream->rtmp.m_bindIP, 0,
		       sizeof(stream->rtmp.m_bindIP));
	} else if (netif_str_to_addr(&stream->rtmp.m_bindIP.addr,
				     &stream->rtmp.m_bindIP.addrLen,
				     stream->bind_ip.array)) {
		int addr_len = stream->rtmp.m_bindIP.addrLen;

		info("Binding to IPv%d",
		     addr_len == sizeof(struct sockaddr_in6) ? 6 : 4);
	}

	RTMP_AddStream(&stream->rtmp, stream->key.array);

	stream->rtmp.m_bUseNagle = true;
	stream->rtmp.m_bSendChunkSizeInfo = true;
	stream->rtmp.m_outChunkSize = 4096;

	if (!RTMP_Connect(&stream->rtmp, NULL)) {
		set_output_error(stream);
		return OBS_OUTPUT_CONNECT_FAILED;
	}

	if (!RTMP_ConnectStream(&stream->rtmp, 0))
		return OBS_OUTPUT_INVALID_STREAM;

	info("Connection to %s successful", stream->path.array);

	/* Connected: set up the send side and get ready to stream. */
	return init_send(stream);
}

當連線成功,開始呼叫init_send函數,初始化send,準備推流

需要注意,send也是在單獨的執行緒中處理的,因為要傳送的視訊資料比較大,如果不單開執行緒,肯定會造成阻塞。

// Initialize the RTMP send logic
/*
 * Start the send side of an established RTMP connection.
 *
 * Creates the send thread, optionally sets up the "new socket loop"
 * (non-blocking socket, write buffer and socket thread), marks the stream
 * active, sends the stream metadata and, unless this is a silent reconnect,
 * begins data capture on the output.
 *
 * Returns OBS_OUTPUT_SUCCESS on success; OBS_OUTPUT_ERROR if a thread cannot
 * be created, the socket cannot be made non-blocking, or the new socket loop
 * is requested on a non-Windows build; OBS_OUTPUT_DISCONNECTED if metadata
 * cannot be sent or an audio encoder exists at index 2.
 */
static int init_send(struct rtmp_stream *stream)
{
	int ret;
	obs_output_t *context = stream->output;

	/* Create the send thread; it runs send_thread() with the stream. */
	ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
	if (ret != 0) {
		/* Handled the same way as a socket-thread creation failure
		 * further down: close the connection and report an error
		 * instead of carrying on without a send thread. */
		RTMP_Close(&stream->rtmp);
		warn("Failed to create send thread");
		return OBS_OUTPUT_ERROR;
	}

	if (stream->new_socket_loop) {
		int one = 1;
		/* Switch the socket to non-blocking mode; the opening of the
		 * if statement differs per platform, the body is shared. */
#ifdef _WIN32
		if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
			stream->rtmp.last_error_code = WSAGetLastError();
#else
		if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
			stream->rtmp.last_error_code = errno;
#endif
			warn("Failed to set non-blocking socket");
			return OBS_OUTPUT_ERROR;
		}

		os_event_reset(stream->send_thread_signaled_exit);

		info("New socket loop enabled by user");
		if (stream->low_latency_mode)
			info("Low latency mode enabled by user");

		if (stream->write_buf)
			bfree(stream->write_buf);

		/* Sum the video and first audio encoder bitrates to size the
		 * write buffer. */
		int total_bitrate = 0;

		obs_encoder_t *vencoder = obs_output_get_video_encoder(context);
		if (vencoder) {
			obs_data_t *params = obs_encoder_get_settings(vencoder);
			if (params) {
				int bitrate =
					obs_data_get_int(params, "bitrate");
				if (!bitrate) {
					warn("Video encoder didn't return a "
					     "valid bitrate, new network "
					     "code may function poorly. "
					     "Low latency mode disabled.");
					stream->low_latency_mode = false;
					bitrate = 10000;
				}
				total_bitrate += bitrate;
				obs_data_release(params);
			}
		}

		obs_encoder_t *aencoder =
			obs_output_get_audio_encoder(context, 0);
		if (aencoder) {
			obs_data_t *params = obs_encoder_get_settings(aencoder);
			if (params) {
				int bitrate =
					obs_data_get_int(params, "bitrate");
				if (!bitrate)
					bitrate = 160;
				total_bitrate += bitrate;
				obs_data_release(params);
			}
		}

		// to bytes/sec
		int ideal_buffer_size = total_bitrate * 128;

		/* Never go below 128 KiB. */
		if (ideal_buffer_size < 131072)
			ideal_buffer_size = 131072;

		stream->write_buf_size = ideal_buffer_size;
		stream->write_buf = bmalloc(ideal_buffer_size);

#ifdef _WIN32
		ret = pthread_create(&stream->socket_thread, NULL,
				     socket_thread_windows, stream);
#else
		warn("New socket loop not supported on this platform");
		return OBS_OUTPUT_ERROR;
#endif

		if (ret != 0) {
			RTMP_Close(&stream->rtmp);
			warn("Failed to create socket thread");
			return OBS_OUTPUT_ERROR;
		}

		/* Route librtmp's sends through socket_queue_data(). */
		stream->socket_thread_active = true;
		stream->rtmp.m_bCustomSend = true;
		stream->rtmp.m_customSendFunc = socket_queue_data;
		stream->rtmp.m_customSendParam = stream;
	}

	os_atomic_set_bool(&stream->active, true);

	if (!send_meta_data(stream)) {
		warn("Disconnected while attempting to send metadata");
		set_output_error(stream);
		return OBS_OUTPUT_DISCONNECTED;
	}

	/* A second audio encoder (index 1) gets additional metadata. */
	obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, 1);
	if (aencoder && !send_additional_meta_data(stream)) {
		warn("Disconnected while attempting to send additional "
		     "metadata");
		return OBS_OUTPUT_DISCONNECTED;
	}

	/* An audio encoder at index 2 is rejected. */
	if (obs_output_get_audio_encoder(context, 2) != NULL) {
		warn("Additional audio streams not supported");
		return OBS_OUTPUT_DISCONNECTED;
	}

	/* On a silent reconnect, data capture is not started again. */
	if (!silently_reconnecting(stream))
		obs_output_begin_data_capture(stream->output, 0);

	return OBS_OUTPUT_SUCCESS;
}

 

核心推流執行緒,在單獨的執行緒裡完成推流邏輯

 

// Core streaming (send) thread
/*
 * Thread entry point that pushes queued packets to the server.
 *
 * data: the struct rtmp_stream being streamed, passed as void *.
 *
 * Waits on send_sem, takes the next packet and sends it, until a stop is
 * requested, a send fails, or a silent reconnect is pending. It then tears
 * the connection down, notifies the output, and on a silent reconnect
 * restarts the stream through rtmp_stream_start().
 *
 * Always returns NULL.
 */
static void *send_thread(void *data)
{
	struct rtmp_stream *stream = data;
    
    // set the thread name
	os_set_thread_name("rtmp-stream: send_thread");

    
    // set the socket send buffer size
#if defined(_WIN32)
	// Despite MSDN claiming otherwise, send buffer auto tuning on
	// Windows 7 doesn't seem to work very well.
	if (get_win_ver_int() == 0x601) {
		DWORD cur_sendbuf_size;
		DWORD desired_sendbuf_size = 524288;
		socklen_t int_size = sizeof(int);

		// only ever grow the buffer: skip if the current size could
		// not be read or is already large enough
		if (!getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
				SO_SNDBUF, (char *)&cur_sendbuf_size,
				&int_size) &&
		    cur_sendbuf_size < desired_sendbuf_size) {

			setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
				   SO_SNDBUF, (char *)&desired_sendbuf_size,
				   sizeof(desired_sendbuf_size));
		}
	}

	log_sndbuf_size(stream);
#endif
    
    // main send loop: one iteration per successful wait on send_sem
	while (os_sem_wait(stream->send_sem) == 0) {
		struct encoder_packet packet;
		struct dbr_frame dbr_frame;

		// stop requested and stop_ts is zero: leave immediately
		if (stopping(stream) && stream->stop_ts == 0) {
			break;
		}

		if (!get_next_packet(stream, &packet))
			continue;

		// stop requested with a non-zero stop_ts: keep sending until
		// can_shutdown_stream() accepts this packet as the end
		if (stopping(stream)) {
			if (can_shutdown_stream(stream, &packet)) {
				obs_encoder_packet_release(&packet);
				break;
			}
		}

		// headers go out once, before the first packet
		if (!stream->sent_headers) {
			if (!send_headers(stream)) {
				os_atomic_set_bool(&stream->disconnected, true);
				break;
			}
		}

		/* silent reconnect signal received from server, reconnect on
		 * next keyframe */
		if (silently_reconnecting(stream) &&
		    packet.type == OBS_ENCODER_VIDEO && packet.keyframe) {
			reinsert_packet_at_front(stream, &packet);
			break;
		}

		// record the send start time and size of this packet
		if (stream->dbr_enabled) {
			dbr_frame.send_beg = os_gettime_ns();
			dbr_frame.size = packet.size;
		}

		// a negative result is treated as a lost connection
		if (send_packet(stream, &packet, false, packet.track_idx) < 0) {
			os_atomic_set_bool(&stream->disconnected, true);
			break;
		}

		// record the send end time and store the frame under dbr_mutex
		if (stream->dbr_enabled) {
			dbr_frame.send_end = os_gettime_ns();

			pthread_mutex_lock(&stream->dbr_mutex);
			dbr_add_frame(stream, &dbr_frame);
			pthread_mutex_unlock(&stream->dbr_mutex);
		}
	}

	bool encode_error = os_atomic_load_bool(&stream->encode_error);

	// log why the loop ended
	if (disconnected(stream)) {
		info("Disconnected from %s", stream->path.array);
	} else if (encode_error) {
		info("Encoder error, disconnecting");
	} else if (silently_reconnecting(stream)) {
		info("Silent reconnect signal received from server");
	} else {
		info("User stopped the stream");
	}

#if defined(_WIN32)
	log_sndbuf_size(stream);
#endif

	// new socket loop: signal both events, then wait for the socket
	// thread to finish before touching the connection
	if (stream->new_socket_loop) {
		os_event_signal(stream->send_thread_signaled_exit);
		os_event_signal(stream->buffer_has_data_event);
		pthread_join(stream->socket_thread, NULL);
		stream->socket_thread_active = false;
		stream->rtmp.m_bCustomSend = false;
	}

	set_output_error(stream);

	if (silently_reconnecting(stream)) {
		/* manually close the socket to prevent librtmp from sending
		 * unpublish / deletestream messages when we call RTMP_Close,
		 * since we want to re-use this stream when we reconnect */
		RTMPSockBuf_Close(&stream->rtmp.m_sb);
		stream->rtmp.m_sb.sb_socket = -1;
	}

	RTMP_Close(&stream->rtmp);

	/* reset bitrate on stop */
	if (stream->dbr_enabled) {
		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
			dbr_set_bitrate(stream);
		}
	}

	// no stop was requested: the thread detaches itself and, unless a
	// silent reconnect is pending, reports a disconnect to the output
	if (!stopping(stream)) {
		pthread_detach(stream->send_thread);
		if (!silently_reconnecting(stream))
			obs_output_signal_stop(stream->output,
					       OBS_OUTPUT_DISCONNECTED);
	} else if (encode_error) {
		obs_output_signal_stop(stream->output, OBS_OUTPUT_ENCODE_ERROR);
	} else {
		obs_output_end_data_capture(stream->output);
	}

	// queued packets and the active flag are kept across a silent
	// reconnect and cleared otherwise
	if (!silently_reconnecting(stream)) {
		free_packets(stream);
		os_event_reset(stream->stop_event);
		os_atomic_set_bool(&stream->active, false);
	}

	stream->sent_headers = false;

	/* reset bitrate on stop */
	// NOTE(review): this repeats the identical reset performed right
	// after RTMP_Close above -- confirm whether the second pass is
	// intentional before removing either one.
	if (stream->dbr_enabled) {
		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
			dbr_set_bitrate(stream);
		}
	}

	// silent reconnect: start over, which spawns a new connect thread
	if (silently_reconnecting(stream)) {
		rtmp_stream_start(stream);
	}

	return NULL;
}