RTT OWD Metrics Integration
Created: November 21, 2024
Model used: OpenAI 01 Mini by OpenAI01
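Merging time synchronization
Thinking about how to issue a single time synchronization request that both computes and displays the round-trip time (RTT) and the one-way delay (OWD), in order to simplify the code.
Improving time synchronization
Ensuring that one time synchronization request computes both RTT and OWD, and updating the UI accordingly to show each metric. Handling the code appropriately with Compose UI.
Modifying the class structure
Adjusting the VideoInitiatorFragment and VideoReceiverFragment classes so that a single request synchronizes time and computes RTT and OWD, while the UI is updated to show the corresponding metrics.
Tracking time synchronization
I am working on the VideoInitiatorFragment and VideoReceiverFragment classes, which compute RTT and OWD respectively, and updating the UI to show the latest, maximum, minimum, and average values. These changes improve the accuracy of the data and give a fuller picture.
Focusing on key metrics
I am adjusting the code to improve the UI on the video initiator and receiver. On the initiator screen, only the round-trip time (RTT) metrics should be shown, not the one-way delay (OWD).
Calculating delay
I am calculating the delay between the nodes: from sending the request to its receipt, from receipt to the receiver's reply, and from the receiver's reply back to the initiator. The round trip from initiator to receiver is expected to be longer.
Exploring how to calculate OWD
I want to determine whether the receiver computes OWD and sends it to the initiator, or the initiator computes it and sends it to the receiver while the receiver computes its own OWD. The initiator could compute RTT and OWD and show the RTT metrics in its UI, while the receiver shows the OWD metrics.
Continuous monitoring of OWD
VideoReceiverFragment measures and records OWD. On receiving a time sync request, it immediately replies with t1, t2, and t3, then computes and displays the latest OWD value. This step keeps the real-time data accurate.
Breaking down the delay
The Initiator should show RTT and the Receiver should show OWD. Adjust the display in each Fragment so the information is conveyed accurately.
Adjusting the calculation method
Both sides are being adjusted step by step: the sender computes only RTT, while the receiver computes and stores OWD. This better matches each side's role and needs.
Adjusting data storage
I am clarifying the variable setup in VideoInitiatorFragment, removing owdState and owdHistory to focus on the RTT metrics.
Updating the UI to reflect delay
While handling the delay data, I compute RTT and OWD and plan to show RTT on the Initiator and OWD on the Receiver. This shows delay changes more clearly.
Updating the displayed data
I am handling time_sync_request to compute the delay measurement and update the running history, while also adjusting the UI to show the latest, maximum, minimum, and average values. This is a delicate adjustment process.
Making the code changes
Adjusting the code to remove redundant state, compute and update the RTT and OWD data, and make sure the UI shows the latest metrics. I am also considering permission requests and other unimplemented functions.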
Certainly! Below are the modified versions of both the VideoInitiatorFragment and VideoReceiverFragment classes. These modifications ensure that a single time synchronization request is used to calculate both Round-Trip Time (RTT) and One-Way Delay (OWD). The initiator (VideoInitiatorFragment) will display RTT metrics (latest, maximum, minimum, and average), while the receiver (VideoReceiverFragment) will display OWD metrics (latest, maximum, minimum, and average).
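As a rough illustration of the four-timestamp exchange described above, the following self-contained Kotlin sketch computes the RTT from t1 to t4 and, in the style of NTP, an estimate of the clock offset between the two devices. The names TimeSyncResult and computeTimeSync are hypothetical and do not appear in the fragments; the offset estimate additionally assumes the network path is roughly symmetric.

```kotlin
/** Result of one time-sync exchange; all values are in milliseconds. */
data class TimeSyncResult(val rttMs: Long, val clockOffsetMs: Long)

/**
 * t1: initiator sends the request   (initiator clock)
 * t2: receiver receives the request (receiver clock)
 * t3: receiver sends the response   (receiver clock)
 * t4: initiator receives the reply  (initiator clock)
 */
fun computeTimeSync(t1: Long, t2: Long, t3: Long, t4: Long): TimeSyncResult {
    // Total elapsed time on the initiator minus the receiver's processing time.
    val rtt = (t4 - t1) - (t3 - t2)
    // NTP-style estimate of (receiver clock - initiator clock);
    // only accurate if both directions take about the same time.
    val offset = ((t2 - t1) + (t3 - t4)) / 2
    return TimeSyncResult(rtt, offset)
}

fun main() {
    // Hypothetical numbers: 40 ms each way, receiver clock 500 ms ahead,
    // 5 ms of processing time on the receiver.
    val result = computeTimeSync(t1 = 1_000, t2 = 1_540, t3 = 1_545, t4 = 1_085)
    println("RTT = ${result.rttMs} ms, offset = ${result.clockOffsetMs} ms")
    // prints "RTT = 80 ms, offset = 500 ms"
}
```

In this example the raw difference t2 - t1 is 540 ms even though the true one-way delay is 40 ms, which shows why a raw t2 - t1 is only meaningful when the two clocks are synchronized.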
VideoInitiatorFragment:

class VideoInitiatorFragment : Fragment() {

    // Class member variables
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var localView: SurfaceViewRenderer? = null
    private var localEglBase: EglBase? = null
    private val pendingIceCandidates = mutableListOf<IceCandidate>()
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String
    private val TAG: String = "WebRTC-Initiator"

    // State variables for RTT
    private val rttState = mutableLongStateOf(0L)
    private val rttHistory = mutableStateListOf<Long>()

    // Statistics job
    private var statsJob: Job? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Retrieve data from arguments
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                if (isGranted) {
                    Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
                } else {
                    Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
                }
            }
            onPermissionsChecked()
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current
        lateinit var peerConnectionFactory: PeerConnectionFactory
        var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Spacer or additional UI components can be placed here
                Spacer(modifier = Modifier.height(8.dp))

                // Display RTT Metrics
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    Text(
                        text = "往返时延 (RTT) 最新: ${rttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "RTT 最大: ${rttHistory.maxOrNull() ?: 0} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "RTT 最小: ${rttHistory.minOrNull() ?: 0} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L
                    Text(
                        text = "RTT 平均: $averageRtt ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }

                // Additional UI components can go here...
            }

            LaunchedEffect(Unit) {
                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                val encoderFactory = DefaultVideoEncoderFactory(
                    localEglBase!!.eglBaseContext, true, true
                )
                val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

                peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(encoderFactory)
                    .setVideoDecoderFactory(decoderFactory)
                    .createPeerConnectionFactory()

                initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
                    localVideoTrack = it
                }

                createPeerConnection(
                    context,
                    peerConnectionFactory,
                    localVideoTrack
                ) {
                    localPeer = it
                    startStatsCollection() // Start collecting statistics
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
                startTimeSync()
            }
        }
    }

    private fun startTimeSync(intervalMs: Long = 5000L) {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(intervalMs) // Sync every 5 seconds
                requestTimeSync()
            }
        }
    }

    private fun startStatsCollection() {
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000) // Collect stats every second
                localPeer?.getStats { report ->
                    parseStatsReport(report)
                } ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
            }
        }
    }

    private fun parseStatsReport(report: RTCStatsReport) {
        // Implementation for stats parsing (if needed)
    }

    private fun initializeSocketIO() {
        val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
        val socketUrl = signalingServerUrl

        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = protocol == "https"
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check that a payload exists before reading args[0]
                if (args.isNotEmpty() && args[0] is JSONObject) {
                    Log.d(TAG, "Received signaling: ${args[0]}")
                    val data = args[0] as JSONObject
                    handleSignalingData(data)
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to Socket: ${e.message}")
        }
    }

    private fun requestPermissionsIfNeeded() {
        // Implementation for requesting permissions
        // Example:
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO
        )
        requestPermissionsLauncher.launch(permissions)
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
        // Proceed with initializing video or other components
    }

    private fun initLocalVideo(
        context: Context,
        localView: SurfaceViewRenderer?,
        peerConnectionFactory: PeerConnectionFactory,
        eglBase: EglBase,
        onLocalVideoTrack: (VideoTrack) -> Unit
    ) {
        // Implementation for initializing local video
    }

    private fun createCameraCapturer(context: Context): CameraVideoCapturer {
        // Implementation for creating camera capturer
        // Example:
        val cameraEnumerator = Camera2Enumerator(context)
        val deviceNames = cameraEnumerator.deviceNames
        for (deviceName in deviceNames) {
            if (cameraEnumerator.isFrontFacing(deviceName)) {
                return cameraEnumerator.createCapturer(deviceName, null)!!
            }
        }
        throw RuntimeException("No front-facing camera found.")
    }

    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        localVideoTrack: VideoTrack?,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        // Implementation for creating peer connection
    }

    private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
        Log.d(TAG, "Creating answer...")
        // Implementation for creating an answer
    }

    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        when (data.getString("type")) {
            "offer" -> {
                Log.d(TAG, "Received offer")
                val sdp = SessionDescription(
                    SessionDescription.Type.OFFER,
                    data.getJSONObject("sdp").getString("sdp")
                )
                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        Log.d(TAG, "Set remote description (offer) success")
                        createAnswer(localPeer!!) { answer ->
                            val signalData = JSONObject().apply {
                                put("type", "answer")
                                put("sdp", JSONObject().put("sdp", answer))
                                put("room", currentRoom)
                            }
                            socket.emit("signal", signalData)
                            Log.d(TAG, "Sent answer SDP.")

                            pendingIceCandidates.forEach { candidate ->
                                localPeer?.addIceCandidate(candidate)
                            }
                            pendingIceCandidates.clear()
                        }
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description (offer) error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )
                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }
                        pendingIceCandidates.clear()
                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_response" -> {
                Log.d(TAG, "Received time_sync_response")
                val t1 = data.getLong("t1")
                val t2 = data.getLong("t2")
                val t3 = data.getLong("t3")
                val t4 = System.currentTimeMillis()

                // Calculate RTT: total elapsed time minus the receiver's processing time
                val rtt = (t4 - t1) - (t3 - t2)
                Log.d(TAG, "Time Sync: RTT=$rtt ms")

                viewLifecycleOwner.lifecycleScope.launch {
                    // Update RTT state
                    rttState.longValue = rtt

                    // Update RTT history (keep the most recent 60 samples)
                    rttHistory.add(rtt)
                    if (rttHistory.size > 60) {
                        rttHistory.removeAt(0)
                    }

                    // Calculate max, min, and average RTT
                    val maxRtt = rttHistory.maxOrNull() ?: 0L
                    val minRtt = rttHistory.minOrNull() ?: 0L
                    val averageRtt = if (rttHistory.isNotEmpty()) {
                        rttHistory.average().toLong()
                    } else {
                        0L
                    }

                    Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
                }
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
            }
        }
    }

    private fun requestTimeSync() {
        val t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "Sent time sync request at t1: $t1")
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        // socket is lateinit; skip disconnect if the connection was never created
        if (::socket.isInitialized) {
            socket.disconnect()
        }
        localPeer?.dispose()
        localView?.release()
        localEglBase?.release()
    }
}
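The latest/max/min/average bookkeeping in the "time_sync_response" branch can be pulled out of the fragment so it is easier to test. The sketch below is a minimal, Android-free version of that idea; the class name RollingMetric and its capacity parameter are hypothetical, chosen only to mirror the 60-sample cap used for rttHistory.

```kotlin
/** Keeps the most recent [capacity] samples and exposes summary values. */
class RollingMetric(private val capacity: Int = 60) {
    private val samples = ArrayDeque<Long>()

    fun add(value: Long) {
        samples.addLast(value)
        // Drop the oldest sample once the window is full.
        if (samples.size > capacity) samples.removeFirst()
    }

    val latest: Long get() = samples.lastOrNull() ?: 0L
    val max: Long get() = samples.maxOrNull() ?: 0L
    val min: Long get() = samples.minOrNull() ?: 0L
    val average: Long get() = if (samples.isEmpty()) 0L else samples.average().toLong()
}

fun main() {
    val rtt = RollingMetric(capacity = 2)
    listOf(10L, 30L, 20L).forEach(rtt::add) // 10 is evicted by the third sample
    println("latest=${rtt.latest} max=${rtt.max} min=${rtt.min} avg=${rtt.average}")
    // prints "latest=20 max=30 min=20 avg=25"
}
```

In a Compose screen the same values would still need to be exposed through observable state (as rttState and rttHistory do above) so that the UI recomposes when a sample arrives.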
VideoReceiverFragment:

class VideoReceiverFragment : Fragment() {

    companion object {
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoReceiverFragment {
            val fragment = VideoReceiverFragment()
            val args = Bundle().apply {
                putString("room", room)
                putString("stunUrl", stunUrl)
                putString("turnUrl", turnUrl)
                putString("turnUsername", turnUsername)
                putString("turnPassword", turnPassword)
                putString("signalingServerUrl", signalingServerUrl)
            }
            fragment.arguments = args
            return fragment
        }
    }

    // Class member variables
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var remoteView: SurfaceViewRenderer? = null
    private var remoteEglBase: EglBase? = null
    private val pendingIceCandidates = mutableListOf<IceCandidate>()
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String
    private val TAG: String = "WebRTC-Receiver"

    // State variables for OWD
    private val owdState = mutableLongStateOf(0L)
    private val owdHistory = mutableStateListOf<Long>()

    private val frameRateState = mutableDoubleStateOf(0.0)
    private val bitrateState = mutableLongStateOf(0L)
    private val stutteringState = mutableStateOf(false)
    private val frameRateLowState = mutableStateOf(false)
    private val packetLossHighState = mutableStateOf(false)
    private val packetLossState = mutableDoubleStateOf(0.0)

    private val frameRateHistory = mutableStateListOf<Float>()
    private val bitrateHistory = mutableStateListOf<Long>()
    private var timeSyncJob: Job? = null

    // History variables
    private var prevFramesDecoded = 0.0
    private var prevBytesReceived = 0.0
    private var prevFramesReceived = 0.0
    private var prevFramesDropped = 0.0
    private var prevTimestamp = 0.0

    private var timeOffset: Long = 0
    private var t1: Long = 0

    private var statsJob: Job? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Retrieve data from arguments
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                if (isGranted) {
                    Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
                } else {
                    Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
                }
            }
            onPermissionsChecked()
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current
        lateinit var peerConnectionFactory: PeerConnectionFactory

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Remote video view can be added here
                Spacer(modifier = Modifier.height(8.dp))

                // Display OWD Metrics
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    Text(
                        text = "单向时延 (OWD) 最新: ${owdState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "OWD 最大: ${owdHistory.maxOrNull() ?: 0} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "OWD 最小: ${owdHistory.minOrNull() ?: 0} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L
                    Text(
                        text = "OWD 平均: $averageOwd ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }

                // Existing UI components for frame rate, bitrate, etc.

                // Spacer between video and charts
                Spacer(modifier = Modifier.height(8.dp))

                // Frame Rate Section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 8.dp)
                ) {
                    // Frame Rate Text in Chinese
                    Text(
                        text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Log.d(
                        TAG,
                        "UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"
                    )

                    // Line Chart for Frame Rate
                    LineChart(
                        data = frameRateHistory,
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Green,
                        backgroundColor = Color.Black,
                        yAxisLabel = "帧率 (fps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                // Spacer between charts
                Spacer(modifier = Modifier.height(8.dp))

                // Bitrate Section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp)
                ) {
                    // Bitrate Text in Chinese
                    Text(
                        text = "码率: ${bitrateState.longValue / 1000} kbps",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )

                    // Line Chart for Bitrate
                    LineChart(
                        data = bitrateHistory.map { it / 1000f }, // Convert to kbps
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Blue,
                        backgroundColor = Color.Black,
                        yAxisLabel = "码率 (kbps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                // Spacer between metrics and stuttering indicator
                Spacer(modifier = Modifier.height(16.dp))

                // Stuttering Indicator
                Row(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp),
                    verticalAlignment = Alignment.CenterVertically
                ) {
                    if (stutteringState.value) {
                        Icon(
                            imageVector = Icons.Default.Warning,
                            contentDescription = "卡顿警告",
                            tint = Color.Red,
                            modifier = Modifier.size(24.dp)
                        )
                        Spacer(modifier = Modifier.width(8.dp))
                        Column {
                            Text(
                                text = "视频播放出现卡顿",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodyMedium
                            )
                            // Additional information about which metrics are abnormal
                            if (frameRateLowState.value) {
                                Text(
                                    text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
                                    color = Color.Red,
                                    style = MaterialTheme.typography.bodySmall
                                )
                            }
                            if (packetLossHighState.value) {
                                Text(
                                    text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
                                    color = Color.Red,
                                    style = MaterialTheme.typography.bodySmall
                                )
                            }
                        }
                    } else {
                        Icon(
                            imageVector = Icons.Default.CheckCircle,
                            contentDescription = "正常",
                            tint = Color.Green,
                            modifier = Modifier.size(24.dp)
                        )
                        Spacer(modifier = Modifier.width(8.dp))
                        Text(
                            text = "视频播放正常",
                            color = Color.Green,
                            style = MaterialTheme.typography.bodyMedium
                        )
                    }
                }
            }

            LaunchedEffect(Unit) {
                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                // The receiver only has remoteEglBase; the original referenced
                // localEglBase here, which does not exist in this class.
                val encoderFactory = DefaultVideoEncoderFactory(
                    remoteEglBase!!.eglBaseContext, true, true
                )
                val decoderFactory = DefaultVideoDecoderFactory(remoteEglBase!!.eglBaseContext)

                peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(encoderFactory)
                    .setVideoDecoderFactory(decoderFactory)
                    .createPeerConnectionFactory()

                createPeerConnection(
                    context,
                    peerConnectionFactory,
                    remoteView!!
                ) {
                    localPeer = it
                    // Without this call the frame rate and bitrate states are never updated
                    startStatsCollection()
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
            }
        }
    }

    /**
     * Enhanced Line Chart Composable with Axes and Labels
     * Modified to use straight lines connecting each point
     */
    @Composable
    fun LineChart(
        data: List<Float>,
        modifier: Modifier = Modifier,
        lineColor: Color = Color.Green,
        backgroundColor: Color = Color.Black,
        yAxisLabel: String = "",
        xAxisLabel: String = "",
        minYValue: Float? = null,
        maxYValue: Float? = null
    ) {
        // Implementation for the line chart
        // You can use Canvas to draw the chart or any chart library compatible with Compose
    }

    private fun initializeSocketIO() {
        val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
        val socketUrl = signalingServerUrl

        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = protocol == "https"
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
                initiateCall()
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check that a payload exists before reading args[0]
                if (args.isNotEmpty() && args[0] is JSONObject) {
                    Log.d(TAG, "Received signaling: ${args[0]}")
                    val data = args[0] as JSONObject
                    handleSignalingData(data)
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to Socket: ${e.message}")
        }
    }

    private fun requestPermissionsIfNeeded() {
        // Implementation for requesting permissions
        // Example:
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO
        )
        requestPermissionsLauncher.launch(permissions)
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
        // Proceed with initializing video or other components
    }

    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        remoteView: SurfaceViewRenderer,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        // Implementation for creating peer connection
    }

    private fun initiateCall() {
        // Implementation to initiate call, e.g., create offer
    }

    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        when (data.getString("type")) {
            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )
                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }
                        pendingIceCandidates.clear()
                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_request" -> {
                val t1 = data.getLong("t1")
                val t2 = System.currentTimeMillis()
                val t3 = System.currentTimeMillis()

                val syncResponse = JSONObject().apply {
                    put("type", "time_sync_response")
                    put("t1", t1)
                    put("t2", t2)
                    put("t3", t3)
                    put("room", currentRoom)
                }
                socket.emit("signal", syncResponse)
                Log.d(TAG, "Replied to time_sync_request: t1=$t1, t2=$t2, t3=$t3")

                // Calculate OWD.
                // Note: t1 comes from the initiator's clock and t2 from this device's
                // clock, so the value is only meaningful if both clocks are synchronized.
                val owd = t2 - t1
                Log.d(TAG, "Time Sync: OWD=$owd ms")

                viewLifecycleOwner.lifecycleScope.launch {
                    // Update OWD state
                    owdState.longValue = owd

                    // Update OWD history (keep the most recent 60 samples)
                    owdHistory.add(owd)
                    if (owdHistory.size > 60) {
                        owdHistory.removeAt(0)
                    }

                    // Calculate max, min, and average OWD
                    val maxOwd = owdHistory.maxOrNull() ?: 0L
                    val minOwd = owdHistory.minOrNull() ?: 0L
                    val averageOwd = if (owdHistory.isNotEmpty()) {
                        owdHistory.average().toLong()
                    } else {
                        0L
                    }

                    Log.d(TAG, "OWD - Latest: $owd ms, Max: $maxOwd ms, Min: $minOwd ms, Average: $averageOwd ms")
                }
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
            }
        }
    }

    private fun startStatsCollection() {
        Log.d(TAG, "Starting stats collection...")
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000) // Collect stats every second
                Log.d(TAG, "Collecting stats...")
                localPeer?.getStats { report ->
                    Log.d(TAG, "Stats report obtained.")
                    parseStatsReport(report)
                } ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
            }
        }
    }

    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "Received RTCStatsReport: $report")
        for (stats in report.statsMap.values) {
            if (stats.type == "inbound-rtp") {
                val kind = stats.members["kind"] as? String
                if (kind == "video") {
                    val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
                    val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
                    val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
                    val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
                    val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
                    val packetsReceived = (stats.members["packetsReceived"] as? Number)?.toDouble() ?: 1.0 // Avoid division by zero
                    val packetLossFraction = packetsLost / (packetsLost + packetsReceived)
                    val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

                    Log.d(
                        TAG,
                        "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
                    )

                    if (prevTimestamp != 0.0) {
                        val timeElapsed = timestamp - prevTimestamp
                        val framesDelta = framesDecoded - prevFramesDecoded
                        val bytesDelta = bytesReceived - prevBytesReceived
                        val framesReceivedDelta = framesReceived - prevFramesReceived
                        val framesDroppedDelta = framesDropped - prevFramesDropped

                        val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
                        val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
                        val packetLoss = packetLossFraction * 100 // Convert to percentage

                        Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

                        // Determine stuttering based on thresholds
                        val isStuttering = frameRate < 24 || packetLoss > 5.0 // Thresholds can be adjusted

                        // Update states
                        viewLifecycleOwner.lifecycleScope.launch {
                            frameRateState.doubleValue = frameRate
                            bitrateState.longValue = bitrate.toLong()

                            // Update stuttering state
                            stutteringState.value = isStuttering

                            // Update specific stuttering causes
                            frameRateLowState.value = frameRate < 24
                            packetLossHighState.value = packetLoss > 5.0
                            packetLossState.doubleValue = packetLoss

                            // Update history
                            frameRateHistory.add(frameRate.toFloat())
                            if (frameRateHistory.size > 60) {
                                frameRateHistory.removeAt(0)
                            }

                            bitrateHistory.add(bitrate.toLong())
                            if (bitrateHistory.size > 60) {
                                bitrateHistory.removeAt(0)
                            }

                            Log.d(
                                TAG,
                                "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
                            )
                        }
                    }

                    // Update previous values
                    prevFramesDecoded = framesDecoded
                    prevBytesReceived = bytesReceived
                    prevFramesReceived = framesReceived
                    prevFramesDropped = framesDropped
                    prevTimestamp = timestamp
                }
            }
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        // socket is lateinit; skip disconnect if the connection was never created
        if (::socket.isInitialized) {
            socket.disconnect()
        }
        localPeer?.dispose()
        remoteView?.release()
        remoteEglBase?.release()
        timeSyncJob?.cancel()
    }
}
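The delta arithmetic inside parseStatsReport can also be expressed as a small pure function, which makes the frame rate, bitrate, and stuttering thresholds easy to check without a live PeerConnection. This is only a sketch: InboundSample, Rates, computeRates, and isStuttering are hypothetical names, and the 24 fps and 5% thresholds are simply the ones used in the code above.

```kotlin
/** One snapshot of the cumulative inbound video counters. */
data class InboundSample(
    val timestampSec: Double,
    val framesDecoded: Double,
    val bytesReceived: Double
)

data class Rates(val frameRate: Double, val bitrateBps: Double)

/** Derives per-second rates from two consecutive cumulative snapshots. */
fun computeRates(prev: InboundSample, cur: InboundSample): Rates {
    val elapsed = cur.timestampSec - prev.timestampSec
    if (elapsed <= 0.0) return Rates(0.0, 0.0) // Guard against division by zero
    val frameRate = (cur.framesDecoded - prev.framesDecoded) / elapsed
    val bitrateBps = (cur.bytesReceived - prev.bytesReceived) * 8 / elapsed
    return Rates(frameRate, bitrateBps)
}

/** Same thresholds as the fragment: below 24 fps or above 5% packet loss. */
fun isStuttering(frameRate: Double, packetLossPercent: Double): Boolean =
    frameRate < 24 || packetLossPercent > 5.0

fun main() {
    val prev = InboundSample(timestampSec = 10.0, framesDecoded = 300.0, bytesReceived = 1_000_000.0)
    val cur = InboundSample(timestampSec = 11.0, framesDecoded = 330.0, bytesReceived = 1_250_000.0)
    val rates = computeRates(prev, cur)
    println("fps=${rates.frameRate}, kbps=${rates.bitrateBps / 1000}")
    println("stuttering=${isStuttering(rates.frameRate, packetLossPercent = 1.0)}")
}
```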
Removed OWD Tracking from Initiator:
The following state variables were removed from the initiator:
private val owdState = mutableLongStateOf(0L)
private val owdHistory = mutableStateListOf<Long>()
In handleSignalingData, under "time_sync_response", only RTT is calculated and tracked.

Added OWD Tracking to Receiver:
The following state variables were added to the receiver:
private val owdState = mutableLongStateOf(0L)
private val owdHistory = mutableStateListOf<Long>()
The RTT state variables (rttState and rttHistory) were removed, as they are not needed in the receiver.
In handleSignalingData, under "time_sync_request", OWD is calculated as t2 - t1 and tracked.

Unified Time Synchronization:
The initiator sends a time_sync_request with t1.
The receiver replies with a time_sync_response containing t1, t2, and t3.
The initiator calculates RTT using t1, t2, t3, and t4.
The receiver calculates OWD as t2 - t1 upon handling the request.

UI Enhancements:
Code Clean-Up:
Permissions Handling:
Logging Enhancements:

When using LaunchedEffect and lifecycleScope, ensure that all UI updates are performed on the main thread to prevent unexpected behavior.
Release PeerConnection, SurfaceViewRenderer, and EglBase in the onDestroyView method to prevent memory leaks.

By implementing these changes, your application will calculate and display both RTT and OWD using a unified time synchronization request, giving users insight into network performance during video communication sessions.
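One caveat on the OWD figure: t1 is read from the initiator's clock and t2 from the receiver's clock, so t2 - t1 includes whatever offset exists between the two devices. The sketch below shows how a known clock offset could be subtracted, and the common fallback of approximating OWD as half the RTT. Both helpers (correctedOwd and owdFromRtt) are hypothetical and not part of the fragments, and the second one assumes a symmetric network path.

```kotlin
/**
 * Removes a known clock offset from the raw t2 - t1 difference.
 * clockOffsetMs is assumed to be (receiver clock - initiator clock).
 */
fun correctedOwd(t1: Long, t2: Long, clockOffsetMs: Long): Long =
    (t2 - t1) - clockOffsetMs

/** Fallback when no offset is known: assume both directions take equal time. */
fun owdFromRtt(rttMs: Long): Long = rttMs / 2

fun main() {
    // Hypothetical numbers: receiver clock runs 500 ms ahead, true delay is 40 ms.
    println("raw OWD       = ${1_540L - 1_000L} ms")
    println("corrected OWD = ${correctedOwd(t1 = 1_000, t2 = 1_540, clockOffsetMs = 500)} ms")
    println("RTT / 2       = ${owdFromRtt(80)} ms")
}
```

Getting the offset to the receiver would need an extra message or a shared time source such as NTP, which the current signaling exchange does not provide.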
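Connecting to the server and managing latency
Considering using Socket.IO to connect to the signaling server and managing the WebRTC PeerConnections to collect RTT and OWD statistics. The initiator and the receiver display different latency values.
Certainly! Below is the modified code for the VideoInitiatorFragment and VideoReceiverFragment classes. These modifications ensure that the same time synchronization request is used to calculate both the one-way delay (OWD) and the round-trip time (RTT), and that the corresponding statistics are shown in the UI. The initiator (VideoInitiatorFragment) will display the latest, maximum, minimum, and average RTT; the receiver (VideoReceiverFragment) will display the latest, maximum, minimum, and average OWD.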
VideoInitiatorFragment:

class VideoInitiatorFragment : Fragment() {

    // Class member variables
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var localView: SurfaceViewRenderer? = null
    private var localEglBase: EglBase? = null
    private val pendingIceCandidates = mutableListOf<IceCandidate>()
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String
    private val TAG: String = "WebRTC-Initiator"

    // State variables for RTT
    private val rttState = mutableLongStateOf(0L)
    private val rttHistory = mutableStateListOf<Long>()

    // Statistics job
    private var statsJob: Job? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Retrieve data from arguments
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                if (isGranted) {
                    Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
                } else {
                    Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
                }
            }
            onPermissionsChecked()
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current
        lateinit var peerConnectionFactory: PeerConnectionFactory
        var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Other UI components can be added here
                Spacer(modifier = Modifier.height(8.dp))

                // Display RTT metrics
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    Text(
                        text = "往返时延 (RTT) 最新: ${rttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "RTT 最大: ${rttHistory.maxOrNull() ?: 0} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "RTT 最小: ${rttHistory.minOrNull() ?: 0} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L
                    Text(
                        text = "RTT 平均: $averageRtt ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }

                // Other UI components can be added here
            }

            LaunchedEffect(Unit) {
                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                val encoderFactory = DefaultVideoEncoderFactory(
                    localEglBase!!.eglBaseContext, true, true
                )
                val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

                peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(encoderFactory)
                    .setVideoDecoderFactory(decoderFactory)
                    .createPeerConnectionFactory()

                initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
                    localVideoTrack = it
                }

                createPeerConnection(
                    context,
                    peerConnectionFactory,
                    localVideoTrack
                ) {
                    localPeer = it
                    startStatsCollection() // Start collecting statistics
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
                startTimeSync()
            }
        }
    }

    private fun startTimeSync(intervalMs: Long = 5000L) {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(intervalMs) // Sync every 5 seconds
                requestTimeSync()
            }
        }
    }

    private fun startStatsCollection() {
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000) // Collect stats every second
                localPeer?.getStats { report ->
                    parseStatsReport(report)
                } ?: Log.e(TAG, "获取统计数据失败: localPeer 为 null.")
            }
        }
    }

    private fun parseStatsReport(report: RTCStatsReport) {
        // Stats parsing can be implemented here if needed
    }

    private fun initializeSocketIO() {
        val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
        val socketUrl = signalingServerUrl

        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = protocol == "https"
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket 已连接")
                socket.emit("join", currentRoom)
                Log.d(TAG, "加入房间: $currentRoom")
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket 连接错误: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket 已断开: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check that a payload exists before reading args[0]
                if (args.isNotEmpty() && args[0] is JSONObject) {
                    Log.d(TAG, "接收到信令: ${args[0]}")
                    val data = args[0] as JSONObject
                    handleSignalingData(data)
                }
            }

            socket.connect()
            Log.d(TAG, "正在连接到 Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "连接 Socket 时出错: ${e.message}")
        }
    }

    private fun requestPermissionsIfNeeded() {
        // Request the necessary permissions
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO
        )
        requestPermissionsLauncher.launch(permissions)
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
        // Continue initializing video or other components
    }

    private fun initLocalVideo(
        context: Context,
        localView: SurfaceViewRenderer?,
        peerConnectionFactory: PeerConnectionFactory,
        eglBase: EglBase,
        onLocalVideoTrack: (VideoTrack) -> Unit
    ) {
        // Implementation for initializing local video
    }

    private fun createCameraCapturer(context: Context): CameraVideoCapturer {
        // Implementation for creating the camera capturer
        val cameraEnumerator = Camera2Enumerator(context)
        val deviceNames = cameraEnumerator.deviceNames
        for (deviceName in deviceNames) {
            if (cameraEnumerator.isFrontFacing(deviceName)) {
                return cameraEnumerator.createCapturer(deviceName, null)!!
            }
        }
        throw RuntimeException("未找到前置摄像头。")
    }

    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        localVideoTrack: VideoTrack?,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        // Implementation for creating the PeerConnection
    }

    private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
        Log.d(TAG, "正在创建应答...")
        // Implementation for creating an answer
    }

    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "处理信令数据: $data")
        when (data.getString("type")) {
            "offer" -> {
                Log.d(TAG, "接收到 offer")
                val sdp = SessionDescription(
                    SessionDescription.Type.OFFER,
                    data.getJSONObject("sdp").getString("sdp")
                )
                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        Log.d(TAG, "设置远程描述(offer)成功")
                        createAnswer(localPeer!!) { answer ->
                            val signalData = JSONObject().apply {
                                put("type", "answer")
                                put("sdp", JSONObject().put("sdp", answer))
                                put("room", currentRoom)
                            }
                            socket.emit("signal", signalData)
                            Log.d(TAG, "发送应答 SDP。")

                            pendingIceCandidates.forEach { candidate ->
                                localPeer?.addIceCandidate(candidate)
                            }
                            pendingIceCandidates.clear()
                        }
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "设置远程描述(offer)错误: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?)
{} }, sdp) } "answer" -> { Log.d(TAG, "接收到 answer") val sdp = SessionDescription( SessionDescription.Type.ANSWER, data.getJSONObject("sdp").getString("sdp") ) localPeer?.setRemoteDescription(object : SdpObserver { override fun onSetSuccess() { pendingIceCandidates.forEach { candidate -> localPeer?.addIceCandidate(candidate) } pendingIceCandidates.clear() Log.d(TAG, "设置远程描述(answer)成功") } override fun onSetFailure(error: String?) { Log.e(TAG, "设置远程描述(answer)错误: $error") } override fun onCreateSuccess(p0: SessionDescription?) {} override fun onCreateFailure(p0: String?) {} }, sdp) } "ice" -> { Log.d(TAG, "接收到 ICE candidate") val candidateData = data.getJSONObject("candidate") val candidate = IceCandidate( candidateData.getString("sdpMid"), candidateData.getInt("sdpMLineIndex"), candidateData.getString("candidate") ) if (localPeer?.remoteDescription != null) { localPeer?.addIceCandidate(candidate) } else { pendingIceCandidates.add(candidate) } } "time_sync_response" -> { Log.d(TAG, "接收到 time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.currentTimeMillis() // 计算 RTT val rtt = t4 - t1 - (t3 - t2) Log.d(TAG, "时间同步: RTT=$rtt ms") viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 状态 rttState.longValue = rtt // 更新 RTT 历史记录 rttHistory.add(rtt) if (rttHistory.size > 60) { rttHistory.removeAt(0) } // 计算 RTT 的最大、最小和平均值 val maxRtt = rttHistory.maxOrNull() ?: 0L val minRtt = rttHistory.minOrNull() ?: 0L val averageRtt = if (rttHistory.isNotEmpty()) { rttHistory.average().toLong() } else { 0L } Log.d(TAG, "RTT - 最新: $rtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms") } } else -> { Log.e(TAG, "未知的信令类型: ${data.getString("type")}") } } } private fun requestTimeSync() { val t1 = System.currentTimeMillis() val syncRequest = JSONObject().apply { put("type", "time_sync_request") put("room", currentRoom) put("t1", t1) } socket.emit("signal", syncRequest) Log.d(TAG, "发送时间同步请求,t1: $t1") } override fun 
onDestroyView() { super.onDestroyView() statsJob?.cancel() socket.disconnect() localPeer?.dispose() localView?.release() localEglBase?.release() } }
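The latest/maximum/minimum/average bookkeeping over the last 60 samples is repeated wherever a metric is tracked. As a sketch only, it could be factored into a small helper. `RollingWindow` is a hypothetical name that does not appear in the original code, and it uses a plain collection rather than Compose snapshot state, so a fragment would still have to copy its values into its own state holders to trigger recomposition.

```kotlin
// Hypothetical helper (not part of the original fragments):
// keeps the most recent `capacity` samples and derives summary figures from them.
class RollingWindow(private val capacity: Int = 60) {
    private val values = ArrayDeque<Long>()

    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    // Append a sample and drop the oldest one once the window is full.
    fun add(value: Long) {
        values.addLast(value)
        if (values.size > capacity) values.removeFirst()
    }

    // All getters fall back to 0 when no sample has been recorded yet,
    // matching the `?: 0L` defaults used in the fragment code.
    val latest: Long get() = values.lastOrNull() ?: 0L
    val max: Long get() = values.maxOrNull() ?: 0L
    val min: Long get() = values.minOrNull() ?: 0L
    val average: Long get() = if (values.isEmpty()) 0L else values.average().toLong()
}
```

With a helper like this, the "time_sync_response" branch would only need to call `add(rtt)` and read the four properties.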
VideoReceiverFragment
:class VideoReceiverFragment : Fragment() { companion object { fun newInstance( room: String, stunUrl: String, turnUrl: String, turnUsername: String, turnPassword: String, signalingServerUrl: String ): VideoReceiverFragment { val fragment = VideoReceiverFragment() val args = Bundle().apply { putString("room", room) putString("stunUrl", stunUrl) putString("turnUrl", turnUrl) putString("turnUsername", turnUsername) putString("turnPassword", turnPassword) putString("signalingServerUrl", signalingServerUrl) } fragment.arguments = args return fragment } } // 类成员变量 private lateinit var socket: Socket private var localPeer: PeerConnection? = null private var remoteView: SurfaceViewRenderer? = null private var remoteEglBase: EglBase? = null private val pendingIceCandidates = mutableListOf<IceCandidate>() private var currentRoom: String? = null private lateinit var signalingServerUrl: String private lateinit var stunUrl: String private lateinit var turnUrl: String private lateinit var turnUsername: String private lateinit var turnPassword: String private val TAG: String = "WebRTC-Receiver" // OWD 的状态变量 private val owdState = mutableLongStateOf(0L) private val owdHistory = mutableStateListOf<Long>() private val frameRateState = mutableDoubleStateOf(0.0) private val bitrateState = mutableLongStateOf(0L) private val stutteringState = mutableStateOf(false) private val frameRateLowState = mutableStateOf(false) private val packetLossHighState = mutableStateOf(false) private val packetLossState = mutableDoubleStateOf(0.0) private val frameRateHistory = mutableStateListOf<Float>() private val bitrateHistory = mutableStateListOf<Long>() private var timeSyncJob: Job? = null // 历史变量 private var prevFramesDecoded = 0.0 private var prevBytesReceived = 0.0 private var prevFramesReceived = 0.0 private var prevFramesDropped = 0.0 private var prevTimestamp = 0.0 private var timeOffset: Long = 0 private var t1: Long = 0 private var statsJob: Job? 
= null override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 从参数中获取数据 currentRoom = arguments?.getString("room") ?: "default-room" signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg" stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478" turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349" turnUsername = arguments?.getString("turnUsername") ?: "wstszx" turnPassword = arguments?.getString("turnPassword") ?: "930379" Log.d( TAG, "onCreate: Role = Receiver, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl" ) } private val requestPermissionsLauncher = registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions -> permissions.entries.forEach { (permission, isGranted) -> if (isGranted) { Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show() } else { Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show() } } onPermissionsChecked() } override fun onCreateView( inflater: android.view.LayoutInflater, container: android.view.ViewGroup?, savedInstanceState: Bundle? 
): android.view.View { return ComposeView(requireContext()).apply { setContent { WebRTCComposeLayout() } } } @Composable fun WebRTCComposeLayout() { val context = LocalContext.current lateinit var peerConnectionFactory: PeerConnectionFactory Surface(color = Color.Black) { Column(modifier = Modifier.fillMaxSize()) { // 远程视频视图可以在这里添加 Spacer(modifier = Modifier.height(8.dp)) // 展示 OWD 指标 Column(modifier = Modifier.padding(horizontal = 16.dp)) { Text( text = "单向时延 (OWD) 最新: ${owdState.longValue} ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) Text( text = "OWD 最大: ${owdHistory.maxOrNull() ?: 0} ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) Text( text = "OWD 最小: ${owdHistory.minOrNull() ?: 0} ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L Text( text = "OWD 平均: $averageOwd ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 8.dp) ) } // 现有的帧率、码率等 UI 组件 Spacer(modifier = Modifier.height(8.dp)) // 帧率部分 Column( modifier = Modifier .fillMaxWidth() .padding(horizontal = 8.dp) ) { // 中文的帧率文本 Text( text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) Log.d( TAG, "UI - 帧率: ${frameRateState.doubleValue} fps, 码率: ${bitrateState.longValue / 1000} kbps" ) // 帧率的折线图 LineChart( data = frameRateHistory, modifier = Modifier .height(200.dp) .fillMaxWidth() .padding(vertical = 8.dp), lineColor = Color.Green, backgroundColor = Color.Black, yAxisLabel = "帧率 (fps)", xAxisLabel = "时间 (秒)" ) } Spacer(modifier = Modifier.height(8.dp)) // 码率部分 Column( modifier = Modifier .fillMaxWidth() .padding(horizontal = 16.dp) ) { // 中文的码率文本 Text( 
text = "码率: ${bitrateState.longValue / 1000} kbps", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) // Line chart for the bitrate LineChart( data = bitrateHistory.map { it / 1000f }, // convert to kbps modifier = Modifier .height(200.dp) .fillMaxWidth() .padding(vertical = 8.dp), lineColor = Color.Blue, backgroundColor = Color.Black, yAxisLabel = "码率 (kbps)", xAxisLabel = "时间 (秒)" ) } Spacer(modifier = Modifier.height(16.dp)) // Stutter indicator Row( modifier = Modifier .fillMaxWidth() .padding(horizontal = 16.dp), verticalAlignment = Alignment.CenterVertically ) { if (stutteringState.value) { Icon( imageVector = Icons.Default.Warning, contentDescription = "卡顿警告", tint = Color.Red, modifier = Modifier.size(24.dp) ) Spacer(modifier = Modifier.width(8.dp)) Column { Text( text = "视频播放出现卡顿", color = Color.Red, style = MaterialTheme.typography.bodyMedium ) // Show the specific abnormal metrics if (frameRateLowState.value) { Text( text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps", color = Color.Red, style = MaterialTheme.typography.bodySmall ) } if (packetLossHighState.value) { Text( text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%", color = Color.Red, style = MaterialTheme.typography.bodySmall ) } } } else { Icon( imageVector = Icons.Default.CheckCircle, contentDescription = "正常", tint = Color.Green, modifier = Modifier.size(24.dp) ) Spacer(modifier = Modifier.width(8.dp)) Text( text = "视频播放正常", color = Color.Green, style = MaterialTheme.typography.bodyMedium ) } } } LaunchedEffect(Unit) { val options = PeerConnectionFactory.InitializationOptions.builder(context) .createInitializationOptions() PeerConnectionFactory.initialize(options) val encoderFactory = DefaultVideoEncoderFactory( remoteEglBase!!.eglBaseContext, true, true ) val decoderFactory = DefaultVideoDecoderFactory(remoteEglBase!!.eglBaseContext) peerConnectionFactory = PeerConnectionFactory.builder() .setVideoEncoderFactory(encoderFactory) .setVideoDecoderFactory(decoderFactory) 
.createPeerConnectionFactory() createPeerConnection( context, peerConnectionFactory, remoteView!! ) { localPeer = it } initializeSocketIO() requestPermissionsIfNeeded() } } } /** * 增强的折线图 Composable,带坐标轴和标签 * 使用直线连接每个点 */ @Composable fun LineChart( data: List<Float>, modifier: Modifier = Modifier, lineColor: Color = Color.Green, backgroundColor: Color = Color.Black, yAxisLabel: String = "", xAxisLabel: String = "", minYValue: Float? = null, maxYValue: Float? = null ) { // 使用 Canvas 绘制折线图,或使用任何兼容 Compose 的图表库 } private fun initializeSocketIO() { val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http" val socketUrl = signalingServerUrl val options = IO.Options().apply { transports = arrayOf("websocket") secure = protocol == "https" path = "/socket.io/" } try { socket = IO.socket(socketUrl, options) socket.on(Socket.EVENT_CONNECT) { Log.d(TAG, "Socket 已连接") socket.emit("join", currentRoom) Log.d(TAG, "加入房间: $currentRoom") initiateCall() } socket.on(Socket.EVENT_CONNECT_ERROR) { args -> if (args.isNotEmpty()) { val error = args[0] Log.e(TAG, "Socket 连接错误: $error") } } socket.on(Socket.EVENT_DISCONNECT) { args -> if (args.isNotEmpty()) { val reason = args[0] Log.d(TAG, "Socket 已断开: $reason") } } socket.on("signal") { args -> Log.d(TAG, "接收到信令: ${args[0]}") if (args.isNotEmpty() && args[0] is JSONObject) { val data = args[0] as JSONObject handleSignalingData(data) } } socket.connect() Log.d(TAG, "正在连接到 Socket: $socketUrl...") } catch (e: Exception) { Log.e(TAG, "连接 Socket 时出错: ${e.message}") } } private fun requestPermissionsIfNeeded() { // 请求必要的权限 val permissions = arrayOf( Manifest.permission.CAMERA, Manifest.permission.RECORD_AUDIO ) requestPermissionsLauncher.launch(permissions) } private fun onPermissionsChecked() { Toast.makeText(requireContext(), "所有必要的权限已被授予", Toast.LENGTH_SHORT).show() // 继续初始化视频或其他组件 } private fun createPeerConnection( context: Context, peerConnectionFactory: PeerConnectionFactory, remoteView: SurfaceViewRenderer, 
onLocalPeerCreated: (PeerConnection) -> Unit ) { // 创建 PeerConnection 的实现 } private fun initiateCall() { // 发起呼叫的实现,例如创建 offer } private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "处理信令数据: $data") when (data.getString("type")) { "answer" -> { Log.d(TAG, "接收到 answer") val sdp = SessionDescription( SessionDescription.Type.ANSWER, data.getJSONObject("sdp").getString("sdp") ) localPeer?.setRemoteDescription(object : SdpObserver { override fun onSetSuccess() { pendingIceCandidates.forEach { candidate -> localPeer?.addIceCandidate(candidate) } pendingIceCandidates.clear() Log.d(TAG, "设置远程描述(answer)成功") } override fun onSetFailure(error: String?) { Log.e(TAG, "设置远程描述(answer)错误: $error") } override fun onCreateSuccess(p0: SessionDescription?) {} override fun onCreateFailure(p0: String?) {} }, sdp) } "ice" -> { Log.d(TAG, "接收到 ICE candidate") val candidateData = data.getJSONObject("candidate") val candidate = IceCandidate( candidateData.getString("sdpMid"), candidateData.getInt("sdpMLineIndex"), candidateData.getString("candidate") ) if (localPeer?.remoteDescription != null) { localPeer?.addIceCandidate(candidate) } else { pendingIceCandidates.add(candidate) } } "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.currentTimeMillis() val t3 = System.currentTimeMillis() val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3") // 计算 OWD val owd = t2 - t1 Log.d(TAG, "时间同步: OWD=$owd ms") viewLifecycleOwner.lifecycleScope.launch { // 更新 OWD 状态 owdState.longValue = owd // 更新 OWD 历史记录 owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // 计算 OWD 的最大、最小和平均值 val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) { owdHistory.average().toLong() } else { 0L } Log.d(TAG, "OWD - 最新: $owd 
ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms") } } else -> { Log.e(TAG, "未知的信令类型: ${data.getString("type")}") } } } private fun startStatsCollection() { Log.d(TAG, "开始收集统计数据...") statsJob = viewLifecycleOwner.lifecycleScope.launch { while (isActive) { delay(1000) // 每秒收集一次统计数据 Log.d(TAG, "收集统计数据...") localPeer?.getStats { report -> Log.d(TAG, "已获取统计报告。") parseStatsReport(report) } ?: Log.e(TAG, "获取统计数据失败: localPeer 为 null.") } } } private fun parseStatsReport(report: RTCStatsReport) { Log.d(TAG, "接收到 RTCStatsReport: $report") for (stats in report.statsMap.values) { if (stats.type == "inbound-rtp") { val kind = stats.members["kind"] as? String if (kind == "video") { val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0 val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0 val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0 val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0 val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0 val packetsReceived = (stats.members["packetsReceived"] as? 
Number)?.toDouble() ?: 1.0 // 避免除零 val packetLossFraction = packetsLost / (packetsLost + packetsReceived) val timestamp = stats.timestampUs / 1_000_000.0 // 转换为秒 Log.d( TAG, "统计数据 - 解码帧数: $framesDecoded, 接收帧数: $framesReceived, 丢弃帧数: $framesDropped, 接收字节数: $bytesReceived, 丢包率: $packetLossFraction, 时间戳: $timestamp" ) if (prevTimestamp != 0.0) { val timeElapsed = timestamp - prevTimestamp val framesDelta = framesDecoded - prevFramesDecoded val bytesDelta = bytesReceived - prevBytesReceived val framesReceivedDelta = framesReceived - prevFramesReceived val framesDroppedDelta = framesDropped - prevFramesDropped val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0 val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // 比特每秒 val packetLoss = packetLossFraction * 100 // 转换为百分比 Log.d(TAG, "计算的帧率: $frameRate fps, 码率: $bitrate bps, 丢包率: $packetLoss%") // 根据阈值判断是否卡顿 val isStuttering = frameRate < 24 || packetLoss > 5.0 // 阈值可调整 // 更新状态 viewLifecycleOwner.lifecycleScope.launch { frameRateState.doubleValue = frameRate bitrateState.longValue = bitrate.toLong() // 更新卡顿状态 stutteringState.value = isStuttering // 更新具体卡顿原因 frameRateLowState.value = frameRate < 24 packetLossHighState.value = packetLoss > 5.0 packetLossState.doubleValue = packetLoss // 更新历史记录 frameRateHistory.add(frameRate.toFloat()) if (frameRateHistory.size > 60) { frameRateHistory.removeAt(0) } bitrateHistory.add(bitrate.toLong()) if (bitrateHistory.size > 60) { bitrateHistory.removeAt(0) } Log.d( TAG, "更新的帧率: ${frameRateState.doubleValue} fps, 码率: ${bitrateState.longValue / 1000} kbps, 卡顿: $isStuttering" ) } } // 更新之前的值 prevFramesDecoded = framesDecoded prevBytesReceived = bytesReceived prevFramesReceived = framesReceived prevFramesDropped = framesDropped prevTimestamp = timestamp } } } } override fun onDestroyView() { super.onDestroyView() statsJob?.cancel() socket.disconnect() localPeer?.dispose() remoteView?.release() remoteEglBase?.release() timeSyncJob?.cancel() } }
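The rate calculations in parseStatsReport reduce to differences between two consecutive samples. The sketch below isolates that arithmetic so it can be checked without a PeerConnection. `InboundSnapshot`, `Rates`, `ratesBetween`, and `lossPercent` are hypothetical names introduced for illustration. The loss helper also guards the case where the counters are present but both still zero, which the `?: 1.0` default in the original does not cover.

```kotlin
// Hypothetical snapshot of the inbound-rtp counters used by parseStatsReport.
data class InboundSnapshot(
    val timestampSec: Double,
    val framesDecoded: Double,
    val bytesReceived: Double
)

data class Rates(val fps: Double, val bitsPerSecond: Double)

// Frame rate and bitrate are counter deltas divided by elapsed time.
// Returns null when no time has elapsed, so callers can skip the sample.
fun ratesBetween(prev: InboundSnapshot, cur: InboundSnapshot): Rates? {
    val elapsed = cur.timestampSec - prev.timestampSec
    if (elapsed <= 0.0) return null
    val fps = (cur.framesDecoded - prev.framesDecoded) / elapsed
    val bps = (cur.bytesReceived - prev.bytesReceived) * 8 / elapsed
    return Rates(fps, bps)
}

// Cumulative packet loss as a percentage; 0 when nothing has been counted yet.
fun lossPercent(packetsLost: Double, packetsReceived: Double): Double {
    val total = packetsLost + packetsReceived
    return if (total > 0.0) packetsLost / total * 100.0 else 0.0
}
```

Note that this loss figure, like the one in the original code, is cumulative since the start of the stream rather than per interval.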
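Removed OWD tracking from the initiator:
The following state variables are no longer declared in VideoInitiatorFragment:
private val owdState = mutableLongStateOf(0L)
private val owdHistory = mutableStateListOf<Long>()
In the handleSignalingData method, the "time_sync_response" branch computes and tracks RTT only.

Added OWD tracking to the receiver:
VideoReceiverFragment declares:
private val owdState = mutableLongStateOf(0L)
private val owdHistory = mutableStateListOf<Long>()
It keeps no RTT state (rttState and rttHistory), because the receiver only needs to track OWD.
In the handleSignalingData method, the "time_sync_request" branch computes OWD as t2 - t1 and tracks it.

Unified time synchronization:
The initiator sends time_sync_request, which carries t1.
The receiver replies with time_sync_response, which carries t1, t2, and t3.
The initiator computes RTT from t1, t2, t3, and t4 (the current time).
The receiver computes OWD as t2 - t1.

UI enhancements: the initiator shows the latest, maximum, minimum, and average RTT, and the receiver shows the same four figures for OWD.
The remaining changes cover code cleanup, permission handling, and logging enhancements.

When using LaunchedEffect and lifecycleScope, make sure all UI updates run on the main thread to prevent unexpected behavior.
Always release or dispose of resources such as PeerConnection, SurfaceViewRenderer, and EglBase in the onDestroyView method to prevent memory leaks.
With these changes, the app uses a single time synchronization request to compute and display both RTT and OWD, giving users a view of network performance that helps tune the video call experience.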
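The four-timestamp exchange summarized above can be sketched as a pure function. This is a minimal sketch, not the fragment code itself: `SyncSample` and `roundTripMillis` are hypothetical names, and only the formula t4 - t1 - (t3 - t2) is taken from the "time_sync_response" handler.

```kotlin
// Hypothetical container for one time-sync exchange (not part of the fragments).
// t1 and t4 are read on the initiator clock, t2 and t3 on the receiver clock.
data class SyncSample(val t1: Long, val t2: Long, val t3: Long, val t4: Long)

// Round-trip time excluding the receiver's processing time (t3 - t2).
// Each difference is taken on a single clock, so a fixed offset
// between the two clocks cancels out of the result.
fun roundTripMillis(s: SyncSample): Long = (s.t4 - s.t1) - (s.t3 - s.t2)
```

For example, with t1 = 1000 and t4 = 1045 on the initiator and t2 = 5020, t3 = 5025 on a receiver whose clock is about four seconds ahead, the result is 45 - 5 = 40 ms. The clock difference does not affect RTT, which is why RTT is safe to compute this way while t2 - t1 on its own is not.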
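Resolving the negative values
I am debugging a Kotlin (Android) video communication app that uses WebRTC. The OWD (one-way delay) calculation is producing negative values; I am looking for the cause and a fix.
Analyzing time synchronization
Capture timestamps and compute the round-trip time. Use RTT and OWD to measure delay, taking clock differences into account.
Considering the clock synchronization problem
Measuring OWD accurately requires synchronized clocks. Without clock synchronization, an OWD computed from raw timestamps is inaccurate.
Adjusting the calculation
I am considering changing how OWD is computed, for example to (t3 - t1) / 2, or making sure the clocks are synchronized.
Exploring the timing problem
Estimate OWD from RTT, assuming symmetric delay. Record the differences to compute the clock offset so that the estimate is accurate.
Adjusting for the effect
To fix the negative OWD, avoid relying on absolute timestamps; consider computing on the initiator side or changing the calculation on the receiver side.
Measuring the clock offset
The calculation can be adjusted so that it yields positive values, which also allows the clock offset to be inferred.
Use the receiver's local time, account for the delay when sending the response, and have the initiator compute RTT and the offset once it receives the time-sync response, avoiding negative OWD.
Adjusting the OWD calculation
To handle negative OWD, the OWD calculation needs to change. I will compute the clock offset and send it to the receiver so that the receiver can correct its delay measurement in real time. This makes the system more accurate.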
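The main reason one-way delay (OWD) comes out negative is that the initiator and receiver clocks are not synchronized. The system times of the two devices can differ, so computing directly on absolute timestamps gives inaccurate or even negative results.
In the current implementation the receiver computes OWD as owd = t2 - t1, where:
t1 is the time at which the initiator sent time_sync_request.
t2 is the time at which the receiver received time_sync_request.
Because the two system clocks are not synchronized, t2 - t1 can be negative or inaccurate: t1 and t2 are each read from an independent system clock and cannot be compared directly.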
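A small worked example makes the effect concrete. This is an illustrative sketch with made-up numbers; `receiverClockAt` and `naiveOwd` are hypothetical helpers, not functions from the fragments.

```kotlin
// Hypothetical helper: what the receiver's wall clock reads at a given instant
// expressed on the initiator's clock. A negative skew means the receiver
// clock is behind the initiator clock.
fun receiverClockAt(initiatorTimeMs: Long, receiverMinusInitiatorMs: Long): Long =
    initiatorTimeMs + receiverMinusInitiatorMs

// The uncorrected calculation used by the receiver: t2 - t1.
fun naiveOwd(t1: Long, t2: Long): Long = t2 - t1
```

Suppose the request leaves at t1 = 10000 on the initiator clock, the true network delay is 20 ms, and the receiver clock runs 150 ms behind. The receiver then reads t2 = receiverClockAt(10020, -150) = 9870, and naiveOwd(10000, 9870) is -130 ms. The 20 ms of real delay is swamped by the 150 ms clock difference.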
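To compute OWD accurately, either synchronize the initiator and receiver clocks first, or estimate the clock offset and use it to correct the timestamps. The steps for estimating the offset and correcting OWD are:
Compute the clock offset: use an approach similar to NTP (Network Time Protocol), exchanging timestamps to estimate the offset between the initiator and receiver clocks.
Adjust the OWD calculation: apply the estimated offset to the receiver's timestamp to obtain a corrected OWD estimate.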
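Why a four-timestamp exchange yields the offset, and what the corrected OWD then measures, can be written out as follows. The notation is introduced here for illustration: θ is the receiver clock minus the initiator clock, d_f is the true initiator-to-receiver delay, and d_b is the true return delay.

```latex
\begin{aligned}
t_2 &= t_1 + d_f + \theta, \qquad t_4 = t_3 + d_b - \theta \\
(t_2 - t_1) + (t_3 - t_4) &= 2\theta + (d_f - d_b) \\
\hat{\theta} &= \frac{(t_2 - t_1) + (t_3 - t_4)}{2} = \theta + \frac{d_f - d_b}{2} \\
\mathrm{RTT} &= (t_4 - t_1) - (t_3 - t_2) = d_f + d_b \\
t_2 - \hat{\theta} - t_1 &= \frac{d_f + d_b}{2} = \frac{\mathrm{RTT}}{2}
\end{aligned}
```

Two consequences follow. The offset estimate is exact only when the forward and return delays are equal; otherwise it is off by half the asymmetry. And when the offset and t2 - t1 come from the same exchange, the corrected OWD equals RTT / 2, so this method reports a symmetric-path estimate rather than an independent measurement of the forward delay.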
Exchange additional timestamps between the initiator and the receiver so that the clock offset can be computed.
The initiator sends time_sync_request:
private fun requestTimeSync() {
    val t1 = System.currentTimeMillis()
    val syncRequest = JSONObject().apply {
        put("type", "time_sync_request")
        put("room", currentRoom)
        put("t1", t1)
    }
    socket.emit("signal", syncRequest)
    Log.d(TAG, "发送时间同步请求,t1: $t1")
}
The receiver handles time_sync_request and sends time_sync_response:
"time_sync_request" -> {
    val t1 = data.getLong("t1")
    val t2 = System.currentTimeMillis()
    // Assumes zero processing time; if handling takes measurable time,
    // record the actual completion time here instead.
    val t3 = t2
    val syncResponse = JSONObject().apply {
        put("type", "time_sync_response")
        put("t1", t1)
        put("t2", t2)
        put("t3", t3)
        put("room", currentRoom)
    }
    socket.emit("signal", syncResponse)
    Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3")
}
The initiator receives time_sync_response and computes RTT and the clock offset:
"time_sync_response" -> {
    val t1 = data.getLong("t1")
    val t2 = data.getLong("t2")
    val t3 = data.getLong("t3")
    val t4 = System.currentTimeMillis()

    // Compute RTT, excluding the receiver's processing time
    val rtt = t4 - t1 - (t3 - t2)
    // Compute the clock offset (receiver clock minus initiator clock)
    val offset = ((t2 - t1) + (t3 - t4)) / 2
    Log.d(TAG, "时间同步: RTT=$rtt ms, 时钟偏移量=$offset ms")

    viewLifecycleOwner.lifecycleScope.launch {
        // Update the RTT state
        rttState.longValue = rtt
        // Update the RTT history, keeping the last 60 samples
        rttHistory.add(rtt)
        if (rttHistory.size > 60) {
            rttHistory.removeAt(0)
        }
        // Compute the RTT maximum, minimum, and average
        val maxRtt = rttHistory.maxOrNull() ?: 0L
        val minRtt = rttHistory.minOrNull() ?: 0L
        val averageRtt = if (rttHistory.isNotEmpty()) {
            rttHistory.average().toLong()
        } else {
            0L
        }
        Log.d(TAG, "RTT - 最新: $rtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms")
        // Store the clock offset or send it to the receiver so it can adjust OWD.
        // It is sent with socket.emit here; another mechanism would also work.
        sendClockOffset(offset)
    }
}
The initiator sends the clock offset:
Once the clock offset has been computed, send it to the receiver so that the receiver can correct its own timestamps.
private fun sendClockOffset(offset: Long) {
    val offsetData = JSONObject().apply {
        put("type", "clock_offset")
        put("offset", offset)
        put("room", currentRoom)
    }
    socket.emit("signal", offsetData)
    Log.d(TAG, "发送时钟偏移量: offset=$offset ms")
}
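One optional refinement, which the code above does not do, is to choose which offset to send rather than forwarding every sample. NTP-style clients commonly prefer the sample with the lowest round-trip delay, since queuing on either path inflates RTT and skews the offset estimate. The sketch below assumes the initiator keeps its recent exchanges; `Exchange` and `bestOffset` are hypothetical names.

```kotlin
// Hypothetical record of one completed exchange on the initiator.
data class Exchange(val t1: Long, val t2: Long, val t3: Long, val t4: Long) {
    // Same formulas as the "time_sync_response" handler.
    val rtt: Long get() = (t4 - t1) - (t3 - t2)
    val offset: Long get() = ((t2 - t1) + (t3 - t4)) / 2
}

// Offset from the exchange with the smallest RTT, or null if there are no samples.
fun bestOffset(samples: List<Exchange>): Long? =
    samples.minByOrNull { it.rtt }?.offset
```

The value returned by bestOffset could then be passed to sendClockOffset in place of the most recent offset. Whether that helps in practice depends on how variable the signaling path is, so treat it as something to measure rather than a guaranteed improvement.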
The receiver receives and stores the clock offset:
On the receiver, accept the clock offset and store it for use in the OWD calculation.
// Add a field to VideoReceiverFragment to hold the clock offset
private var clockOffset: Long = 0L

// Handle clock_offset in the handleSignalingData method
"clock_offset" -> {
    clockOffset = data.getLong("offset")
    Log.d(TAG, "接收到时钟偏移量: offset=$clockOffset ms")
}
Use the stored clock offset to correct the receiver's timestamp and obtain a corrected OWD estimate.
The receiver computes OWD:
Change the receiver's OWD calculation to take the clock offset into account.
"time_sync_request" -> {
    val t1 = data.getLong("t1")
    val t2 = System.currentTimeMillis()
    // Assumes zero processing time; if handling takes measurable time,
    // record the actual completion time here instead.
    val t3 = t2
    val syncResponse = JSONObject().apply {
        put("type", "time_sync_response")
        put("t1", t1)
        put("t2", t2)
        put("t3", t3)
        put("room", currentRoom)
    }
    socket.emit("signal", syncResponse)
    Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3")

    // Compute OWD with the clock offset applied.
    // clockOffset is still 0 until the first clock_offset message arrives,
    // so early samples are uncorrected and may be negative.
    val adjustedT2 = t2 - clockOffset
    val owd = adjustedT2 - t1
    if (owd >= 0) {
        Log.d(TAG, "时间同步: OWD=$owd ms")
    } else {
        Log.e(TAG, "计算的 OWD 为负数: OWD=$owd ms,可能由于时钟偏移未同步或计算错误。")
    }

    viewLifecycleOwner.lifecycleScope.launch {
        // Update the OWD state
        owdState.longValue = owd
        // Update the OWD history, keeping the last 60 samples
        owdHistory.add(owd)
        if (owdHistory.size > 60) {
            owdHistory.removeAt(0)
        }
        // Compute the OWD maximum, minimum, and average
        val maxOwd = owdHistory.maxOrNull() ?: 0L
        val minOwd = owdHistory.minOrNull() ?: 0L
        val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L
        Log.d(TAG, "OWD - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms")
    }
}
Below is the complete corrected code for VideoReceiverFragment, including the stored clock offset and the adjusted OWD calculation:
class VideoReceiverFragment : Fragment() { companion object { fun newInstance( room: String, stunUrl: String, turnUrl: String, turnUsername: String, turnPassword: String, signalingServerUrl: String ): VideoReceiverFragment { val fragment = VideoReceiverFragment() val args = Bundle().apply { putString("room", room) putString("stunUrl", stunUrl) putString("turnUrl", turnUrl) putString("turnUsername", turnUsername) putString("turnPassword", turnPassword) putString("signalingServerUrl", signalingServerUrl) } fragment.arguments = args return fragment } } // 类成员变量 private lateinit var socket: Socket private var localPeer: PeerConnection? = null private var remoteView: SurfaceViewRenderer? = null private var remoteEglBase: EglBase? = null private val pendingIceCandidates = mutableListOf<IceCandidate>() private var currentRoom: String? = null private lateinit var signalingServerUrl: String private lateinit var stunUrl: String private lateinit var turnUrl: String private lateinit var turnUsername: String private lateinit var turnPassword: String private val TAG: String = "WebRTC-Receiver" // OWD 的状态变量 private val owdState = mutableLongStateOf(0L) private val owdHistory = mutableStateListOf<Long>() private val frameRateState = mutableDoubleStateOf(0.0) private val bitrateState = mutableLongStateOf(0L) private val stutteringState = mutableStateOf(false) private val frameRateLowState = mutableStateOf(false) private val packetLossHighState = mutableStateOf(false) private val packetLossState = mutableDoubleStateOf(0.0) private val frameRateHistory = mutableStateListOf<Float>() private val bitrateHistory = mutableStateListOf<Long>() private var timeSyncJob: Job? = null // 历史变量 private var prevFramesDecoded = 0.0 private var prevBytesReceived = 0.0 private var prevFramesReceived = 0.0 private var prevFramesDropped = 0.0 private var prevTimestamp = 0.0 private var timeOffset: Long = 0 private var t1: Long = 0 private var statsJob: Job? 
= null // 存储时钟偏移量 private var clockOffset: Long = 0L override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 从参数中获取数据 currentRoom = arguments?.getString("room") ?: "default-room" signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg" stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478" turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349" turnUsername = arguments?.getString("turnUsername") ?: "wstszx" turnPassword = arguments?.getString("turnPassword") ?: "930379" Log.d( TAG, "onCreate: Role = Receiver, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl" ) } private val requestPermissionsLauncher = registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions -> permissions.entries.forEach { (permission, isGranted) -> if (isGranted) { Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show() } else { Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show() } } onPermissionsChecked() } override fun onCreateView( inflater: android.view.LayoutInflater, container: android.view.ViewGroup?, savedInstanceState: Bundle? 
): android.view.View { return ComposeView(requireContext()).apply { setContent { WebRTCComposeLayout() } } } @Composable fun WebRTCComposeLayout() { val context = LocalContext.current lateinit var peerConnectionFactory: PeerConnectionFactory Surface(color = Color.Black) { Column(modifier = Modifier.fillMaxSize()) { // 远程视频视图可以在这里添加 Spacer(modifier = Modifier.height(8.dp)) // 展示 OWD 指标 Column(modifier = Modifier.padding(horizontal = 16.dp)) { Text( text = "单向时延 (OWD) 最新: ${owdState.longValue} ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) Text( text = "OWD 最大: ${owdHistory.maxOrNull() ?: 0} ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) Text( text = "OWD 最小: ${owdHistory.minOrNull() ?: 0} ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L Text( text = "OWD 平均: $averageOwd ms", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 8.dp) ) } // 现有的帧率、码率等 UI 组件 Spacer(modifier = Modifier.height(8.dp)) // 帧率部分 Column( modifier = Modifier .fillMaxWidth() .padding(horizontal = 8.dp) ) { // 中文的帧率文本 Text( text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) Log.d( TAG, "UI - 帧率: ${frameRateState.doubleValue} fps, 码率: ${bitrateState.longValue / 1000} kbps" ) // 帧率的折线图 LineChart( data = frameRateHistory, modifier = Modifier .height(200.dp) .fillMaxWidth() .padding(vertical = 8.dp), lineColor = Color.Green, backgroundColor = Color.Black, yAxisLabel = "帧率 (fps)", xAxisLabel = "时间 (秒)" ) } Spacer(modifier = Modifier.height(8.dp)) // 码率部分 Column( modifier = Modifier .fillMaxWidth() .padding(horizontal = 16.dp) ) { // 中文的码率文本 Text( 
text = "码率: ${bitrateState.longValue / 1000} kbps", color = Color.White, style = MaterialTheme.typography.bodyMedium, modifier = Modifier.padding(bottom = 4.dp) ) // 码率的折线图 LineChart( data = bitrateHistory.map { it / 1000f }, // 转换为 kbps modifier = Modifier .height(200.dp) .fillMaxWidth() .padding(vertical = 8.dp), lineColor = Color.Blue, backgroundColor = Color.Black, yAxisLabel = "码率 (kbps)", xAxisLabel = "时间 (秒)" ) } Spacer(modifier = Modifier.height(16.dp)) // 卡顿指示器 Row( modifier = Modifier .fillMaxWidth() .padding(horizontal = 16.dp), verticalAlignment = Alignment.CenterVertically ) { if (stutteringState.value) { Icon( imageVector = Icons.Default.Warning, contentDescription = "卡顿警告", tint = Color.Red, modifier = Modifier.size(24.dp) ) Spacer(modifier = Modifier.width(8.dp)) Column { Text( text = "视频播放出现卡顿", color = Color.Red, style = MaterialTheme.typography.bodyMedium ) // 显示具体的异常指标 if (frameRateLowState.value) { Text( text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps", color = Color.Red, style = MaterialTheme.typography.bodySmall ) } if (packetLossHighState.value) { Text( text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%", color = Color.Red, style = MaterialTheme.typography.bodySmall ) } } } else { Icon( imageVector = Icons.Default.CheckCircle, contentDescription = "正常", tint = Color.Green, modifier = Modifier.size(24.dp) ) Spacer(modifier = Modifier.width(8.dp)) Text( text = "视频播放正常", color = Color.Green, style = MaterialTheme.typography.bodyMedium ) } } } LaunchedEffect(Unit) { val options = PeerConnectionFactory.InitializationOptions.builder(context) .createInitializationOptions() PeerConnectionFactory.initialize(options) val encoderFactory = DefaultVideoEncoderFactory( remoteEglBase!!.eglBaseContext, true, true ) val decoderFactory = DefaultVideoDecoderFactory(remoteEglBase!!.eglBaseContext) peerConnectionFactory = PeerConnectionFactory.builder() .setVideoEncoderFactory(encoderFactory) .setVideoDecoderFactory(decoderFactory) 
                    .createPeerConnectionFactory()

                createPeerConnection(
                    context,
                    peerConnectionFactory,
                    remoteView!!
                ) {
                    localPeer = it
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
            }
        }
    }

    /**
     * Enhanced line chart Composable with axes and labels.
     * Connects consecutive points with straight lines.
     */
    @Composable
    fun LineChart(
        data: List<Float>,
        modifier: Modifier = Modifier,
        lineColor: Color = Color.Green,
        backgroundColor: Color = Color.Black,
        yAxisLabel: String = "",
        xAxisLabel: String = "",
        minYValue: Float? = null,
        maxYValue: Float? = null
    ) {
        // Draw the line chart with Canvas, or use any Compose-compatible chart library
    }

    private fun initializeSocketIO() {
        val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
        val socketUrl = signalingServerUrl

        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = protocol == "https"
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
                initiateCall()
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check for a payload before touching args[0]
                if (args.isNotEmpty() && args[0] is JSONObject) {
                    val data = args[0] as JSONObject
                    Log.d(TAG, "Received signal: $data")
                    handleSignalingData(data)
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to socket: ${e.message}")
        }
    }

    private fun requestPermissionsIfNeeded() {
        // Request the required permissions
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO
        )
        requestPermissionsLauncher.launch(permissions)
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "All required permissions have been granted", Toast.LENGTH_SHORT).show()
        // Continue initializing video or other components
    }

    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        remoteView: SurfaceViewRenderer,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        // PeerConnection creation goes here
    }

    private fun initiateCall() {
        // Call initiation goes here, e.g. creating an offer
    }

    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        when (data.getString("type")) {
            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )
                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        // Flush candidates that arrived before the remote description was set
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }
                        pendingIceCandidates.clear()
                        Log.d(TAG, "Remote description (answer) set successfully")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Failed to set remote description (answer): $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_request" -> {
                val t1 = data.getLong("t1")
                val t2 = System.currentTimeMillis()
                // Assumes zero processing time; if processing is delayed, record the actual completion time instead
                val t3 = t2

                val syncResponse = JSONObject().apply {
                    put("type", "time_sync_response")
                    put("t1", t1)
                    put("t2", t2)
                    put("t3", t3)
                    put("room", currentRoom)
                }
                socket.emit("signal", syncResponse)
                Log.d(TAG, "Replied to time_sync_request: t1=$t1, t2=$t2, t3=$t3")

                // Compute the OWD, taking the clock offset into account
                val adjustedT2 = t2 - clockOffset
                val owd = adjustedT2 - t1

                if (owd >= 0) {
                    Log.d(TAG, "Time sync: OWD=$owd ms")
                } else {
                    Log.e(TAG, "Computed OWD is negative: OWD=$owd ms; the clock offset may not be synchronized yet, or the calculation is wrong.")
                }

                viewLifecycleOwner.lifecycleScope.launch {
                    // Update the OWD state
                    owdState.longValue = owd

                    // Update the OWD history, keeping the last 60 samples
                    owdHistory.add(owd)
                    if (owdHistory.size > 60) {
                        owdHistory.removeAt(0)
                    }

                    // Compute the OWD maximum, minimum, and average
                    val maxOwd = owdHistory.maxOrNull() ?: 0L
                    val minOwd =
                        owdHistory.minOrNull() ?: 0L
                    val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L

                    Log.d(TAG, "OWD - latest: $owd ms, max: $maxOwd ms, min: $minOwd ms, average: $averageOwd ms")
                }
            }

            "clock_offset" -> {
                clockOffset = data.getLong("offset")
                Log.d(TAG, "Received clock offset: offset=$clockOffset ms")
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
            }
        }
    }

    private fun startStatsCollection() {
        Log.d(TAG, "Starting stats collection...")
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000) // collect stats once per second
                Log.d(TAG, "Collecting stats...")
                localPeer?.getStats { report ->
                    Log.d(TAG, "Stats report received.")
                    parseStatsReport(report)
                } ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
            }
        }
    }

    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "Received RTCStatsReport: $report")
        for (stats in report.statsMap.values) {
            if (stats.type == "inbound-rtp") {
                val kind = stats.members["kind"] as? String
                if (kind == "video") {
                    val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
                    val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
                    val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
                    val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
                    val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
                    val packetsReceived = (stats.members["packetsReceived"] as?
                        Number)?.toDouble() ?: 1.0 // default avoids division by zero when the field is missing
                    // Guard the case where both counters are present but still zero
                    val totalPackets = packetsLost + packetsReceived
                    val packetLossFraction = if (totalPackets > 0) packetsLost / totalPackets else 0.0
                    val timestamp = stats.timestampUs / 1_000_000.0 // convert to seconds

                    Log.d(
                        TAG,
                        "Stats - frames decoded: $framesDecoded, frames received: $framesReceived, frames dropped: $framesDropped, bytes received: $bytesReceived, packet loss fraction: $packetLossFraction, timestamp: $timestamp"
                    )

                    if (prevTimestamp != 0.0) {
                        val timeElapsed = timestamp - prevTimestamp
                        val framesDelta = framesDecoded - prevFramesDecoded
                        val bytesDelta = bytesReceived - prevBytesReceived
                        val framesReceivedDelta = framesReceived - prevFramesReceived
                        val framesDroppedDelta = framesDropped - prevFramesDropped

                        val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
                        val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
                        val packetLoss = packetLossFraction * 100 // convert to a percentage

                        Log.d(TAG, "Computed frame rate: $frameRate fps, bitrate: $bitrate bps, packet loss: $packetLoss%")

                        // Decide whether playback is stuttering based on thresholds
                        val isStuttering = frameRate < 24 || packetLoss > 5.0 // thresholds are adjustable

                        // Update state
                        viewLifecycleOwner.lifecycleScope.launch {
                            frameRateState.doubleValue = frameRate
                            bitrateState.longValue = bitrate.toLong()

                            // Update the stutter state
                            stutteringState.value = isStuttering

                            // Update the specific stutter causes
                            frameRateLowState.value = frameRate < 24
                            packetLossHighState.value = packetLoss > 5.0
                            packetLossState.doubleValue = packetLoss

                            // Update the histories, keeping the last 60 samples
                            frameRateHistory.add(frameRate.toFloat())
                            if (frameRateHistory.size > 60) {
                                frameRateHistory.removeAt(0)
                            }

                            bitrateHistory.add(bitrate.toLong())
                            if (bitrateHistory.size > 60) {
                                bitrateHistory.removeAt(0)
                            }

                            Log.d(
                                TAG,
                                "Updated frame rate: ${frameRateState.doubleValue} fps, bitrate: ${bitrateState.longValue / 1000} kbps, stuttering: $isStuttering"
                            )
                        }
                    }

                    // Remember the current values for the next delta
                    prevFramesDecoded = framesDecoded
                    prevBytesReceived = bytesReceived
                    prevFramesReceived = framesReceived
                    prevFramesDropped = framesDropped
                    prevTimestamp = timestamp
                }
            }
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        socket.disconnect()
        localPeer?.dispose()
        remoteView?.release()
        remoteEglBase?.release()
        timeSyncJob?.cancel()
    }
}
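The OWD bookkeeping in the time_sync_request handler (append a sample, keep the last 60, then read latest, max, min, and average) can be pulled into a small helper so it is testable without Android or Compose. The following is only a sketch: RollingStats and its members are hypothetical names that do not appear in the fragment, and it uses a plain ArrayDeque rather than the mutableStateListOf that drives the UI.

```kotlin
/** Hypothetical helper: keeps the most recent [capacity] samples and summarizes them. */
class RollingStats(private val capacity: Int = 60) {
    private val samples = ArrayDeque<Long>()

    /** Appends a sample and drops the oldest one once the window is full. */
    fun add(value: Long) {
        samples.addLast(value)
        if (samples.size > capacity) samples.removeFirst()
    }

    // Each summary falls back to 0 when no samples exist, matching the UI defaults.
    val latest: Long get() = samples.lastOrNull() ?: 0L
    val max: Long get() = samples.maxOrNull() ?: 0L
    val min: Long get() = samples.minOrNull() ?: 0L
    val average: Long get() = if (samples.isEmpty()) 0L else samples.average().toLong()
}

fun main() {
    val owd = RollingStats(capacity = 3)
    listOf(40L, 55L, 35L, 60L).forEach(owd::add) // 40 is evicted once the window is full
    println("latest=${owd.latest} max=${owd.max} min=${owd.min} avg=${owd.average}")
}
```

In the fragment itself the history would still need to live in a Compose state list so that the Text components recompose when a sample arrives.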
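Computing and passing the clock offset: after receiving time_sync_response, the initiator computes the RTT and the clock offset, then sends the offset to the receiver.
Adjusting the OWD calculation on the receiver: the receiver uses owd = (t2 - offset) - t1 to compute the OWD.
Making sure the clock offset is known before computing OWD: before handling time_sync_request, make sure the clock offset has already been received and stored. If no offset has arrived yet, either skip the OWD calculation or wait for the offset to be updated.
Multiple time synchronizations: to improve the accuracy of the clock offset, send several time sync requests and take the average, or apply another filtering method, to reduce the error.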
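Exception handling: strengthen the error handling so that the app behaves correctly when the clock offset calculation fails or the data is abnormal, and never reports a negative OWD.
Clock synchronization protocol: if higher-precision clock synchronization is needed, consider integrating an established protocol such as NTP or PTP, although this usually requires server-side support.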
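As a rough illustration of the offset calculation and the "multiple time synchronizations" point, here is a minimal sketch of the standard NTP-style estimate. It assumes a fourth timestamp, t4, taken by the initiator when time_sync_response arrives; t4, SyncSample, averageOffset, and offsetAtMinRtt are hypothetical names that are not part of the fragments. The offset is defined as receiver clock minus initiator clock, which matches owd = (t2 - offset) - t1, and the estimate is only exact when the two path delays are symmetric.

```kotlin
/** One time-sync exchange; all values in milliseconds. t4 is an assumed name for the initiator's receive time. */
data class SyncSample(val t1: Long, val t2: Long, val t3: Long, val t4: Long) {
    /** Round-trip time with the receiver's processing time (t3 - t2) removed. */
    val rtt: Long get() = (t4 - t1) - (t3 - t2)

    /** Receiver clock minus initiator clock, assuming symmetric path delay. */
    val offset: Long get() = ((t2 - t1) + (t3 - t4)) / 2
}

/** Mean offset across samples; null when there is nothing to average. */
fun averageOffset(samples: List<SyncSample>): Long? =
    if (samples.isEmpty()) null else samples.map { it.offset }.average().toLong()

/** Offset from the exchange with the smallest RTT, which tends to be least distorted by queuing. */
fun offsetAtMinRtt(samples: List<SyncSample>): Long? =
    samples.minByOrNull { it.rtt }?.offset

fun main() {
    // Receiver clock runs 100 ms ahead of the initiator clock in this made-up data.
    val samples = listOf(
        SyncSample(t1 = 1_000L, t2 = 1_120L, t3 = 1_120L, t4 = 1_040L), // symmetric, 20 ms each way
        SyncSample(t1 = 2_000L, t2 = 2_150L, t3 = 2_150L, t4 = 2_060L), // asymmetric, 50 ms out and 10 ms back
    )
    println("average offset = ${averageOffset(samples)} ms")
    println("offset at min RTT = ${offsetAtMinRtt(samples)} ms")
}
```

Averaging smooths random jitter, while picking the minimum-RTT sample discards exchanges that were delayed in one direction; which one works better depends on the network, so treat both as options rather than a recommendation.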
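With the changes and optimizations above, the negative one-way delay (OWD) problem should be largely resolved, and the OWD calculation should be accurate and reliable.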
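The two receiver-side safeguards (skip the calculation until an offset has arrived, and never report a negative OWD) could be combined in one pure function. This is a sketch under assumptions: computeOwd is a hypothetical helper, and it models "offset not yet received" as null, whereas the fragment stores clockOffset in a plain field.

```kotlin
/**
 * Returns the offset-corrected one-way delay in ms, or null when it cannot be trusted.
 * clockOffsetMs is the receiver clock minus the initiator clock; null means "not received yet".
 */
fun computeOwd(t1: Long, t2: Long, clockOffsetMs: Long?): Long? {
    val offset = clockOffsetMs ?: return null // offset unknown: skip this sample
    val owd = (t2 - offset) - t1              // same formula as in the handler
    return owd.takeIf { it >= 0 }             // a negative value points to a stale or wrong offset
}

fun main() {
    println(computeOwd(t1 = 1_000L, t2 = 1_120L, clockOffsetMs = null)) // offset not yet received
    println(computeOwd(t1 = 1_000L, t2 = 1_120L, clockOffsetMs = 100L)) // usable sample
    println(computeOwd(t1 = 1_000L, t2 = 1_120L, clockOffsetMs = 200L)) // rejected, would be negative
}
```

In the time_sync_request handler, the state and history would then be updated only when the result is non-null, so a missing or bad offset leaves the displayed metrics unchanged instead of skewing the minimum.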