问题

class VideoInitiatorFragment : Fragment() {

companion object {
    /**
     * Builds a [VideoInitiatorFragment] whose arguments bundle carries the room
     * name together with the STUN, TURN and signaling settings. The same keys
     * are read back in `onCreate`.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoInitiatorFragment = VideoInitiatorFragment().also { created ->
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        created.arguments = bundle
    }
}

// Class member variables (unchanged)
// Signaling socket; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection used for the call; assigned in createPeerConnection().
private var localPeer: PeerConnection? = null
// Renderer for the local camera preview; assigned by the AndroidView factory.
private var localView: SurfaceViewRenderer? = null
// EGL context holder created the first time the preview view is updated.
private var localEglBase: EglBase? = null
// Remote ICE candidates received before a remote description was set.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Room name read from the fragment arguments in onCreate().
private var currentRoom: String? = null
// Connection settings read from the fragment arguments in onCreate().
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

// Log tag used by every Log call in this fragment.
private val TAG: String = "WebRTC-Initiator"

// RTT State Variables (milliseconds); displayed by WebRTCComposeLayout and
// written by parseStatsReport().
private val maxRttState = mutableStateOf(0L)
private val minRttState = mutableStateOf(Long.MAX_VALUE)
private val averageRttState = mutableStateOf(0L)
private val latestRttState = mutableStateOf(0L)

// History list to compute average RTT (parseStatsReport keeps at most 60 samples)
private val rttHistory = mutableStateListOf<Long>()

// Job for stats collection; started in startStatsCollection(), cancelled in onDestroyView()
private var statsJob: Job? = null

/**
 * Reads the room and server settings from the fragment arguments, falling
 * back to built-in defaults for any value that is missing, then logs them.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // Looks up one string argument, or returns the fallback when it is absent.
    fun argumentOr(key: String, fallback: String): String =
        arguments?.getString(key) ?: fallback

    currentRoom = argumentOr("room", "default-room")
    signalingServerUrl = argumentOr("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = argumentOr("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = argumentOr("turnUrl", "turn:turn.wstszx.us.kg:5349")
    // NOTE(review): TURN credentials are hard-coded as fallbacks here; consider
    // moving them out of source control.
    turnUsername = argumentOr("turnUsername", "wstszx")
    turnPassword = argumentOr("turnPassword", "930379")

    val summary =
        "onCreate: 角色 = 服务器, 房间 = $currentRoom, 信令服务器 = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per permission with its outcome. The "all granted"
 * follow-up ([onPermissionsChecked]) now runs only when every requested
 * permission was actually granted; previously it also ran after a denial.
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val outcome = if (isGranted) "权限已授予" else "权限被拒绝"
            Toast.makeText(requireContext(), "$permission $outcome", Toast.LENGTH_SHORT).show()
        }
        // An empty result is not treated as "everything granted".
        if (permissions.isNotEmpty() && permissions.values.all { it }) {
            onPermissionsChecked()
        }
    }

/** Hosts the Compose UI of this fragment inside a [ComposeView]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Root composable of the initiator screen: local camera preview on top and
 * the four RTT read-outs below it.
 *
 * A one-shot [LaunchedEffect] initialises WebRTC, starts the local video,
 * creates the peer connection, connects the signaling socket and finally
 * requests runtime permissions.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Local video view; takes all vertical space left by the stats.
            AndroidView(
                factory = { viewContext ->
                    SurfaceViewRenderer(viewContext).apply {
                        setZOrderMediaOverlay(false)
                    }.also { localView = it }
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = { renderer ->
                    // Create the EGL base and initialise the renderer only once.
                    if (localEglBase == null) {
                        val eglBase = EglBase.create()
                        localEglBase = eglBase
                        renderer.init(eglBase.eglBaseContext, null)
                        renderer.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // RTT statistics
            Column(modifier = Modifier.padding(16.dp)) {
                Text(
                    text = "最新往返时延: ${latestRttState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最大往返时延: ${maxRttState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最小往返时延: ${minRttState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "平均往返时延: ${averageRttState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 8.dp)
                )
            }
        }

        LaunchedEffect(Unit) {
            // The EGL base is created by the AndroidView update block above.
            // Bail out with a log instead of crashing on `!!` if it is missing.
            val eglBase = localEglBase
            if (eglBase == null) {
                Log.e(TAG, "EGL context is not ready; skipping WebRTC setup")
                return@LaunchedEffect
            }

            val options = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(options)

            val encoderFactory = DefaultVideoEncoderFactory(eglBase.eglBaseContext, true, true)
            val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)

            // The factory is only needed inside this effect, so it is a plain
            // local rather than a `lateinit` that is recreated per recomposition.
            val peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(encoderFactory)
                .setVideoDecoderFactory(decoderFactory)
                .createPeerConnectionFactory()

            // NOTE(review): capture starts before the permission request at the
            // end of this effect; confirm CAMERA is granted on first launch.
            var localVideoTrack: VideoTrack? = null
            initLocalVideo(context, localView, peerConnectionFactory, eglBase) {
                localVideoTrack = it
            }

            createPeerConnection(context, peerConnectionFactory, localVideoTrack) {
                localPeer = it
                startStatsCollection() // start collecting statistics
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
        }
    }
}

/**
 * Starts a coroutine in the view lifecycle scope that requests a stats
 * report from [localPeer] once per second and hands it to [parseStatsReport].
 * The job is stored in [statsJob].
 */
private fun startStatsCollection() {
    val scope = viewLifecycleOwner.lifecycleScope
    statsJob = scope.launch {
        while (isActive) {
            delay(1000) // one sample per second
            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
            } else {
                peer.getStats { report ->
                    parseStatsReport(report)
                }
            }
        }
    }
}

/**
 * Extracts the current round-trip time from a stats report and updates the
 * latest/max/min/average RTT states (all in milliseconds).
 *
 * `currentRoundTripTime` is a member of `candidate-pair` stats (in seconds),
 * not of `transport` stats, so the previous lookup never found a value. The
 * nominated pair is preferred; otherwise a pair in state `succeeded` is used.
 *
 * @param report stats report delivered by `PeerConnection.getStats`.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    Log.d(TAG, "RTCStatsReport: ${report.statsMap}")

    val currentRtt = report.statsMap.values.asSequence()
        .filter { it.type == "candidate-pair" }
        .filter { it.members["nominated"] == true || it.members["state"] == "succeeded" }
        .sortedByDescending { it.members["nominated"] == true }
        .mapNotNull { (it.members["currentRoundTripTime"] as? Number)?.toDouble() }
        .map { seconds -> (seconds * 1000).toLong() }
        .firstOrNull { it > 0 }

    if (currentRtt == null) {
        Log.d(TAG, "No candidate-pair stats with a positive currentRoundTripTime yet")
        return
    }

    // `viewLifecycleOwner` throws once the view is destroyed, and this runs in
    // the getStats callback, so read the owner through the nullable LiveData.
    val owner = viewLifecycleOwnerLiveData.value ?: return
    owner.lifecycleScope.launch {
        latestRttState.value = currentRtt

        // Keep a sliding window of the most recent 60 samples.
        rttHistory.add(currentRtt)
        if (rttHistory.size > 60) {
            rttHistory.removeAt(0)
        }

        val maxRtt = rttHistory.maxOrNull() ?: 0L
        val minRtt = rttHistory.minOrNull() ?: 0L
        val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

        maxRttState.value = maxRtt
        minRttState.value = minRtt
        averageRttState.value = averageRtt

        Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
    }
}

/**
 * Launches a loop in the view lifecycle scope that emits a `timestamp`
 * signal carrying the current wall-clock time every five seconds.
 */
private fun startSendingTimestamps() {
    // NOTE(review): this is started from the socket connect handler and the
    // job is not kept, so a reconnect presumably starts a second loop — confirm.
    val scope = viewLifecycleOwner.lifecycleScope
    scope.launch {
        while (isActive) {
            delay(5000) // send one timestamp every 5 seconds
            val now = System.currentTimeMillis()
            val payload = JSONObject()
            payload.put("type", "timestamp")
            payload.put("timestamp", now)
            payload.put("room", currentRoom)
            socket.emit("signal", payload)
            Log.d(TAG, "发送时间戳: $now")
        }
    }
}

/**
 * Creates the Socket.IO client for [signalingServerUrl], registers the
 * connect / error / disconnect / signal handlers and starts connecting.
 *
 * On connect the socket joins [currentRoom] and timestamp sending starts.
 * Incoming `signal` events carrying a [JSONObject] are forwarded to
 * [handleSignalingData].
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = socketUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket 已连接")
            socket.emit("join", currentRoom)
            Log.d(TAG, "已加入房间: $currentRoom")
            startSendingTimestamps()
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            args.firstOrNull()?.let { error ->
                Log.e(TAG, "Socket 连接错误: $error")
            }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            args.firstOrNull()?.let { reason ->
                Log.d(TAG, "Socket 已断开: $reason")
            }
        }

        socket.on("signal") { args ->
            // Read the payload safely: the old code indexed args[0] for the
            // log line before checking that args was non-empty.
            val payload = args.firstOrNull()
            Log.d(TAG, "收到信令: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            }
        }

        socket.connect()
        Log.d(TAG, "正在连接到 Socket: $socketUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "连接 Socket 时出错: ${e.message}")
    }
}

/**
 * Requests whichever of the camera, microphone, internet and network-state
 * permissions are not yet granted; if none are missing, goes straight to
 * [onPermissionsChecked].
 */
private fun requestPermissionsIfNeeded() {
    val wanted = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    )

    val missing = wanted.filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast saying that all required permissions are granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Starts camera capture and creates the local video track.
 *
 * The track is rendered into [localView] when one is available and is then
 * handed to [onLocalVideoTrack].
 *
 * Changes from the previous version:
 * - a null [localView] is skipped instead of being passed to `addSink`;
 * - the audio source, audio track and media stream that were created here
 *   but never used are gone (`createPeerConnection` creates the audio track
 *   that is actually sent).
 *
 * @param context used to open the camera and initialise the capturer.
 * @param localView renderer for the preview, or null when not yet created.
 * @param peerConnectionFactory factory used to create the source and track.
 * @param eglBase provides the EGL context for the capture texture helper.
 * @param onLocalVideoTrack receives the created video track.
 */
private fun initLocalVideo(
    context: Context,
    localView: SurfaceViewRenderer?,
    peerConnectionFactory: PeerConnectionFactory,
    eglBase: EglBase,
    onLocalVideoTrack: (VideoTrack) -> Unit
) {
    val videoCapturer = createCameraCapturer(context)
    val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
    val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
    videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

    // Requested capture format: 1920x1080 at 60 fps.
    videoCapturer.startCapture(1920, 1080, 60)

    val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
    if (localView != null) {
        localVideoTrack.addSink(localView)
    } else {
        Log.w(TAG, "Local view is not available; the preview will not be rendered")
    }

    onLocalVideoTrack(localVideoTrack)
}

/**
 * Creates a camera capturer, trying back-facing cameras first, then
 * front-facing ones, then the first enumerated device.
 *
 * @throws IllegalStateException when no capturer can be created. This now
 *   includes a device with no cameras at all, which used to fail with an
 *   index error on `deviceNames[0]`.
 */
private fun createCameraCapturer(context: Context): CameraVideoCapturer {
    val enumerator = Camera2Enumerator(context)
    val deviceNames = enumerator.deviceNames

    // Candidate devices in priority order.
    val candidates = deviceNames.filter { enumerator.isBackFacing(it) } +
        deviceNames.filter { enumerator.isFrontFacing(it) } +
        deviceNames.take(1)

    // Lazy, so capturer creation stops at the first device that succeeds.
    return candidates.asSequence()
        .mapNotNull { deviceName -> enumerator.createCapturer(deviceName, null) }
        .firstOrNull()
        ?: throw IllegalStateException("无法创建摄像头捕获器")
}

/**
 * Creates the peer connection with the configured STUN/TURN servers, adds
 * the local video track (if any) and a new audio track to it, and reports
 * it through [onLocalPeerCreated].
 *
 * Locally gathered ICE candidates are sent to the room as `ice` signals.
 * If the factory returns null, the failure is logged and the callback is
 * not invoked; the previous `!!` turned this into a bare NPE.
 *
 * @param context unused here; kept for signature compatibility.
 * @param peerConnectionFactory factory used to create the connection and audio track.
 * @param localVideoTrack video track to send, or null to send audio only.
 * @param onLocalPeerCreated invoked with the created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    localVideoTrack: VideoTrack?,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", JSONObject().apply {
                    put("sdpMid", candidate.sdpMid)
                    put("sdpMLineIndex", candidate.sdpMLineIndex)
                    put("candidate", candidate.sdp)
                })
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        Log.e(TAG, "Failed to create PeerConnection; check the ICE server configuration")
        return
    }

    val streamIds = listOf("local_stream")
    localVideoTrack?.let { peer.addTrack(it, streamIds) }

    val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
    val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
    peer.addTrack(localAudioTrack, streamIds)

    onLocalPeerCreated(peer)
}

/**
 * Creates an SDP answer on [peerConnection], applies it as the local
 * description and, once that succeeds, passes the SDP text to
 * [onAnswerCreated]. Failures are only logged.
 */
private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
    Log.d(TAG, "Creating answer...")

    val answerConstraints = MediaConstraints()
    answerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    answerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    val answerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            if (sessionDescription == null) return

            // Observer for applying the freshly created answer locally.
            val localDescriptionObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    Log.d(TAG, "SetLocalDescription onSetSuccess")
                    onAnswerCreated(sessionDescription.description)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                }

                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            peerConnection.setLocalDescription(localDescriptionObserver, sessionDescription)
        }

        override fun onSetSuccess() {
            Log.d(TAG, "createAnswer onSetSuccess")
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "createAnswer onCreateFailure: $error")
        }

        override fun onSetFailure(error: String?) {}
    }

    peerConnection.createAnswer(answerObserver, answerConstraints)
}

/**
 * Dispatches one incoming signaling message by its `type` field
 * (`offer`, `ice` or `time_sync_request`).
 *
 * Messages come from the remote side, so a missing or wrongly typed field
 * is logged and the message dropped instead of letting a JSONException
 * escape from the socket callback.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        when (val type = data.optString("type")) {
            "offer" -> handleOffer(data)
            "ice" -> handleRemoteIceCandidate(data)
            "time_sync_request" -> handleTimeSyncRequest(data)
            else -> Log.e(TAG, "Unknown signaling type: $type")
        }
    } catch (e: org.json.JSONException) {
        Log.e(TAG, "Malformed signaling message: $data", e)
    }
}

/** Applies a remote offer, answers it and then flushes queued ICE candidates. */
private fun handleOffer(data: JSONObject) {
    Log.d(TAG, "Received offer")
    val offer = SessionDescription(
        SessionDescription.Type.OFFER,
        data.getJSONObject("sdp").getString("sdp")
    )
    // Capture the peer once so the callbacks below do not need `!!`.
    val peer = localPeer
    if (peer == null) {
        Log.e(TAG, "Received offer but localPeer is null")
        return
    }

    peer.setRemoteDescription(object : SdpObserver {
        override fun onSetSuccess() {
            Log.d(TAG, "Set remote description (offer) success")
            createAnswer(peer) { answer ->
                val signalData = JSONObject().apply {
                    put("type", "answer")
                    put("sdp", JSONObject().put("sdp", answer))
                    put("room", currentRoom)
                }

                socket.emit("signal", signalData)

                // NOTE(review): pendingIceCandidates is also written from the
                // socket callback; confirm both run on the same thread.
                pendingIceCandidates.forEach { candidate ->
                    peer.addIceCandidate(candidate)
                }
                pendingIceCandidates.clear()
            }
        }

        override fun onSetFailure(error: String?) {
            Log.e(TAG, "Set remote description (offer) error: $error")
        }

        override fun onCreateSuccess(p0: SessionDescription?) {}
        override fun onCreateFailure(p0: String?) {}
    }, offer)
}

/** Adds a remote ICE candidate, or queues it until a remote description is set. */
private fun handleRemoteIceCandidate(data: JSONObject) {
    Log.d(TAG, "Received ICE candidate")
    val candidateData = data.getJSONObject("candidate")
    val candidate = IceCandidate(
        candidateData.getString("sdpMid"),
        candidateData.getInt("sdpMLineIndex"),
        candidateData.getString("candidate")
    )

    if (localPeer?.remoteDescription != null) {
        localPeer?.addIceCandidate(candidate)
    } else {
        pendingIceCandidates.add(candidate)
    }
}

/** Replies to a time-sync request with the sender's `t1` and local `t2`/`t3`. */
private fun handleTimeSyncRequest(data: JSONObject) {
    Log.d(TAG, "收到 time_sync_request")
    if (!data.has("t1")) {
        Log.e(TAG, "time_sync_request 缺少 t1")
        return
    }

    val t1 = data.getLong("t1")
    val t2 = System.currentTimeMillis()
    val t3 = System.currentTimeMillis()
    val syncResponse = JSONObject().apply {
        put("type", "time_sync_response")
        put("t1", t1)
        put("t2", t2)
        put("t3", t3)
        put("room", currentRoom)
    }
    socket.emit("signal", syncResponse)
    Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")
}

/** Emits a `time_sync_request` signal stamped with the current time as `t1`. */
private fun requestTimeSync() {
    val sentAt = System.currentTimeMillis()
    val request = JSONObject()
    request.put("type", "time_sync_request")
    request.put("room", currentRoom)
    request.put("t1", sentAt)
    socket.emit("signal", request)
    Log.d(TAG, "发送时间同步请求 at t1: $sentAt")
}

/**
 * Releases everything tied to the view: the stats job, the signaling
 * socket, the peer connection, the renderer and the EGL base.
 *
 * Released objects are also set to null. In particular the layout only
 * initialises a renderer while `localEglBase == null`, so leaving a
 * released instance in the field would stop a recreated view from ever
 * being initialised.
 */
override fun onDestroyView() {
    super.onDestroyView()

    statsJob?.cancel()
    statsJob = null

    // The socket is never assigned if setup did not run or IO.socket() threw.
    if (::socket.isInitialized) {
        socket.off() // remove all listeners so no callback fires after teardown
        socket.disconnect()
    }

    localPeer?.dispose()
    localPeer = null
    pendingIceCandidates.clear()

    localView?.release()
    localView = null

    localEglBase?.release()
    localEglBase = null
}
}


class VideoReceiverFragment : Fragment() {

companion object {
    /**
     * Builds a [VideoReceiverFragment] whose arguments bundle carries the room
     * name together with the STUN, TURN and signaling settings. The same keys
     * are read back in `onCreate`.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoReceiverFragment = VideoReceiverFragment().also { created ->
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        created.arguments = bundle
    }
}

// Class member variables
// Signaling socket; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection used for the call; assigned in createPeerConnection().
private var localPeer: PeerConnection? = null
// Renderer for the remote video; assigned by the AndroidView factory.
private var remoteView: SurfaceViewRenderer? = null
// EGL context holder created the first time the remote view is updated.
private var remoteEglBase: EglBase? = null
// Remote ICE candidates held until the remote description (answer) is set.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Room name read from the fragment arguments in onCreate().
private var currentRoom: String? = null
// Connection settings read from the fragment arguments in onCreate().
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
// Log tag used by every Log call in this fragment.
private val TAG: String = "WebRTC-Receiver"

// Values shown by WebRTCComposeLayout: frame rate (fps), bitrate (shown
// divided by 1000 as kbps) and whether the stuttering warning is displayed.
private val frameRateState = mutableStateOf(0.0)
private val bitrateState = mutableStateOf(0L)
private val stutteringState = mutableStateOf(false)

// Detail flags and value shown under the stuttering warning.
private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableStateOf(0.0)

// Series plotted by the two LineChart composables.
private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
// Job of the periodic time-sync loop started in startPeriodicTimeSync().
private var timeSyncJob: Job? = null

// History list that stores all one-way latency values
private val latencyHistory = mutableStateListOf<Long>()

// State variables holding the maximum, minimum, average and latest one-way
// latency (displayed in ms by WebRTCComposeLayout)
private val maxLatencyState = mutableStateOf(0L)
private val minLatencyState = mutableStateOf(Long.MAX_VALUE)
private val averageLatencyState = mutableStateOf(0L)
private val latestLatencyState = mutableStateOf(0L)

// History variables
// NOTE(review): presumably the previous stats sample used to compute
// per-interval deltas, plus clock-sync bookkeeping (timeOffset, t1) — the
// code using them is not shown here; confirm.
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
private var timeOffset: Long = 0
private var t1: Long = 0

// Job for stats collection.
private var statsJob: Job? = null

/**
 * Reads the room and server settings from the fragment arguments, falling
 * back to built-in defaults for any value that is missing, then logs them.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // Looks up one string argument, or returns the fallback when it is absent.
    fun argumentOr(key: String, fallback: String): String =
        arguments?.getString(key) ?: fallback

    currentRoom = argumentOr("room", "default-room")
    signalingServerUrl = argumentOr("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = argumentOr("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = argumentOr("turnUrl", "turn:turn.wstszx.us.kg:5349")
    // NOTE(review): TURN credentials are hard-coded as fallbacks here; consider
    // moving them out of source control.
    turnUsername = argumentOr("turnUsername", "wstszx")
    turnPassword = argumentOr("turnPassword", "930379")

    val summary =
        "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per permission with its outcome. The "all granted"
 * follow-up ([onPermissionsChecked]) now runs only when every requested
 * permission was actually granted; previously it also ran after a denial.
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val outcome = if (isGranted) "权限已授予" else "权限被拒绝"
            Toast.makeText(requireContext(), "$permission $outcome", Toast.LENGTH_SHORT).show()
        }
        // An empty result is not treated as "everything granted".
        if (permissions.isNotEmpty() && permissions.values.all { it }) {
            onPermissionsChecked()
        }
    }

/** Hosts the Compose UI of this fragment inside a [ComposeView]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Root composable of the receiver screen: remote video, one-way latency
 * read-outs, frame-rate and bitrate charts, and a stuttering indicator.
 *
 * A one-shot [LaunchedEffect] initialises WebRTC, creates the peer
 * connection, connects the signaling socket and requests permissions.
 *
 * Changes from the previous version: the encoder factory reuses the EGL
 * context of [remoteEglBase] instead of creating a second `EglBase` that
 * was never released, and a missing EGL base or view is logged instead of
 * crashing on `!!`.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Remote video view
            AndroidView(
                factory = { viewContext ->
                    SurfaceViewRenderer(viewContext).apply {
                        setZOrderMediaOverlay(false)
                    }.also { remoteView = it }
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = { renderer ->
                    // Create the EGL base and initialise the renderer only once.
                    if (remoteEglBase?.eglBaseContext == null) {
                        val eglBase = EglBase.create()
                        remoteEglBase = eglBase
                        renderer.init(eglBase.eglBaseContext, null)
                        renderer.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // Latest, maximum, minimum and average one-way latency
            Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                Text(
                    text = "当前单向时延: ${latestLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最大单向时延: ${maxLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最小单向时延: ${minLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "平均单向时延: ${averageLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 8.dp)
                )
            }

            // Spacer between video and charts
            Spacer(modifier = Modifier.height(8.dp))

            // Frame Rate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 8.dp)
            ) {
                Text(
                    text = "帧率: ${frameRateState.value.roundToInt()} fps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                // NOTE(review): this log runs on every recomposition of this column.
                Log.d(
                    TAG,
                    "UI - Frame Rate: ${frameRateState.value} fps, Bitrate: ${bitrateState.value / 1000} kbps"
                )

                // Line Chart for Frame Rate
                LineChart(
                    data = frameRateHistory,
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Green,
                    backgroundColor = Color.Black,
                    yAxisLabel = "帧率 (fps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between charts
            Spacer(modifier = Modifier.height(8.dp))

            // Bitrate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp)
            ) {
                Text(
                    text = "码率: ${bitrateState.value / 1000} kbps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )

                // Line Chart for Bitrate
                LineChart(
                    data = bitrateHistory.map { it / 1000f }, // Convert to kbps
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Blue,
                    backgroundColor = Color.Black,
                    yAxisLabel = "码率 (kbps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between metrics and stuttering indicator
            Spacer(modifier = Modifier.height(16.dp))

            // Stuttering Indicator
            Row(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp),
                verticalAlignment = Alignment.CenterVertically
            ) {
                if (stutteringState.value) {
                    Icon(
                        imageVector = Icons.Default.Warning,
                        contentDescription = "卡顿警告",
                        tint = Color.Red,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Column {
                        Text(
                            text = "视频播放出现卡顿",
                            color = Color.Red,
                            style = MaterialTheme.typography.bodyMedium
                        )
                        // Additional information about which metrics are abnormal
                        if (frameRateLowState.value) {
                            Text(
                                text = "帧率过低: ${frameRateState.value.roundToInt()} fps",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                        if (packetLossHighState.value) {
                            Text(
                                text = "包丢失率过高: ${packetLossState.value.roundToInt()}%",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                    }
                } else {
                    Icon(
                        imageVector = Icons.Default.CheckCircle,
                        contentDescription = "正常",
                        tint = Color.Green,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Text(
                        text = "视频播放正常",
                        color = Color.Green,
                        style = MaterialTheme.typography.bodyMedium
                    )
                }
            }
        }

        LaunchedEffect(Unit) {
            // Both are created by the AndroidView above. Bail out with a log
            // instead of crashing on `!!` if either is missing.
            val eglBase = remoteEglBase
            val renderer = remoteView
            if (eglBase == null || renderer == null) {
                Log.e(TAG, "Remote view or EGL context is not ready; skipping WebRTC setup")
                return@LaunchedEffect
            }

            val options = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(options)

            // Share one EGL context between encoder and decoder; the encoder
            // used to get its own EglBase that nothing ever released.
            val encoderFactory = DefaultVideoEncoderFactory(eglBase.eglBaseContext, true, true)
            val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)

            val peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(encoderFactory)
                .setVideoDecoderFactory(decoderFactory)
                .createPeerConnectionFactory()

            createPeerConnection(context, peerConnectionFactory, renderer) {
                localPeer = it
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
        }
    }
}

/**
 * Starts (or restarts) the loop that sends a time-sync request every
 * [intervalMs] milliseconds.
 *
 * This is called from the socket connect handler, so it can run again
 * after a reconnect. Any loop that is already running is cancelled first
 * so that requests are not sent from several loops at once.
 *
 * @param intervalMs delay between two requests, in milliseconds.
 */
private fun startPeriodicTimeSync(intervalMs: Long = 5000L) {
    timeSyncJob?.cancel()
    timeSyncJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            requestTimeSync()
            delay(intervalMs)
        }
    }
}

/**
 * Line chart with axes, tick labels and optional axis titles. Consecutive
 * points are joined by straight segments and each point is marked by a dot.
 *
 * Fixes over the previous version:
 * - a flat series (all values equal) no longer computes tick positions by
 *   dividing by a zero range; a single tick is drawn at mid-height, where
 *   the points themselves are drawn;
 * - x-axis ticks are spread over the whole index range and de-duplicated,
 *   so short series no longer draw the label "0" several times;
 * - text paints are created once per draw pass instead of once per label.
 *
 * @param data values to plot, in order; nothing is drawn when empty.
 * @param modifier layout modifier for the canvas.
 * @param lineColor colour of the segments and point markers.
 * @param backgroundColor canvas background colour.
 * @param yAxisLabel title drawn rotated beside the y axis; skipped when empty.
 * @param xAxisLabel title drawn below the x axis; skipped when empty.
 * @param minYValue fixed lower bound of the y axis, or null to use the data minimum.
 * @param maxYValue fixed upper bound of the y axis, or null to use the data maximum.
 */
@Composable
fun LineChart(
    data: List<Float>,
    modifier: Modifier = Modifier,
    lineColor: Color = Color.Green,
    backgroundColor: Color = Color.Black,
    yAxisLabel: String = "",
    xAxisLabel: String = "",
    minYValue: Float? = null,
    maxYValue: Float? = null
) {
    Canvas(modifier = modifier.background(backgroundColor)) {
        if (data.isEmpty()) return@Canvas

        val padding = 40.dp.toPx() // Padding for axes and labels
        val plotBottom = size.height - padding
        val plotRight = size.width - padding
        val plotHeight = size.height - padding * 2

        val maxY = maxYValue ?: data.maxOrNull() ?: 1f
        val minY = minYValue ?: data.minOrNull() ?: 0f
        val yRange = maxY - minY
        val lastIndex = data.lastIndex
        val spacing = (size.width - padding * 2) / lastIndex.coerceAtLeast(1)

        // Maps a data value to a canvas y coordinate; a flat series sits at mid-height.
        fun yFor(value: Float): Float =
            if (yRange == 0f) size.height / 2 else plotBottom - ((value - minY) / yRange) * plotHeight

        // Builds a white 24px text paint; anti-aliasing is only switched on
        // when asked for, otherwise the platform default is kept.
        fun labelPaint(align: android.graphics.Paint.Align, antiAlias: Boolean = false) =
            android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
                textAlign = align
                if (antiAlias) isAntiAlias = true
            }

        val points = data.mapIndexed { index, value ->
            Offset(padding + index * spacing, yFor(value))
        }

        // Draw axes
        drawLine(
            color = Color.White,
            start = Offset(padding, padding),
            end = Offset(padding, plotBottom),
            strokeWidth = 2f
        )
        drawLine(
            color = Color.White,
            start = Offset(padding, plotBottom),
            end = Offset(plotRight, plotBottom),
            strokeWidth = 2f
        )

        val nativeCanvas = drawContext.canvas.nativeCanvas

        // Draw y-axis tick labels: five evenly spaced values, or one for a flat series.
        val yLabelCount = 5
        val yTickValues = if (yRange == 0f) {
            listOf(minY)
        } else {
            List(yLabelCount) { i -> minY + i * (yRange / (yLabelCount - 1)) }
        }
        val yTickPaint = labelPaint(android.graphics.Paint.Align.RIGHT)
        yTickValues.forEach { yValue ->
            nativeCanvas.drawText(
                yValue.roundToInt().toString(),
                padding - 8f,
                yFor(yValue) + yTickPaint.textSize / 2,
                yTickPaint
            )
        }

        // Draw x-axis tick labels: up to five indices from first to last point.
        val xLabelCount = 5
        val xTickIndices = (0 until xLabelCount)
            .map { i -> i * lastIndex / (xLabelCount - 1) }
            .distinct()
        val xTickPaint = labelPaint(android.graphics.Paint.Align.CENTER)
        xTickIndices.forEach { index ->
            nativeCanvas.drawText(
                index.toString(),
                padding + index * spacing,
                plotBottom + xTickPaint.textSize + 4f,
                xTickPaint
            )
        }

        val axisTitlePaint = labelPaint(android.graphics.Paint.Align.CENTER, antiAlias = true)

        // Y-Axis Label, rotated to read bottom-to-top
        if (yAxisLabel.isNotEmpty()) {
            nativeCanvas.save()
            nativeCanvas.rotate(-90f, padding / 2, size.height / 2)
            nativeCanvas.drawText(yAxisLabel, padding / 2, size.height / 2, axisTitlePaint)
            nativeCanvas.restore()
        }

        // X-Axis Label
        if (xAxisLabel.isNotEmpty()) {
            nativeCanvas.drawText(
                xAxisLabel,
                size.width / 2,
                size.height - padding / 2,
                axisTitlePaint
            )
        }

        // Draw the straight lines connecting points (no-op for a single point)
        for (i in 0 until points.size - 1) {
            drawLine(
                color = lineColor,
                start = points[i],
                end = points[i + 1],
                strokeWidth = 4f,
                cap = StrokeCap.Round
            )
        }

        // Draw a marker on every point
        points.forEach { point ->
            drawCircle(
                color = lineColor,
                radius = 4f,
                center = point
            )
        }
    }
}

/**
 * Creates the Socket.IO client for [signalingServerUrl], registers the
 * connect / error / disconnect / signal handlers and starts connecting.
 *
 * On connect the socket joins [currentRoom], the call is initiated and the
 * periodic time sync starts. Incoming `signal` events carrying a
 * [JSONObject] are forwarded to [handleSignalingData].
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = socketUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket connected")
            socket.emit("join", currentRoom)
            Log.d(TAG, "Joined room: $currentRoom")
            initiateCall()
            startPeriodicTimeSync()
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            args.firstOrNull()?.let { error ->
                Log.e(TAG, "Socket connection error: $error")
            }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            args.firstOrNull()?.let { reason ->
                Log.d(TAG, "Socket disconnected: $reason")
            }
        }

        socket.on("signal") { args ->
            // Read the payload safely: the old code indexed args[0] for the
            // log line before checking that args was non-empty.
            val payload = args.firstOrNull()
            Log.d(TAG, "Received signaling: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            }
        }

        socket.connect()
        Log.d(TAG, "Connecting to Socket: $socketUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "Error connecting to Socket: ${e.message}")
    }
}

/**
 * Requests whichever of the camera, microphone, internet and network-state
 * permissions are not yet granted; if none are missing, goes straight to
 * [onPermissionsChecked].
 */
private fun requestPermissionsIfNeeded() {
    val wanted = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    )

    val missing = wanted.filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast saying that all required permissions are granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Creates the peer connection with the configured STUN/TURN servers,
 * reports it through [onLocalPeerCreated] and starts stats collection.
 *
 * Incoming video tracks are rendered into [remoteView]; locally gathered
 * ICE candidates are sent to the room as `ice` signals. If the factory
 * returns null, the failure is logged and neither the callback nor stats
 * collection runs; the previous `!!` turned this into a bare NPE.
 *
 * @param context unused here; kept for signature compatibility.
 * @param peerConnectionFactory factory used to create the connection.
 * @param remoteView renderer that receives the remote video frames.
 * @param onLocalPeerCreated invoked with the created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    remoteView: SurfaceViewRenderer,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        // Renders the receiver's track into the remote view when it is a video track.
        private fun attachIfVideo(receiver: RtpReceiver?) {
            val track = receiver?.track()
            if (track is VideoTrack) {
                track.addSink(remoteView)
            }
        }

        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", JSONObject().apply {
                    put("sdpMid", candidate.sdpMid)
                    put("sdpMLineIndex", candidate.sdpMLineIndex)
                    put("candidate", candidate.sdp)
                })
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
            attachIfVideo(receiver)
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
            attachIfVideo(transceiver?.receiver)
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        Log.e(TAG, "Failed to create PeerConnection; check the ICE server configuration")
        return
    }

    onLocalPeerCreated(peer)

    // Start collecting statistics
    startStatsCollection()
}

/**
 * Creates an SDP offer on [localPeer], installs it as the local description and, once that
 * succeeds, emits it on the socket as a "signal" message of type "offer" for [currentRoom].
 *
 * Does nothing beyond logging and building the constraints when [localPeer] is null.
 * Failures of either step are only logged.
 */
private fun initiateCall() {
    Log.d(TAG, "Initiating call...")

    val offerConstraints = MediaConstraints()
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    // Observer for setLocalDescription(): sends the offer to the peer once it has been applied.
    fun localDescriptionObserver(offer: SessionDescription): SdpObserver = object : SdpObserver {
        override fun onCreateSuccess(p0: SessionDescription?) {}

        override fun onCreateFailure(p0: String?) {}

        override fun onSetSuccess() {
            val payload = JSONObject()
            payload.put("type", "offer")
            payload.put("sdp", JSONObject().put("sdp", offer.description))
            payload.put("room", currentRoom)
            socket.emit("signal", payload)
        }

        override fun onSetFailure(error: String?) {
            Log.e(TAG, "Set local description error: $error")
        }
    }

    // Observer for createOffer(): hands the freshly created offer to setLocalDescription().
    val offerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            val offer = sessionDescription ?: return
            localPeer?.setLocalDescription(localDescriptionObserver(offer), offer)
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "Create offer error: $error")
        }

        override fun onSetSuccess() {}

        override fun onSetFailure(error: String?) {}
    }

    localPeer?.createOffer(offerObserver, offerConstraints)
}

/**
 * Dispatches one signaling message according to its "type" field.
 *
 * - "answer": applies the remote SDP answer, then flushes the ICE candidates that were queued
 *   in [pendingIceCandidates] while no remote description was set.
 * - "ice": adds the candidate to [localPeer], or queues it if the remote description is not set yet.
 * - "time_sync_response": derives the round-trip time, the one-way delay (RTT / 2) and the clock
 *   offset from the four timestamps t1..t4, then publishes the latency statistics to the UI state.
 * - anything else (including a missing "type"): logged as unknown.
 *
 * A message with missing or wrongly typed fields is logged and dropped instead of letting the
 * [org.json.JSONException] escape into the caller.
 *
 * @param data the JSON payload of the signaling message.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        // optString() yields "" for a missing "type", which falls through to the else branch.
        when (val type = data.optString("type")) {
            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )

                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        // Candidates that arrived before the answer can only be applied now.
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }
                        pendingIceCandidates.clear()

                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_response" -> {
                val requestSentAt = data.getLong("t1")
                val requestReceivedAt = data.getLong("t2")
                val responseSentAt = data.getLong("t3")
                val responseReceivedAt = System.currentTimeMillis()

                val rtt = responseReceivedAt - requestSentAt
                val oneWayDelay = rtt / 2
                timeOffset =
                    ((requestReceivedAt - requestSentAt) + (responseSentAt - responseReceivedAt)) / 2

                Log.d(TAG, "时间同步: RTT=$rtt ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

                // viewLifecycleOwner throws once the view is destroyed; a response that arrives
                // after that point has no UI left to update, so it is dropped.
                val viewScope = viewLifecycleOwnerLiveData.value?.lifecycleScope
                if (viewScope == null) {
                    Log.w(TAG, "Dropping time_sync_response: fragment view is not available")
                    return
                }

                viewScope.launch {
                    // Keep only the 60 most recent one-way delay samples.
                    latencyHistory.add(oneWayDelay)
                    if (latencyHistory.size > 60) {
                        latencyHistory.removeAt(0)
                    }

                    val maxLatency = latencyHistory.maxOrNull() ?: 0L
                    val minLatency = latencyHistory.minOrNull() ?: 0L
                    val averageLatency = if (latencyHistory.isNotEmpty()) {
                        latencyHistory.average().toLong()
                    } else {
                        0L
                    }

                    maxLatencyState.value = maxLatency
                    minLatencyState.value = minLatency
                    averageLatencyState.value = averageLatency
                    latestLatencyState.value = oneWayDelay
                }
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: $type")
            }
        }
    } catch (e: org.json.JSONException) {
        Log.e(TAG, "Malformed signaling message: $data", e)
    }
}

/**
 * Records the current wall-clock time in [t1] and emits it to the peer as a "signal" message
 * of type "time_sync_request" for [currentRoom].
 */
private fun requestTimeSync() {
    t1 = System.currentTimeMillis()

    val payload = JSONObject()
    payload.put("type", "time_sync_request")
    payload.put("room", currentRoom)
    payload.put("t1", t1)

    socket.emit("signal", payload)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

/**
 * Launches a coroutine in the view's lifecycle scope that, once per second, requests a stats
 * report from [localPeer] and forwards it to [parseStatsReport]. The job is kept in [statsJob].
 *
 * While [localPeer] is null the loop keeps running and logs an error on every tick.
 */
private fun startStatsCollection() {
    Log.d(TAG, "Starting stats collection...")
    statsJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(1000) // one sample per second
            Log.d(TAG, "Collecting stats...")

            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
                continue
            }

            peer.getStats { statsReport ->
                Log.d(TAG, "Stats report obtained.")
                parseStatsReport(statsReport)
            }
        }
    }
}

/**
 * Extracts inbound video statistics from [report] and publishes frame rate, bitrate, packet loss
 * and the derived stuttering flags to the UI state holders.
 *
 * Frame rate and bitrate are computed from the difference to the previous sample
 * (`prevFramesDecoded`, `prevBytesReceived`, `prevTimestamp`), so the first report only seeds
 * those values. Packet loss is the cumulative lost / (lost + received) ratio of the stream.
 *
 * @param report the stats report delivered by the peer connection.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    Log.d(TAG, "Received RTCStatsReport: $report")
    for (stats in report.statsMap.values) {
        val isInboundVideo =
            stats.type == "inbound-rtp" && (stats.members["kind"] as? String) == "video"
        if (!isInboundVideo) continue

        // Reads a numeric member as Double; a missing or non-numeric member counts as 0.
        fun counter(name: String): Double = (stats.members[name] as? Number)?.toDouble() ?: 0.0

        val framesDecoded = counter("framesDecoded")
        val framesReceived = counter("framesReceived")
        val framesDropped = counter("framesDropped")
        val bytesReceived = counter("bytesReceived")
        val packetsLost = counter("packetsLost")
        val packetsReceived = counter("packetsReceived")

        // Before any packet has been counted the ratio would be 0/0 (NaN), so report 0 instead.
        // The result is clamped to 0..1 so a negative packetsLost cannot yield a negative loss.
        val totalPackets = packetsLost + packetsReceived
        val packetLossFraction =
            if (totalPackets > 0.0) (packetsLost / totalPackets).coerceIn(0.0, 1.0) else 0.0
        val timestamp = stats.timestampUs / 1_000_000.0 // microseconds -> seconds

        Log.d(
            TAG,
            "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
        )

        // prevTimestamp == 0.0 marks "no previous sample yet".
        if (prevTimestamp != 0.0) {
            val timeElapsed = timestamp - prevTimestamp
            val framesDelta = framesDecoded - prevFramesDecoded
            val bytesDelta = bytesReceived - prevBytesReceived

            val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
            val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
            val packetLoss = packetLossFraction * 100 // percent

            Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

            // Stuttering thresholds: below 24 fps or above 5 % packet loss.
            val frameRateLow = frameRate < 24
            val packetLossHigh = packetLoss > 5.0
            val isStuttering = frameRateLow || packetLossHigh

            // viewLifecycleOwner throws once the view is destroyed; a report that arrives after
            // that point has no UI left to update, so only the baseline below is refreshed.
            val viewScope = viewLifecycleOwnerLiveData.value?.lifecycleScope
            if (viewScope == null) {
                Log.w(TAG, "Skipping stats UI update: fragment view is not available")
            } else {
                viewScope.launch {
                    frameRateState.value = frameRate
                    bitrateState.value = bitrate.toLong()
                    stutteringState.value = isStuttering
                    frameRateLowState.value = frameRateLow
                    packetLossHighState.value = packetLossHigh
                    packetLossState.value = packetLoss

                    // Keep only the 60 most recent samples of each series.
                    frameRateHistory.add(frameRate.toFloat())
                    if (frameRateHistory.size > 60) {
                        frameRateHistory.removeAt(0)
                    }

                    bitrateHistory.add(bitrate.toLong())
                    if (bitrateHistory.size > 60) {
                        bitrateHistory.removeAt(0)
                    }

                    Log.d(
                        TAG,
                        "Updated Frame Rate: ${frameRateState.value} fps, Bitrate: ${bitrateState.value / 1000} kbps, Stuttering: $isStuttering"
                    )
                }
            }
        }

        // Remember this sample as the baseline for the next delta computation.
        prevFramesDecoded = framesDecoded
        prevBytesReceived = bytesReceived
        prevFramesReceived = framesReceived
        prevFramesDropped = framesDropped
        prevTimestamp = timestamp
    }
}

/**
 * Stops the periodic jobs and releases the signaling and WebRTC resources owned by the view.
 *
 * The jobs are cancelled first so that nothing emits on the socket or queries the peer
 * connection while those are being torn down.
 */
override fun onDestroyView() {
    super.onDestroyView()
    statsJob?.cancel()
    timeSyncJob?.cancel()

    // socket is lateinit and stays unassigned if creating it failed, so guard the access.
    if (::socket.isInitialized) {
        socket.disconnect()
    }

    // Drop the reference after disposing so late signaling callbacks, which all go through
    // `localPeer?.`, cannot call into a disposed PeerConnection.
    localPeer?.dispose()
    localPeer = null

    remoteView?.release()
    remoteEglBase?.release()
}
}

分析代码,目前发起端统计往返时延的代码有问题,往返时延的值始终为0,是否可以修改为在应用层实现统计往返时延并在页面上展示,返回修改后的代码

思考

生成RTT测量

我正在分析两个 Android Fragment 的代码,理解它们如何处理 WebRTC 连接中的 RTT 测量。希望能找到 RTT 始终为零的原因,并提出在应用层测量 RTT 的方法。

关注时间同步

视频发起者片段没有处理time_sync_response,只有在接收到time_sync_request时回复time_sync_response。

强调时间同步问题
VideoInitiatorFragment的代码中没有发送time_sync_request消息,导致RTT计算不准确。

修复问题

在VideoInitiatorFragment中,需要实现周期性发送time_sync_request函数,并处理time_sync_response,以正确获取RTT。这样可以更好地修复当前问题。

确保定期发送请求

我在VideoInitiatorFragment中定期发送time_sync_request,并计算RTT。希望确保它能精准反映接收方的时间同步。

分析问题所在

立即调整 VideoInitiatorFragment: RTT为零可能是WebRTC stats未提供currentRoundTripTime或未处理time_sync_response。要定期发送time_sync_request并处理time_sync_response。

回答

分析

在当前的 VideoInitiatorFragment 中,往返时延(RTT)始终为0的问题主要源于以下几点:

  1. 应用层 RTT 实现不完整

    • 发送时间同步请求缺失:VideoInitiatorFragment 中定义了 requestTimeSync() 方法,但并未在任何地方调用它,因此没有实际发送时间同步请求。
    • 处理 time_sync_response 缺失:handleSignalingData 方法中只处理了 time_sync_request,没有处理来自接收端的 time_sync_response,因此即使接收端发送了响应,发起端也不会进行 RTT 的计算。
  2. WebRTC 内部统计数据问题

    • 当前代码尝试通过 PeerConnection.getStats 获取 currentRoundTripTime,但实际 WebRTC 实现中,可能并不会立即提供该数据,或者在某些情况下无法正确获取,从而导致 RTT 始终为0。

解决方案

为了正确测量往返时延(RTT),我们需要在应用层实现时间同步机制。具体步骤如下:

  1. 定期发送时间同步请求:VideoInitiatorFragment 应定期向接收端发送 time_sync_request,其中包含发送请求时的时间戳 t1。

  2. 处理时间同步响应:当接收到来自接收端的 time_sync_response 时,计算 RTT 以及其他相关指标。

  3. 更新 UI:将计算得到的 RTT 数据展示在界面上。

  4. 清理和优化:移除或注释掉通过 WebRTC 内部统计数据获取 RTT 的相关代码,以避免干扰。

以下是修改后的 VideoInitiatorFragment 代码,已实现应用层的 RTT 统计:

修改后的 VideoInitiatorFragment 代码

/**
 * Fragment that captures local camera/microphone media, answers incoming WebRTC offers received
 * over a Socket.IO signaling channel, and measures the round-trip time (RTT) to its peer at the
 * application layer.
 *
 * RTT measurement: every few seconds a "time_sync_request" carrying the local send time `t1` is
 * emitted; the peer echoes `t1` back in a "time_sync_response", and RTT is the local receive time
 * minus `t1`. Latest / max / min / average over the last 60 samples are rendered by
 * [WebRTCComposeLayout].
 *
 * NOTE(review): the camera capturer and the PeerConnectionFactory created here are never
 * stopped or disposed; confirm whether that is intentional.
 */
class VideoInitiatorFragment : Fragment() {

    companion object {
        /** Creates the fragment with its connection settings stored in the arguments bundle. */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoInitiatorFragment {
            val fragment = VideoInitiatorFragment()
            val args = Bundle().apply {
                putString("room", room)
                putString("stunUrl", stunUrl)
                putString("turnUrl", turnUrl)
                putString("turnUsername", turnUsername)
                putString("turnPassword", turnPassword)
                putString("signalingServerUrl", signalingServerUrl)
            }
            fragment.arguments = args
            return fragment
        }
    }

    // Signaling / WebRTC members.
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var localView: SurfaceViewRenderer? = null
    private var localEglBase: EglBase? = null
    private val pendingIceCandidates = mutableListOf<IceCandidate>()
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    private val TAG: String = "WebRTC-Initiator"

    // RTT values shown in the UI. All start at 0 so no sentinel value is ever displayed
    // before the first sample arrives.
    private val maxRttState = mutableStateOf(0L)
    private val minRttState = mutableStateOf(0L)
    private val averageRttState = mutableStateOf(0L)
    private val latestRttState = mutableStateOf(0L)

    // The most recent RTT samples (at most 60), used for max / min / average.
    private val rttHistory = mutableStateListOf<Long>()

    // Periodic time-sync job; cancelled in onDestroyView().
    private var timeSyncJob: Job? = null

    /** Reads the connection settings from the fragment arguments, falling back to defaults. */
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // NOTE(review): the fallback values embed server addresses and TURN credentials in the
        // source; consider moving them to configuration.
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: 角色 = 服务器, 房间 = $currentRoom, 信令服务器 = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    // Shows one toast per requested permission, then continues via onPermissionsChecked().
    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                if (isGranted) {
                    Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
                } else {
                    Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
                }
            }
            onPermissionsChecked()
        }

    /** Hosts the Compose UI defined by [WebRTCComposeLayout]. */
    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    /**
     * Renders the local video preview with the RTT statistics below it and, once composed,
     * sets up the WebRTC stack, the signaling socket and the periodic time sync.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current
        var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Local video preview; takes all space not used by the statistics.
                AndroidView(
                    factory = { viewContext ->
                        SurfaceViewRenderer(viewContext)
                            .apply { setZOrderMediaOverlay(false) }
                            .also { localView = it }
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth(),
                    update = { renderer ->
                        // Create the EGL context and initialise the renderer only once.
                        if (localEglBase == null) {
                            val eglBase = EglBase.create()
                            localEglBase = eglBase
                            renderer.init(eglBase.eglBaseContext, null)
                            renderer.setMirror(false)
                        }
                    }
                )

                Spacer(modifier = Modifier.height(8.dp))

                // RTT statistics.
                Column(modifier = Modifier.padding(16.dp)) {
                    Text(
                        text = "最新往返时延: ${latestRttState.value} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "最大往返时延: ${maxRttState.value} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "最小往返时延: ${minRttState.value} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "平均往返时延: ${averageRttState.value} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }
                // Other UI components go here.
            }

            LaunchedEffect(Unit) {
                // The renderer's update block above creates the EGL context before this runs.
                val eglBase = checkNotNull(localEglBase) {
                    "EGL context must be created before WebRTC is initialised"
                }

                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                val encoderFactory = DefaultVideoEncoderFactory(eglBase.eglBaseContext, true, true)
                val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)

                val peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(encoderFactory)
                    .setVideoDecoderFactory(decoderFactory)
                    .createPeerConnectionFactory()

                initLocalVideo(context, localView, peerConnectionFactory, eglBase) {
                    localVideoTrack = it
                }

                // WebRTC-internal stats collection is intentionally not started; RTT is
                // measured at the application layer instead.
                createPeerConnection(context, peerConnectionFactory, localVideoTrack) {
                    localPeer = it
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()

                startPeriodicTimeSync()
            }
        }
    }

    /**
     * Starts (or restarts) the job that sends a time-sync request every [intervalMs] milliseconds
     * for as long as the fragment's view exists.
     */
    private fun startPeriodicTimeSync(intervalMs: Long = 5000L) {
        timeSyncJob?.cancel()
        timeSyncJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                requestTimeSync()
                delay(intervalMs)
            }
        }
    }

    /**
     * Emits a "time_sync_request" carrying the current time as `t1`.
     *
     * Skipped while the socket is missing or disconnected: a request sent later than its `t1`
     * was taken would inflate the measured RTT.
     */
    private fun requestTimeSync() {
        if (!::socket.isInitialized || !socket.connected()) {
            Log.d(TAG, "Skipping time sync request: socket is not connected")
            return
        }
        val t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "发送时间同步请求 at t1: $t1")
    }

    /** Creates the signaling socket, registers its listeners and connects. Failures are logged. */
    private fun initializeSocketIO() {
        val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
        val socketUrl = signalingServerUrl
        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = protocol == "https"
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket 已连接")
                socket.emit("join", currentRoom)
                Log.d(TAG, "已加入房间: $currentRoom")
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket 连接错误: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket 已断开: $reason")
                }
            }

            socket.on("signal") { args ->
                // Only touch args[0] after confirming it exists.
                if (args.isNotEmpty()) {
                    Log.d(TAG, "收到信令: ${args[0]}")
                    val data = args[0]
                    if (data is JSONObject) {
                        handleSignalingData(data)
                    }
                }
            }

            socket.connect()
            Log.d(TAG, "正在连接到 Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "连接 Socket 时出错: ${e.message}")
        }
    }

    /** Requests the permissions that are not granted yet, or continues directly if none are missing. */
    private fun requestPermissionsIfNeeded() {
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        )

        val permissionsToRequest = permissions.filter {
            ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
        }

        if (permissionsToRequest.isNotEmpty()) {
            requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
        } else {
            onPermissionsChecked()
        }
    }

    // NOTE(review): this is also reached from the launcher callback when a permission was
    // denied, so the toast text can be inaccurate; confirm the intended behaviour.
    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Starts camera capture at 1920x1080 / 60 fps, renders it into [localView] and hands the
     * resulting video track to [onLocalVideoTrack].
     */
    private fun initLocalVideo(
        context: Context,
        localView: SurfaceViewRenderer?,
        peerConnectionFactory: PeerConnectionFactory,
        eglBase: EglBase,
        onLocalVideoTrack: (VideoTrack) -> Unit
    ) {
        val videoCapturer = createCameraCapturer(context)
        val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
        val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)

        videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)
        videoCapturer.startCapture(1920, 1080, 60)

        val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
        localVideoTrack.addSink(localView)

        val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
        val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)

        // Group the audio and video tracks in a local stream.
        val mediaStream = peerConnectionFactory.createLocalMediaStream("local_stream")
        mediaStream.addTrack(localAudioTrack)
        mediaStream.addTrack(localVideoTrack)

        onLocalVideoTrack(localVideoTrack)
    }

    /**
     * Returns a capturer for the first back-facing camera, else the first front-facing camera,
     * else the first camera of any kind.
     *
     * @throws IllegalStateException if the device reports no camera or no capturer can be created.
     */
    private fun createCameraCapturer(context: Context): CameraVideoCapturer {
        val camera2Enumerator = Camera2Enumerator(context)
        val deviceNames = camera2Enumerator.deviceNames

        // Prefer a back-facing camera.
        for (deviceName in deviceNames) {
            if (camera2Enumerator.isBackFacing(deviceName)) {
                val capturer = camera2Enumerator.createCapturer(deviceName, null)
                if (capturer != null) {
                    return capturer
                }
            }
        }

        // Otherwise try a front-facing camera.
        for (deviceName in deviceNames) {
            if (camera2Enumerator.isFrontFacing(deviceName)) {
                val capturer = camera2Enumerator.createCapturer(deviceName, null)
                if (capturer != null) {
                    return capturer
                }
            }
        }

        // Last resort: the first camera, if there is one at all.
        return deviceNames.firstOrNull()?.let { camera2Enumerator.createCapturer(it, null) }
            ?: throw IllegalStateException("无法创建摄像头捕获器")
    }

    /**
     * Creates the peer connection with the configured STUN/TURN servers, attaches the local
     * video track (if any) and a new audio track, and reports it through [onLocalPeerCreated].
     * Locally gathered ICE candidates are forwarded to the peer over the socket.
     *
     * @throws IllegalStateException if the factory fails to create the peer connection.
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        localVideoTrack: VideoTrack?,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        val iceServers = listOf(
            PeerConnection.IceServer.builder(stunUrl).createIceServer(),
            PeerConnection.IceServer.builder(turnUrl)
                .setUsername(turnUsername)
                .setPassword(turnPassword)
                .createIceServer()
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                iceCandidate?.let {
                    Log.d(TAG, "ICE candidate: $it")
                    val signalData = JSONObject().apply {
                        put("type", "ice")
                        put("candidate", JSONObject().apply {
                            put("sdpMid", it.sdpMid)
                            put("sdpMLineIndex", it.sdpMLineIndex)
                            put("candidate", it.sdp)
                        })
                        put("room", currentRoom)
                    }
                    socket.emit("signal", signalData)
                }
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        }) ?: throw IllegalStateException("Failed to create PeerConnection")
        localPeer = peer

        localVideoTrack?.let {
            peer.addTrack(it, listOf("local_stream"))
        }
        val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
        val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
        peer.addTrack(localAudioTrack, listOf("local_stream"))

        onLocalPeerCreated(peer)
    }

    /**
     * Dispatches one signaling message by its "type": "offer" (apply it and reply with an
     * answer), "ice" (add or queue the candidate) or "time_sync_response" (update RTT).
     * Unknown types and malformed messages are logged and dropped.
     */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        try {
            // optString() yields "" for a missing "type", which falls through to the else branch.
            when (val type = data.optString("type")) {
                "offer" -> {
                    Log.d(TAG, "Received offer")
                    val sdp = SessionDescription(
                        SessionDescription.Type.OFFER,
                        data.getJSONObject("sdp").getString("sdp")
                    )
                    localPeer?.setRemoteDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            Log.d(TAG, "Set remote description (offer) success")
                            // The peer may have been disposed while the offer was being applied.
                            val peer = localPeer ?: return
                            createAnswer(peer) { answer ->
                                val signalData = JSONObject().apply {
                                    put("type", "answer")
                                    put("sdp", JSONObject().put("sdp", answer))
                                    put("room", currentRoom)
                                }

                                socket.emit("signal", signalData)

                                // Candidates that arrived before the offer can be applied now.
                                pendingIceCandidates.forEach { candidate ->
                                    localPeer?.addIceCandidate(candidate)
                                }
                                pendingIceCandidates.clear()
                            }
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "Set remote description (offer) error: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }

                "ice" -> {
                    Log.d(TAG, "Received ICE candidate")
                    val candidateData = data.getJSONObject("candidate")
                    val candidate = IceCandidate(
                        candidateData.getString("sdpMid"),
                        candidateData.getInt("sdpMLineIndex"),
                        candidateData.getString("candidate")
                    )

                    if (localPeer?.remoteDescription != null) {
                        localPeer?.addIceCandidate(candidate)
                    } else {
                        pendingIceCandidates.add(candidate)
                    }
                }

                "time_sync_response" -> {
                    handleTimeSyncResponse(data)
                }

                else -> {
                    Log.e(TAG, "Unknown signaling type: $type")
                }
            }
        } catch (e: org.json.JSONException) {
            Log.e(TAG, "Malformed signaling message: $data", e)
        }
    }

    /**
     * Computes the RTT from a "time_sync_response" (local receive time minus the echoed `t1`)
     * and publishes latest / max / min / average RTT over the last 60 samples to the UI state.
     * One-way delay (RTT / 2) and the clock offset are only logged.
     */
    private fun handleTimeSyncResponse(data: JSONObject) {
        Log.d(TAG, "Handling time_sync_response: $data")
        val t1 = data.getLong("t1")
        val t2 = data.getLong("t2")
        val t3 = data.getLong("t3")
        val t4 = System.currentTimeMillis()

        val rtt = t4 - t1
        val oneWayDelay = rtt / 2
        val timeOffset = ((t2 - t1) + (t3 - t4)) / 2

        Log.d(TAG, "时间同步: RTT=$rtt ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

        // viewLifecycleOwner throws once the view is destroyed; a response that arrives after
        // that point has no UI left to update, so it is dropped.
        val viewScope = viewLifecycleOwnerLiveData.value?.lifecycleScope
        if (viewScope == null) {
            Log.w(TAG, "Dropping time_sync_response: fragment view is not available")
            return
        }

        viewScope.launch {
            // Keep only the 60 most recent RTT samples.
            rttHistory.add(rtt)
            if (rttHistory.size > 60) {
                rttHistory.removeAt(0)
            }

            val maxRtt = rttHistory.maxOrNull() ?: 0L
            val minRtt = rttHistory.minOrNull() ?: 0L
            val averageRtt = if (rttHistory.isNotEmpty()) {
                rttHistory.average().toLong()
            } else {
                0L
            }

            maxRttState.value = maxRtt
            minRttState.value = minRtt
            averageRttState.value = averageRtt
            latestRttState.value = rtt

            Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
        }
    }

    private fun startSendingTimestamps() {
        // Intentionally empty: superseded by the periodic time-sync mechanism.
    }

    /** Stops the time sync and releases the socket, peer connection, renderer and EGL context. */
    override fun onDestroyView() {
        super.onDestroyView()
        timeSyncJob?.cancel()

        // socket stays unassigned if creating it failed, so guard the lateinit access.
        if (::socket.isInitialized) {
            socket.disconnect()
        }

        // Clear each reference after releasing it: late callbacks must not reach a disposed
        // object, and a re-created view must build a fresh EGL context.
        localPeer?.dispose()
        localPeer = null
        localView?.release()
        localView = null
        localEglBase?.release()
        localEglBase = null
    }

    /**
     * Creates an SDP answer on [peerConnection], installs it as the local description and then
     * passes its SDP text to [onAnswerCreated]. Failures are only logged.
     */
    private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
        Log.d(TAG, "Creating answer...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        peerConnection.createAnswer(object : SdpObserver {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                sessionDescription?.let { sdp ->
                    peerConnection.setLocalDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            Log.d(TAG, "SetLocalDescription onSetSuccess")
                            onAnswerCreated(sdp.description)
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }
            }

            override fun onSetSuccess() {
                Log.d(TAG, "createAnswer onSetSuccess")
            }

            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "createAnswer onCreateFailure: $error")
            }

            override fun onSetFailure(error: String?) {}
        }, constraints)
    }
}

主要修改点说明

  1. 移除 WebRTC 内部 RTT 统计

    • 注释掉或移除了 statsJob 相关的代码,因为我们现在采用应用层的 RTT 统计方法。
  2. 实现应用层 RTT 统计

    • 定期发送 time_sync_request
      • 增加了 startPeriodicTimeSync 方法,该方法启动一个协程,每隔 intervalMs 毫秒发送一次时间同步请求。
      • requestTimeSync 方法构建并发送 time_sync_request,其中包含发送请求时的时间戳 t1。
    • 处理 time_sync_response
      • handleSignalingData 方法中新增对 time_sync_response 的处理,调用 handleTimeSyncResponse 方法。
      • handleTimeSyncResponse 方法根据接收到的 t1, t2, t3 以及本地时间 t4 计算 RTT 和单向时延,并更新相应的状态变量。
  3. 更新 UI 以展示 RTT 数据

    • 在 WebRTCComposeLayout 的 UI 组件中,保留了 RTT 显示的相关部分,用于展示最新、最大、最小和平均 RTT。
  4. 移除不必要的功能

    • 清空了原先 startSendingTimestamps 方法的实现(仅保留空方法体),并注释掉了对它的调用,因为它与新的时间同步机制不相关。
    • 确保 VideoInitiatorFragment 只专注于发起方的功能,避免混淆。

确保接收端正确响应

为了使应用层的 RTT 统计正常工作,确保 VideoReceiverFragment 能正确处理 time_sync_request 并返回 time_sync_response。根据你提供的代码,VideoReceiverFragment 已经实现了以下功能:

  • 处理 time_sync_request

    • 当接收到 time_sync_request 时,提取 t1 并发送 time_sync_response,其中包含 t1、t2、t3。
  • 定期发送 time_sync_request(如果需要双向 RTT 测量)。

确保 VideoReceiverFragment 的 handleSignalingData 方法中包含对 time_sync_request 的正确处理,如下所示:

"time_sync_request" -> { Log.d(TAG, "收到 time_sync_request") if (data.has("t1")) { val t1 = data.getLong("t1") val t2 = System.currentTimeMillis() val t3 = System.currentTimeMillis() val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3") } else { Log.e(TAG, "time_sync_request 缺少 t1") } }

结论

通过在应用层实现时间同步机制,我们可以更准确地测量往返时延(RTT),而不依赖于 WebRTC 内部的统计数据。这种方法不仅更灵活,而且在不同的 WebRTC 实现中具有更好的兼容性。上述修改后的 VideoInitiatorFragment 代码应能正确测量并显示 RTT。如果在测试过程中仍遇到问题,请确保信令服务器和双方的代码逻辑正确无误,并在必要时增加更多的日志以进行调试。

分享这个问答