问题

class VideoInitiatorFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoInitiatorFragment] whose argument bundle carries the room name,
     * the STUN/TURN server settings and the signaling server URL.
     *
     * The keys written here are the ones read back in `onCreate`.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoInitiatorFragment = VideoInitiatorFragment().also { fragment ->
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        fragment.arguments = bundle
    }
}

// Class member variables (kept unchanged)
// Socket.IO connection to the signaling server; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection for this side of the call; assigned in createPeerConnection().
private var localPeer: PeerConnection? = null
// Renderer showing the local camera preview; created by the AndroidView factory.
private var localView: SurfaceViewRenderer? = null
// EGL context holder created lazily the first time the preview view is updated.
private var localEglBase: EglBase? = null
// Remote ICE candidates that arrived before a remote description was set.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings below are read from the fragment arguments in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

// Logcat tag used by every log statement in this fragment.
private val TAG: String = "WebRTC-Initiator"

// RTT State Variables (values in milliseconds, rendered by WebRTCComposeLayout)
private val maxRttState = mutableStateOf(0L)
private val minRttState = mutableStateOf(Long.MAX_VALUE)
private val averageRttState = mutableStateOf(0L)
private val latestRttState = mutableStateOf(0L)

// History list to compute average RTT (writers trim it to the latest 60 samples)
private val rttHistory = mutableStateListOf<Long>()
// Wall-clock time (System.currentTimeMillis) at which the most recent ping was sent.
private var pingTimestamp: Long = 0

// Job for stats collection
private var statsJob: Job? = null

/**
 * Loads the room name and server settings from the fragment arguments, substituting a
 * built-in default for every key that is absent, then logs the resulting configuration.
 *
 * NOTE(review): the fallbacks include a hard-coded TURN username and password; confirm
 * that shipping these defaults in the app is intended.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // Small local helper so each setting is a single lookup-with-default.
    fun argumentOr(key: String, fallback: String): String =
        arguments?.getString(key) ?: fallback

    currentRoom = argumentOr("room", "default-room")
    signalingServerUrl = argumentOr("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = argumentOr("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = argumentOr("turnUrl", "turn:turn.wstszx.us.kg:5349")
    turnUsername = argumentOr("turnUsername", "wstszx")
    turnPassword = argumentOr("turnPassword", "930379")

    val summary =
        "onCreate: 角色 = 服务器, 房间 = $currentRoom, 信令服务器 = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the multi-permission request. For every requested permission it shows a
 * toast saying whether it was granted or denied, then calls [onPermissionsChecked].
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { results ->
        for ((permission, isGranted) in results) {
            val message = if (isGranted) {
                "$permission 权限已授予"
            } else {
                "$permission 权限被拒绝"
            }
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        onPermissionsChecked()
    }

/**
 * Hosts the Compose UI of this fragment: returns a [ComposeView] whose content is
 * [WebRTCComposeLayout].
 */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Root composable of the initiator screen.
 *
 * Shows the local camera preview with the four RTT read-outs below it and, once on first
 * composition, builds the WebRTC stack: factory initialisation, local video capture,
 * peer connection, signaling socket and finally the runtime-permission request.
 *
 * NOTE(review): camera capture is started (inside initLocalVideo) before
 * requestPermissionsIfNeeded() runs; confirm the CAMERA permission is already held
 * by the time this screen is shown.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current
    var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Local video preview; takes all vertical space left over by the stats.
            AndroidView(
                factory = { viewContext ->
                    SurfaceViewRenderer(viewContext)
                        .apply { setZOrderMediaOverlay(false) }
                        .also { localView = it }
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = { renderer ->
                    // The EGL context is created together with the first renderer init.
                    if (localEglBase == null) {
                        val egl = EglBase.create()
                        localEglBase = egl
                        renderer.init(egl.eglBaseContext, null)
                        renderer.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // RTT statistics: latest, max, min, average.
            Column(modifier = Modifier.padding(16.dp)) {
                val rttLines = listOf(
                    "最新往返时延: ${latestRttState.value} ms",
                    "最大往返时延: ${maxRttState.value} ms",
                    "最小往返时延: ${minRttState.value} ms",
                    "平均往返时延: ${averageRttState.value} ms"
                )
                rttLines.forEachIndexed { index, line ->
                    // The last row gets a slightly larger bottom gap than the others.
                    val bottomGap = if (index == rttLines.lastIndex) 8.dp else 4.dp
                    Text(
                        text = line,
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = bottomGap)
                    )
                }
            }

            // Other UI components...

        }

        LaunchedEffect(Unit) {
            val initOptions = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(initOptions)

            // Set by the AndroidView update block above.
            val eglBase = localEglBase!!
            val encoderFactory = DefaultVideoEncoderFactory(eglBase.eglBaseContext, true, true)
            val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)

            val peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(encoderFactory)
                .setVideoDecoderFactory(decoderFactory)
                .createPeerConnectionFactory()

            initLocalVideo(context, localView, peerConnectionFactory, eglBase) { track ->
                localVideoTrack = track
            }

            createPeerConnection(context, peerConnectionFactory, localVideoTrack) { peer ->
                localPeer = peer
                startStatsCollection() // begin periodic stats polling
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
        }
    }
}

/**
 * Starts a coroutine in the view lifecycle scope that, once per second, asks the peer
 * connection for a stats report and hands it to [parseStatsReport]. The job handle is
 * stored in `statsJob`.
 */
private fun startStatsCollection() {
    statsJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(1000) // one sample per second
            val peer = localPeer
            if (peer != null) {
                peer.getStats { report -> parseStatsReport(report) }
            } else {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
            }
        }
    }
}

/**
 * Extracts the current ICE round-trip time from a stats report and folds it into the
 * RTT state shown on screen (latest / max / min / average over the last 60 samples).
 *
 * In the WebRTC statistics model `currentRoundTripTime` (seconds) is a member of
 * `candidate-pair` stats, not of `transport` stats, so looking only at `transport`
 * never yields a value. The lookup order is: the pair referenced by a transport's
 * `selectedCandidatePairId`, then any nominated pair, then the transport stats
 * themselves (the previous lookup, kept as a last resort).
 *
 * @param report stats snapshot passed to the `getStats` callback.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    val statsById = report.statsMap
    Log.d(TAG, "RTCStatsReport: $statsById")

    val transports = statsById.values.filter { it.type == "transport" }
    val selectedPairs = transports.mapNotNull { transport ->
        (transport.members["selectedCandidatePairId"] as? String)?.let { statsById[it] }
    }
    val nominatedPairs = statsById.values.filter {
        it.type == "candidate-pair" && it.members["nominated"] == true
    }

    // First positive RTT (converted from seconds to milliseconds) in priority order.
    val currentRtt = (selectedPairs + nominatedPairs + transports)
        .asSequence()
        .mapNotNull { (it.members["currentRoundTripTime"] as? Number)?.toDouble() }
        .map { seconds -> (seconds * 1000).toLong() }
        .firstOrNull { it > 0 }
        ?: return

    // This runs inside the getStats callback, which may fire after the view has been
    // destroyed; `viewLifecycleOwner` would throw then, so use the nullable accessor.
    val owner = viewLifecycleOwnerLiveData.value ?: return
    owner.lifecycleScope.launch {
        latestRttState.value = currentRtt

        // Keep a sliding window of the most recent 60 samples.
        rttHistory.add(currentRtt)
        if (rttHistory.size > 60) {
            rttHistory.removeAt(0)
        }

        val maxRtt = rttHistory.maxOrNull() ?: 0L
        val minRtt = rttHistory.minOrNull() ?: 0L
        val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

        maxRttState.value = maxRtt
        minRttState.value = minRtt
        averageRttState.value = averageRtt

        Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
    }
}

/** Handle of the running ping loop, kept so a later call can replace it instead of adding a second loop. */
private var pingJob: Job? = null

/**
 * Starts (or restarts) the loop that emits a `ping` signaling message every 5 seconds.
 * Each ping records its send time in `pingTimestamp`.
 *
 * This is invoked from the socket's connect handler, so it can run more than once
 * (for example after a reconnect); any previous loop is cancelled first so pings do
 * not multiply. If the fragment currently has no view, nothing is started.
 */
private fun startSendingTimestamps() {
    // `viewLifecycleOwner` throws when there is no view; the LiveData accessor is null-safe.
    val owner = viewLifecycleOwnerLiveData.value ?: return

    pingJob?.cancel()
    pingJob = owner.lifecycleScope.launch {
        while (isActive) {
            delay(5000) // every 5 seconds

            // Send the ping and remember when it left.
            pingTimestamp = System.currentTimeMillis()
            val pingData = JSONObject().apply {
                put("type", "ping")
                put("timestamp", pingTimestamp)
                put("room", currentRoom)
            }
            socket.emit("signal", pingData)
            Log.d(TAG, "发送 ping 时间: $pingTimestamp")
        }
    }
}

/**
 * Creates the Socket.IO connection to `signalingServerUrl`, registers the connect,
 * connect-error, disconnect and `signal` handlers, and starts connecting.
 *
 * On connect the socket joins `currentRoom` and the ping loop is started. Incoming
 * `signal` events carrying a [JSONObject] are forwarded to [handleSignalingData].
 * Any exception while setting up the socket is logged and swallowed.
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = signalingServerUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket 已连接")
            socket.emit("join", currentRoom)
            Log.d(TAG, "已加入房间: $currentRoom")
            startSendingTimestamps()
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            if (args.isNotEmpty()) {
                val error = args[0]
                Log.e(TAG, "Socket 连接错误: $error")
            }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            if (args.isNotEmpty()) {
                val reason = args[0]
                Log.d(TAG, "Socket 已断开: $reason")
            }
        }

        socket.on("signal") { args ->
            // Read the payload safely: indexing args[0] before checking for emptiness
            // would throw on an event without arguments.
            val payload = args.firstOrNull()
            Log.d(TAG, "收到信令: $payload")
            if (payload is JSONObject) {
                try {
                    handleSignalingData(payload)
                } catch (e: org.json.JSONException) {
                    // A malformed message must not take down the socket callback.
                    Log.e(TAG, "处理信令数据时出错: ${e.message}")
                }
            }
        }

        socket.connect()
        Log.d(TAG, "正在连接到 Socket: $socketUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "连接 Socket 时出错: ${e.message}")
    }
}

/**
 * Requests whichever of CAMERA, RECORD_AUDIO, INTERNET and ACCESS_NETWORK_STATE are not
 * yet granted. When none are missing, [onPermissionsChecked] is called directly.
 */
private fun requestPermissionsIfNeeded() {
    val missing = arrayOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/**
 * Shows a short toast stating that all required permissions have been granted.
 *
 * NOTE(review): the permission-result callback invokes this even when some permissions
 * were denied, so the message can be inaccurate in that case.
 */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Starts camera capture and creates the local video track.
 *
 * A camera capturer is created and initialised against a new video source, capture is
 * started at 1920x1080 @ 60 fps, and the resulting track (id `video_track`) is attached
 * to [localView] for preview and handed to [onLocalVideoTrack].
 *
 * The audio source, audio track and local media stream that used to be created here were
 * never used or returned, so they are no longer allocated.
 *
 * NOTE(review): the capturer and the SurfaceTextureHelper are not retained, so nothing can
 * stop or dispose them later — consider storing them for teardown.
 *
 * @param context used to create and initialise the capturer.
 * @param localView preview renderer; when null the track is created without a preview sink.
 * @param peerConnectionFactory factory used to create the video source and track.
 * @param eglBase supplies the EGL context for the capture thread.
 * @param onLocalVideoTrack receives the created track.
 * @throws IllegalStateException if no camera capturer can be created.
 */
private fun initLocalVideo(
    context: Context,
    localView: SurfaceViewRenderer?,
    peerConnectionFactory: PeerConnectionFactory,
    eglBase: EglBase,
    onLocalVideoTrack: (VideoTrack) -> Unit
) {
    val videoCapturer = createCameraCapturer(context)
    val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
    val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
    videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

    videoCapturer.startCapture(1920, 1080, 60)

    val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
    if (localView != null) {
        localVideoTrack.addSink(localView)
    } else {
        // Passing a null sink is not meaningful; keep the track usable without preview.
        Log.w(TAG, "initLocalVideo: no preview renderer available, skipping addSink")
    }

    onLocalVideoTrack(localVideoTrack)
}

/**
 * Creates a camera capturer, trying devices in this order: every back-facing camera,
 * then every front-facing camera, then the first enumerated device.
 *
 * When the device list is empty this now fails with the intended [IllegalStateException]
 * instead of an index-out-of-bounds error from reading the first element.
 *
 * @param context used to enumerate Camera2 devices.
 * @return the first capturer that could be created.
 * @throws IllegalStateException if no capturer could be created for any device.
 */
private fun createCameraCapturer(context: Context): CameraVideoCapturer {
    val enumerator = Camera2Enumerator(context)
    val deviceNames = enumerator.deviceNames

    // Preference order: back-facing, front-facing, then whatever is listed first.
    val candidates = deviceNames.filter { enumerator.isBackFacing(it) } +
        deviceNames.filter { enumerator.isFrontFacing(it) } +
        listOfNotNull(deviceNames.firstOrNull())

    // Lazy so that no further capturers are created once one succeeds.
    return candidates.asSequence()
        .mapNotNull { deviceName -> enumerator.createCapturer(deviceName, null) }
        .firstOrNull()
        ?: throw IllegalStateException("无法创建摄像头捕获器")
}

/**
 * Creates the peer connection with the configured STUN/TURN servers, attaches the local
 * video track (when present) and a newly created audio track under the stream id
 * `local_stream`, and reports the connection through [onLocalPeerCreated].
 *
 * Locally gathered ICE candidates are sent to the room as `ice` signaling messages; all
 * other observer callbacks only log.
 *
 * If the factory fails to create a connection, the error is logged and the callback is
 * not invoked (previously this path crashed on a non-null assertion).
 *
 * @param context unused by this function; kept for interface compatibility.
 * @param peerConnectionFactory factory used for the connection and the audio source/track.
 * @param localVideoTrack video track to send, or null to send audio only.
 * @param onLocalPeerCreated invoked with the created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    localVideoTrack: VideoTrack?,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            iceCandidate?.let {
                Log.d(TAG, "ICE candidate: $it")
                // Forward the local candidate to the other side through the signaling room.
                val signalData = JSONObject().apply {
                    put("type", "ice")
                    put("candidate", JSONObject().apply {
                        put("sdpMid", it.sdpMid)
                        put("sdpMLineIndex", it.sdpMLineIndex)
                        put("candidate", it.sdp)
                    })
                    put("room", currentRoom)
                }
                socket.emit("signal", signalData)
            }
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        // The factory can return null; report it instead of crashing.
        Log.e(TAG, "createPeerConnection: factory returned null, peer connection not created")
        return
    }

    localVideoTrack?.let { peer.addTrack(it, listOf("local_stream")) }

    val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
    val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
    peer.addTrack(localAudioTrack, listOf("local_stream"))

    onLocalPeerCreated(peer)
}

/**
 * Creates an SDP answer on [peerConnection], installs it as the local description and,
 * once that succeeds, passes the answer's SDP text to [onAnswerCreated].
 *
 * Failures at either step are only logged; [onAnswerCreated] is not called then.
 */
private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
    Log.d(TAG, "Creating answer...")

    val constraints = MediaConstraints()
    constraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    constraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    val answerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            val answer = sessionDescription ?: return

            // Second stage: apply the freshly created answer as the local description.
            val localDescriptionObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    Log.d(TAG, "SetLocalDescription onSetSuccess")
                    onAnswerCreated(answer.description)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                }

                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            peerConnection.setLocalDescription(localDescriptionObserver, answer)
        }

        override fun onSetSuccess() {
            Log.d(TAG, "createAnswer onSetSuccess")
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "createAnswer onCreateFailure: $error")
        }

        override fun onSetFailure(error: String?) {}
    }

    peerConnection.createAnswer(answerObserver, constraints)
}

/**
 * Dispatches one message from the signaling channel according to its `type` field:
 * `offer`, `ice`, `time_sync_request` or `pong`. Any other (or missing) type is logged.
 *
 * A malformed message (missing or wrongly typed fields) is logged and dropped instead
 * of throwing out of the socket callback.
 *
 * @param data the JSON payload of a `signal` event.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        // optString yields "" for a missing type, which falls through to the else branch.
        when (val type = data.optString("type")) {
            "offer" -> handleOffer(data)
            "ice" -> handleRemoteIceCandidate(data)
            "time_sync_request" -> handleTimeSyncRequest(data)
            "pong" -> handlePong()
            else -> Log.e(TAG, "Unknown signaling type: $type")
        }
    } catch (e: org.json.JSONException) {
        Log.e(TAG, "Malformed signaling message: ${e.message}")
    }
}

/**
 * Applies a remote offer, answers it, sends the answer to the room and then feeds any
 * ICE candidates that were queued while no remote description was set.
 */
private fun handleOffer(data: JSONObject) {
    Log.d(TAG, "Received offer")
    val peer = localPeer
    if (peer == null) {
        Log.e(TAG, "Received offer but no peer connection exists")
        return
    }

    val sdp = SessionDescription(
        SessionDescription.Type.OFFER,
        data.getJSONObject("sdp").getString("sdp")
    )
    peer.setRemoteDescription(object : SdpObserver {
        override fun onSetSuccess() {
            Log.d(TAG, "Set remote description (offer) success")
            createAnswer(peer) { answer ->
                val signalData = JSONObject().apply {
                    put("type", "answer")
                    put("sdp", JSONObject().put("sdp", answer))
                    put("room", currentRoom)
                }

                socket.emit("signal", signalData)

                // Drain the queue under a lock: candidates may still be arriving on the
                // socket callback while this SDP callback runs.
                val queued = synchronized(pendingIceCandidates) {
                    pendingIceCandidates.toList().also { pendingIceCandidates.clear() }
                }
                queued.forEach { candidate -> peer.addIceCandidate(candidate) }
            }
        }

        override fun onSetFailure(error: String?) {
            Log.e(TAG, "Set remote description (offer) error: $error")
        }

        override fun onCreateSuccess(p0: SessionDescription?) {}
        override fun onCreateFailure(p0: String?) {}
    }, sdp)
}

/**
 * Adds a remote ICE candidate to the peer connection, or queues it when no remote
 * description has been set yet.
 */
private fun handleRemoteIceCandidate(data: JSONObject) {
    Log.d(TAG, "Received ICE candidate")
    val candidateData = data.getJSONObject("candidate")
    val candidate = IceCandidate(
        candidateData.getString("sdpMid"),
        candidateData.getInt("sdpMLineIndex"),
        candidateData.getString("candidate")
    )

    if (localPeer?.remoteDescription != null) {
        localPeer?.addIceCandidate(candidate)
    } else {
        synchronized(pendingIceCandidates) { pendingIceCandidates.add(candidate) }
    }
}

/**
 * Answers a `time_sync_request` with a `time_sync_response` echoing `t1` and carrying
 * the local receive (`t2`) and send (`t3`) times.
 */
private fun handleTimeSyncRequest(data: JSONObject) {
    Log.d(TAG, "收到 time_sync_request")
    if (!data.has("t1")) {
        Log.e(TAG, "time_sync_request 缺少 t1")
        return
    }

    val t1 = data.getLong("t1")
    val t2 = System.currentTimeMillis()
    val t3 = System.currentTimeMillis()
    val syncResponse = JSONObject().apply {
        put("type", "time_sync_response")
        put("t1", t1)
        put("t2", t2)
        put("t3", t3)
        put("room", currentRoom)
    }
    socket.emit("signal", syncResponse)
    Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")
}

/**
 * Records a ping/pong round trip. RTT is measured against the locally stored send time
 * of the latest ping (`pingTimestamp`), so the message's own `timestamp` field is not read.
 */
private fun handlePong() {
    val rtt = System.currentTimeMillis() - pingTimestamp

    // `viewLifecycleOwner` throws once the view is gone; use the nullable accessor.
    val owner = viewLifecycleOwnerLiveData.value ?: return
    owner.lifecycleScope.launch {
        latestRttState.value = rtt
        rttHistory.add(rtt)
        if (rttHistory.size > 60) {
            rttHistory.removeAt(0)
        }

        maxRttState.value = rttHistory.maxOrNull() ?: 0
        minRttState.value = rttHistory.minOrNull() ?: 0
        averageRttState.value = rttHistory.average().toLong()

        Log.d(TAG, "RTT: $rtt ms")
    }
}

/**
 * Emits a `time_sync_request` signaling message for the current room, stamped with the
 * local send time `t1` (milliseconds from `System.currentTimeMillis`).
 */
private fun requestTimeSync() {
    val t1 = System.currentTimeMillis()

    val syncRequest = JSONObject()
    syncRequest.put("type", "time_sync_request")
    syncRequest.put("room", currentRoom)
    syncRequest.put("t1", t1)

    socket.emit("signal", syncRequest)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

/**
 * Tears down everything tied to the view: stats polling, the signaling socket, the peer
 * connection, the preview renderer and the EGL context.
 *
 * References are cleared after release so a recreated view builds fresh objects: the
 * preview's update block only initialises a renderer when `localEglBase` is null, so a
 * stale, already-released instance would otherwise leave the new renderer uninitialised.
 *
 * NOTE(review): the camera capturer created in initLocalVideo is not reachable from here
 * and therefore is not stopped.
 */
override fun onDestroyView() {
    super.onDestroyView()

    statsJob?.cancel()
    statsJob = null

    // The socket is lateinit and stays unset if its creation failed.
    if (::socket.isInitialized) {
        socket.off() // drop all listeners so no callback fires into a destroyed view
        socket.disconnect()
    }

    localPeer?.dispose()
    localPeer = null

    localView?.release()
    localView = null

    localEglBase?.release()
    localEglBase = null

    // Candidates queued for the disposed connection are meaningless for a new one.
    pendingIceCandidates.clear()
}
// End of VideoInitiatorFragment.

}

class VideoReceiverFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoReceiverFragment] whose argument bundle carries the room name,
     * the STUN/TURN server settings and the signaling server URL.
     *
     * The keys written here are the ones read back in `onCreate`.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoReceiverFragment = VideoReceiverFragment().also { fragment ->
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        fragment.arguments = bundle
    }
}

// Class member variables
// Socket.IO connection to the signaling server; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection for this side of the call; assigned in createPeerConnection().
private var localPeer: PeerConnection? = null
// Renderer for the remote video; created by the AndroidView factory.
private var remoteView: SurfaceViewRenderer? = null
// EGL context holder created lazily the first time the remote view is updated.
private var remoteEglBase: EglBase? = null
// NOTE(review): presumably remote ICE candidates queued until a remote description is set — confirm against the signaling handler.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings below are read from the fragment arguments in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
// Logcat tag used by every log statement in this fragment.
private val TAG: String = "WebRTC-Receiver"

// Values rendered by WebRTCComposeLayout: frame rate (fps), bitrate (shown divided by 1000 as kbps)
// and whether the stuttering warning is displayed.
private val frameRateState = mutableStateOf(0.0)
private val bitrateState = mutableStateOf(0L)
private val stutteringState = mutableStateOf(false)

// Detail flags/values shown under the stuttering warning (low frame rate, high packet loss in %).
private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableStateOf(0.0)

// Series plotted by the two line charts.
private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
// Handle of the periodic time-sync loop started by startPeriodicTimeSync().
private var timeSyncJob: Job? = null

// History list storing all one-way latency values
private val latencyHistory = mutableStateListOf<Long>()

// State variables holding the maximum, minimum, average and latest one-way latency (shown in ms)
private val maxLatencyState = mutableStateOf(0L)
private val minLatencyState = mutableStateOf(Long.MAX_VALUE)
private val averageLatencyState = mutableStateOf(0L)
private val latestLatencyState = mutableStateOf(0L)

// History variables
// NOTE(review): presumably the previous stats sample used to compute per-interval deltas — confirm where they are updated.
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
// NOTE(review): looks like clock-offset bookkeeping for the time-sync exchange — confirm against the time_sync handlers.
private var timeOffset: Long = 0
private var t1: Long = 0

// NOTE(review): presumably the stats polling job — its launch site is not in this part of the file.
private var statsJob: Job? = null

/**
 * Loads the room name and server settings from the fragment arguments, substituting a
 * built-in default for every key that is absent, then logs the resulting configuration.
 *
 * NOTE(review): the fallbacks include a hard-coded TURN username and password; confirm
 * that shipping these defaults in the app is intended.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // Small local helper so each setting is a single lookup-with-default.
    fun argumentOr(key: String, fallback: String): String =
        arguments?.getString(key) ?: fallback

    currentRoom = argumentOr("room", "default-room")
    signalingServerUrl = argumentOr("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = argumentOr("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = argumentOr("turnUrl", "turn:turn.wstszx.us.kg:5349")
    turnUsername = argumentOr("turnUsername", "wstszx")
    turnPassword = argumentOr("turnPassword", "930379")

    val summary =
        "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the multi-permission request. For every requested permission it shows a
 * toast saying whether it was granted or denied, then calls [onPermissionsChecked].
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { results ->
        for ((permission, isGranted) in results) {
            val message = if (isGranted) {
                "$permission 权限已授予"
            } else {
                "$permission 权限被拒绝"
            }
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        onPermissionsChecked()
    }

/**
 * Hosts the Compose UI of this fragment: returns a [ComposeView] whose content is
 * [WebRTCComposeLayout].
 */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Root composable of the receiver screen.
 *
 * Layout, top to bottom: remote video, one-way latency read-outs, frame-rate text and
 * chart, bitrate text and chart, and a stuttering indicator. Once on first composition
 * it builds the WebRTC stack: factory initialisation, peer connection, signaling socket
 * and the runtime-permission request.
 *
 * The video encoder factory now shares the renderer's EGL context. It previously
 * received a context from a separate `EglBase.create()` that was never stored and
 * therefore could never be released.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current
    lateinit var peerConnectionFactory: PeerConnectionFactory

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Remote video view
            AndroidView(
                factory = {
                    remoteView = SurfaceViewRenderer(it).apply {
                        setZOrderMediaOverlay(false)
                    }
                    remoteView!!
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = {
                    // The EGL context is created together with the first renderer init.
                    if (remoteEglBase?.eglBaseContext == null) {
                        remoteEglBase = EglBase.create()
                        it.init(remoteEglBase!!.eglBaseContext, null)
                        it.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // One-way latency: current, maximum, minimum and average
            Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                Text(
                    text = "当前单向时延: ${latestLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最大单向时延: ${maxLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最小单向时延: ${minLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "平均单向时延: ${averageLatencyState.value} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 8.dp)
                )
            }

            // Spacer between video and charts
            Spacer(modifier = Modifier.height(8.dp))

            // Frame Rate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 8.dp)
            ) {
                // Frame rate read-out
                Text(
                    text = "帧率: ${frameRateState.value.roundToInt()} fps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                // Note: this log statement runs as part of composition, i.e. every time
                // this column recomposes.
                Log.d(
                    TAG,
                    "UI - Frame Rate: ${frameRateState.value} fps, Bitrate: ${bitrateState.value / 1000} kbps"
                )

                // Line Chart for Frame Rate
                LineChart(
                    data = frameRateHistory,
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Green,
                    backgroundColor = Color.Black,
                    yAxisLabel = "帧率 (fps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between charts
            Spacer(modifier = Modifier.height(8.dp))

            // Bitrate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp)
            ) {
                // Bitrate read-out
                Text(
                    text = "码率: ${bitrateState.value / 1000} kbps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )

                // Line Chart for Bitrate
                LineChart(
                    data = bitrateHistory.map { it / 1000f }, // Convert to kbps
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Blue,
                    backgroundColor = Color.Black,
                    yAxisLabel = "码率 (kbps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between metrics and stuttering indicator
            Spacer(modifier = Modifier.height(16.dp))

            // Stuttering Indicator
            Row(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp),
                verticalAlignment = Alignment.CenterVertically
            ) {
                if (stutteringState.value) {
                    Icon(
                        imageVector = Icons.Default.Warning,
                        contentDescription = "卡顿警告",
                        tint = Color.Red,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Column {
                        Text(
                            text = "视频播放出现卡顿",
                            color = Color.Red,
                            style = MaterialTheme.typography.bodyMedium
                        )
                        // Additional information about which metrics are abnormal
                        if (frameRateLowState.value) {
                            Text(
                                text = "帧率过低: ${frameRateState.value.roundToInt()} fps",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                        if (packetLossHighState.value) {
                            Text(
                                text = "包丢失率过高: ${packetLossState.value.roundToInt()}%",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                    }
                } else {
                    Icon(
                        imageVector = Icons.Default.CheckCircle,
                        contentDescription = "正常",
                        tint = Color.Green,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Text(
                        text = "视频播放正常",
                        color = Color.Green,
                        style = MaterialTheme.typography.bodyMedium
                    )
                }
            }
        }

        LaunchedEffect(Unit) {
            val options = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(options)

            // Set by the AndroidView update block above; shared by encoder and decoder so
            // that the only EGL context in play is the one this fragment owns.
            val eglBase = remoteEglBase!!

            val encoderFactory = DefaultVideoEncoderFactory(
                eglBase.eglBaseContext, true, true
            )

            val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)

            peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(encoderFactory)
                .setVideoDecoderFactory(decoderFactory)
                .createPeerConnectionFactory()

            createPeerConnection(
                context,
                peerConnectionFactory,
                remoteView!!
            ) {
                localPeer = it
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
        }
    }
}

/**
 * Starts (or restarts) the loop that calls `requestTimeSync()` immediately and then
 * every [intervalMs] milliseconds.
 *
 * This is invoked from the socket's connect handler, so it can run more than once
 * (for example after a reconnect); the previous loop is cancelled first so requests do
 * not multiply. If the fragment currently has no view, nothing is started.
 *
 * @param intervalMs delay between two requests, in milliseconds.
 */
private fun startPeriodicTimeSync(intervalMs: Long = 5000L) {
    // `viewLifecycleOwner` throws when there is no view; the LiveData accessor is null-safe.
    val owner = viewLifecycleOwnerLiveData.value ?: return

    timeSyncJob?.cancel()
    timeSyncJob = owner.lifecycleScope.launch {
        while (isActive) {
            requestTimeSync()
            delay(intervalMs)
        }
    }
}

/**
 * Line chart with axes, tick labels and optional axis titles; consecutive points are
 * joined by straight segments and each point is marked with a dot.
 *
 * Nothing is drawn when [data] is empty. When every value is equal (zero y-range) the
 * line is drawn at mid-height with a single y tick label, instead of computing label
 * positions by dividing by zero. X tick labels are spread evenly from the first to the
 * last index and de-duplicated, so short series no longer repeat the label "0" and the
 * last point is always labelled.
 *
 * @param data values to plot, in order; the x position is the element index.
 * @param modifier layout modifier for the canvas.
 * @param lineColor colour of the line and the point markers.
 * @param backgroundColor canvas background colour.
 * @param yAxisLabel title drawn rotated along the y axis; skipped when empty.
 * @param xAxisLabel title drawn under the x axis; skipped when empty.
 * @param minYValue lower bound of the y scale; defaults to the minimum of [data].
 * @param maxYValue upper bound of the y scale; defaults to the maximum of [data].
 */
@Composable
fun LineChart(
    data: List<Float>,
    modifier: Modifier = Modifier,
    lineColor: Color = Color.Green,
    backgroundColor: Color = Color.Black,
    yAxisLabel: String = "",
    xAxisLabel: String = "",
    minYValue: Float? = null,
    maxYValue: Float? = null
) {
    Canvas(modifier = modifier.background(backgroundColor)) {
        val padding = 40.dp.toPx() // Padding for axes and labels

        if (data.isEmpty()) return@Canvas

        val maxY = maxYValue ?: data.maxOrNull() ?: 1f
        val minY = minYValue ?: data.minOrNull() ?: 0f
        val yRange = maxY - minY
        val lastIndex = data.lastIndex
        val spacing = (size.width - padding * 2) / lastIndex.coerceAtLeast(1)
        val plotBottom = size.height - padding
        val plotHeight = size.height - padding * 2

        // Maps a data value to its canvas y coordinate; a flat series sits at mid-height.
        fun yFor(value: Float): Float =
            if (yRange == 0f) size.height / 2 else plotBottom - ((value - minY) / yRange) * plotHeight

        val points = data.mapIndexed { index, value ->
            Offset(padding + index * spacing, yFor(value))
        }

        // Paints are created once per draw pass rather than once per label.
        val yTickPaint = android.graphics.Paint().apply {
            color = android.graphics.Color.WHITE
            textSize = 24f
            textAlign = android.graphics.Paint.Align.RIGHT
        }
        val xTickPaint = android.graphics.Paint().apply {
            color = android.graphics.Color.WHITE
            textSize = 24f
            textAlign = android.graphics.Paint.Align.CENTER
        }
        val axisTitlePaint = android.graphics.Paint().apply {
            color = android.graphics.Color.WHITE
            textSize = 24f
            textAlign = android.graphics.Paint.Align.CENTER
            isAntiAlias = true
        }
        val nativeCanvas = drawContext.canvas.nativeCanvas

        // Draw axes
        drawLine(
            color = Color.White,
            start = Offset(padding, padding),
            end = Offset(padding, plotBottom),
            strokeWidth = 2f
        )
        drawLine(
            color = Color.White,
            start = Offset(padding, plotBottom),
            end = Offset(size.width - padding, plotBottom),
            strokeWidth = 2f
        )

        // Draw y-axis labels: five evenly spaced ticks, or one tick for a flat series.
        val yLabelCount = 5
        val yTickValues = if (yRange == 0f) {
            listOf(minY)
        } else {
            val yStep = yRange / (yLabelCount - 1)
            List(yLabelCount) { i -> minY + i * yStep }
        }
        yTickValues.forEach { yValue ->
            nativeCanvas.drawText(
                yValue.roundToInt().toString(),
                padding - 8f,
                yFor(yValue) + yTickPaint.textSize / 2,
                yTickPaint
            )
        }

        // Draw x-axis labels: up to five indices spread from 0 to the last index.
        val xLabelCount = 5
        val xTickIndices = (0 until xLabelCount)
            .map { i -> (i * lastIndex.toFloat() / (xLabelCount - 1)).roundToInt() }
            .distinct()
        xTickIndices.forEach { index ->
            nativeCanvas.drawText(
                index.toString(),
                padding + index * spacing,
                plotBottom + xTickPaint.textSize + 4f,
                xTickPaint
            )
        }

        // Y-axis title, rotated to read bottom-to-top
        if (yAxisLabel.isNotEmpty()) {
            nativeCanvas.save()
            nativeCanvas.rotate(-90f, padding / 2, size.height / 2)
            nativeCanvas.drawText(
                yAxisLabel,
                padding / 2,
                size.height / 2,
                axisTitlePaint
            )
            nativeCanvas.restore()
        }

        // X-axis title
        if (xAxisLabel.isNotEmpty()) {
            nativeCanvas.drawText(
                xAxisLabel,
                size.width / 2,
                size.height - padding / 2,
                axisTitlePaint
            )
        }

        // Draw the straight lines connecting points
        points.zipWithNext().forEach { (from, to) ->
            drawLine(
                color = lineColor,
                start = from,
                end = to,
                strokeWidth = 4f,
                cap = StrokeCap.Round
            )
        }

        // Mark each data point
        points.forEach { point ->
            drawCircle(
                color = lineColor,
                radius = 4f,
                center = point
            )
        }
    }
}

/**
 * Creates the Socket.IO connection to `signalingServerUrl`, registers the connect,
 * connect-error, disconnect and `signal` handlers, and starts connecting.
 *
 * On connect the socket joins `currentRoom`, the call is initiated and the periodic
 * time sync is started. Incoming `signal` events carrying a [JSONObject] are forwarded
 * to `handleSignalingData`. Any exception while setting up the socket is logged and
 * swallowed.
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = signalingServerUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket connected")
            socket.emit("join", currentRoom)
            Log.d(TAG, "Joined room: $currentRoom")
            initiateCall()
            startPeriodicTimeSync()
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            if (args.isNotEmpty()) {
                val error = args[0]
                Log.e(TAG, "Socket connection error: $error")
            }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            if (args.isNotEmpty()) {
                val reason = args[0]
                Log.d(TAG, "Socket disconnected: $reason")
            }
        }

        socket.on("signal") { args ->
            // Read the payload safely: indexing args[0] before checking for emptiness
            // would throw on an event without arguments.
            val payload = args.firstOrNull()
            Log.d(TAG, "Received signaling: $payload")
            if (payload is JSONObject) {
                try {
                    handleSignalingData(payload)
                } catch (e: org.json.JSONException) {
                    // A malformed message must not take down the socket callback.
                    Log.e(TAG, "Error handling signaling data: ${e.message}")
                }
            }
        }

        socket.connect()
        Log.d(TAG, "Connecting to Socket: $socketUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "Error connecting to Socket: ${e.message}")
    }
}

/**
 * Requests whichever of CAMERA, RECORD_AUDIO, INTERNET and ACCESS_NETWORK_STATE are not
 * yet granted. When none are missing, [onPermissionsChecked] is called directly.
 */
private fun requestPermissionsIfNeeded() {
    val missing = arrayOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/**
 * Shows a short toast stating that all required permissions have been granted.
 *
 * NOTE(review): the permission-result callback invokes this even when some permissions
 * were denied, so the message can be inaccurate in that case.
 */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Creates the [PeerConnection] for this call, stores it in [localPeer], hands it to
 * [onLocalPeerCreated] and then starts the periodic stats collection.
 *
 * The connection is configured with the STUN server and the credentialed TURN server held
 * by this fragment. Locally gathered ICE candidates are forwarded over the signaling socket,
 * and incoming video tracks are rendered into [remoteView].
 *
 * If the factory fails to create a connection (returns null), the failure is logged and
 * neither [onLocalPeerCreated] nor the stats collection is run.
 *
 * @param context not used by this function; kept so existing callers keep compiling.
 * @param peerConnectionFactory factory used to create the connection.
 * @param remoteView renderer that receives the remote video frames.
 * @param onLocalPeerCreated invoked once with the newly created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    remoteView: SurfaceViewRenderer,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            // Forward the local candidate to the remote peer through the signaling server.
            val candidateJson = JSONObject().apply {
                put("sdpMid", candidate.sdpMid)
                put("sdpMLineIndex", candidate.sdpMLineIndex)
                put("candidate", candidate.sdp)
            }
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", candidateJson)
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
            (receiver?.track() as? VideoTrack)?.addSink(remoteView)
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
            (transceiver?.receiver?.track() as? VideoTrack)?.addSink(remoteView)
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        // The factory may return null; fail with a log entry instead of a
        // NullPointerException from a forced `!!` dereference.
        Log.e(TAG, "Failed to create PeerConnection")
        return
    }

    onLocalPeerCreated(peer)

    // Start collecting statistics
    startStatsCollection()
}

/**
 * Starts the call from the initiator side: creates an SDP offer on [localPeer], applies it
 * as the local description and, once that succeeds, sends it as an "offer" signal for
 * [currentRoom]. Does nothing beyond logging when [localPeer] is null.
 */
private fun initiateCall() {
    Log.d(TAG, "Initiating call...")

    val offerConstraints = MediaConstraints()
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    val offerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            val offer = sessionDescription ?: return

            // Only publish the offer after it has been accepted as the local description.
            val localDescriptionObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    val sdpJson = JSONObject().put("sdp", offer.description)
                    val signalData = JSONObject().apply {
                        put("type", "offer")
                        put("sdp", sdpJson)
                        put("room", currentRoom)
                    }
                    socket.emit("signal", signalData)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "Set local description error: $error")
                }

                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            localPeer?.setLocalDescription(localDescriptionObserver, offer)
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "Create offer error: $error")
        }

        override fun onSetSuccess() {}
        override fun onSetFailure(error: String?) {}
    }

    localPeer?.createOffer(offerObserver, offerConstraints)
}

/**
 * Dispatches one signaling message received from the socket, based on its "type" field:
 *
 * - "answer": applies the remote SDP answer, then adds any ICE candidates that were queued
 *   in [pendingIceCandidates] while no remote description was set.
 * - "ice": adds the remote ICE candidate, or queues it if the remote description is not set yet.
 * - "time_sync_response": computes RTT, the estimated one-way delay (RTT / 2) and the clock
 *   offset from the t1..t4 timestamps, and updates the latency history and states.
 * - "ping": echoes the received timestamp back in a "pong" signal.
 *
 * A message with a missing/unknown type is logged. A message missing a required field is
 * logged and dropped instead of throwing out of the socket callback.
 *
 * @param data the JSON payload of the "signal" event.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        // optString returns "" when "type" is absent, which falls through to the else branch.
        when (val type = data.optString("type")) {

            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )

                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        // Flush candidates that arrived before the remote description was set.
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }

                        pendingIceCandidates.clear()

                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_response" -> {
                val t1 = data.getLong("t1")
                val t2 = data.getLong("t2")
                val t3 = data.getLong("t3")
                val t4 = System.currentTimeMillis()

                val roundTripMs = t4 - t1
                // Estimate only: assumes both directions take the same time.
                val oneWayDelay = roundTripMs / 2
                timeOffset = ((t2 - t1) + (t3 - t4)) / 2

                Log.d(TAG, "时间同步: RTT=$roundTripMs ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

                // Update the latency states
                viewLifecycleOwner.lifecycleScope.launch {
                    // Append to the history, keeping only the latest 60 samples
                    latencyHistory.add(oneWayDelay)
                    if (latencyHistory.size > 60) {
                        latencyHistory.removeAt(0)
                    }

                    // Max, min and average one-way delay over the history
                    val maxLatency = latencyHistory.maxOrNull() ?: 0L
                    val minLatency = latencyHistory.minOrNull() ?: 0L
                    val averageLatency = if (latencyHistory.isNotEmpty()) {
                        latencyHistory.average().toLong()
                    } else {
                        0L
                    }

                    maxLatencyState.value = maxLatency
                    minLatencyState.value = minLatency
                    averageLatencyState.value = averageLatency
                    latestLatencyState.value = oneWayDelay
                }
            }

            "ping" -> {
                // Echo the sender's timestamp back so it can compute the RTT on its own clock.
                val receivedTimestamp = data.getLong("timestamp")
                val pongData = JSONObject().apply {
                    put("type", "pong")
                    put("timestamp", receivedTimestamp)
                    put("room", currentRoom)
                }
                socket.emit("signal", pongData)
                Log.d(TAG, "发送 pong 对应时间戳: $receivedTimestamp")
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: $type")
            }
        }
    } catch (e: org.json.JSONException) {
        // The payload comes from the network; a missing or mistyped field must not crash
        // the socket callback thread.
        Log.e(TAG, "Malformed signaling message: $data", e)
    }
}

/**
 * Records the current wall-clock time in `t1` and sends it to the peer in a
 * "time_sync_request" signal for [currentRoom].
 */
private fun requestTimeSync() {
    t1 = System.currentTimeMillis()

    val syncRequest = JSONObject()
    syncRequest.put("type", "time_sync_request")
    syncRequest.put("room", currentRoom)
    syncRequest.put("t1", t1)

    socket.emit("signal", syncRequest)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

/**
 * Launches a coroutine in the view lifecycle scope that, once per second, asks [localPeer]
 * for a stats report and passes it to [parseStatsReport]. The job is kept in [statsJob].
 */
private fun startStatsCollection() {
    Log.d(TAG, "Starting stats collection...")
    val scope = viewLifecycleOwner.lifecycleScope
    statsJob = scope.launch {
        while (isActive) {
            delay(1000) // One sample per second
            Log.d(TAG, "Collecting stats...")
            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
            } else {
                peer.getStats { report ->
                    Log.d(TAG, "Stats report obtained.")
                    parseStatsReport(report)
                }
            }
        }
    }
}

/**
 * Extracts inbound video statistics from [report] and updates the frame-rate, bitrate,
 * packet-loss and stuttering states.
 *
 * Frame rate and bitrate are computed from the difference to the previous sample, so the
 * first sample only seeds the `prev*` fields. Packet loss is the cumulative
 * lost / (lost + received) ratio reported so far, expressed as a percentage.
 *
 * @param report the stats report delivered by `PeerConnection.getStats`.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    Log.d(TAG, "Received RTCStatsReport: $report")
    for (stats in report.statsMap.values) {
        if (stats.type != "inbound-rtp") continue
        if ((stats.members["kind"] as? String) != "video") continue

        val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
        val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
        val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
        val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
        val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
        val packetsReceived = (stats.members["packetsReceived"] as? Number)?.toDouble() ?: 1.0
        // Guard the denominator: when both counters are 0 (e.g. right after the track is
        // created) the plain division would produce NaN and poison the UI state.
        val totalPackets = packetsLost + packetsReceived
        val packetLossFraction = if (totalPackets > 0) packetsLost / totalPackets else 0.0
        val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

        Log.d(
            TAG,
            "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
        )

        if (prevTimestamp != 0.0) {
            val timeElapsed = timestamp - prevTimestamp
            val framesDelta = framesDecoded - prevFramesDecoded
            val bytesDelta = bytesReceived - prevBytesReceived

            val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
            val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
            val packetLoss = packetLossFraction * 100 // Convert to percentage

            Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

            // Determine stuttering based on thresholds
            val isStuttering = frameRate < 24 || packetLoss > 5.0 // Thresholds can be adjusted

            // Update states
            viewLifecycleOwner.lifecycleScope.launch {
                frameRateState.value = frameRate
                bitrateState.value = bitrate.toLong()
                // Update stuttering state
                stutteringState.value = isStuttering
                // Update specific stuttering causes
                frameRateLowState.value = frameRate < 24
                packetLossHighState.value = packetLoss > 5.0
                packetLossState.value = packetLoss

                // Update history, keeping only the latest 60 samples of each series
                frameRateHistory.add(frameRate.toFloat())
                if (frameRateHistory.size > 60) {
                    frameRateHistory.removeAt(0)
                }

                bitrateHistory.add(bitrate.toLong())
                if (bitrateHistory.size > 60) {
                    bitrateHistory.removeAt(0)
                }

                Log.d(
                    TAG,
                    "Updated Frame Rate: ${frameRateState.value} fps, Bitrate: ${bitrateState.value / 1000} kbps, Stuttering: $isStuttering"
                )
            }
        }

        // Update previous values
        prevFramesDecoded = framesDecoded
        prevBytesReceived = bytesReceived
        prevFramesReceived = framesReceived
        prevFramesDropped = framesDropped
        prevTimestamp = timestamp
    }
}

/**
 * Stops the background jobs and releases the socket, peer connection, remote renderer and
 * EGL context when the fragment's view is destroyed.
 */
override fun onDestroyView() {
    super.onDestroyView()
    statsJob?.cancel()
    timeSyncJob?.cancel()
    // `socket` is lateinit and stays unassigned if IO.socket() threw during setup;
    // touching it unguarded would throw UninitializedPropertyAccessException here.
    if (::socket.isInitialized) {
        socket.disconnect()
    }
    localPeer?.dispose()
    // Drop the reference so late callbacks cannot call into the disposed connection.
    localPeer = null
    remoteView?.release()
    remoteEglBase?.release()
}
content_copy
Use code with caution.

}

分析目前的代码,统计单向时延和往返时延的代码合理吗?用中文回答

思考

分析代码功能

我在查看Kotlin代码,观察两个Android Fragment类的视频流和WebRTC连接,特别是RTT和单向延迟的测量。这让我想到,具体实现是否合理,以及是否存在改进空间。

回答

从您提供的代码来看,VideoInitiatorFragment 和 VideoReceiverFragment 分别实现了往返时延(RTT)和单向时延(Latency)的统计。整体上,这些实现方法在逻辑上是合理的,但仍有一些细节和潜在的问题需要注意。以下是具体的分析:

1. 往返时延(RTT)的实现

实现方式:

  • 发送 Ping 消息: 在 VideoInitiatorFragment 中,通过 startSendingTimestamps() 方法每隔5秒发送一次包含当前时间戳的 ping 消息。
  • 接收 Pong 消息: 当接收到 pong 消息时,计算 RTT 为当前系统时间减去发送 ping 时的时间戳。
  • 统计与记录: 将 RTT 添加到历史记录中,并更新最大、最小和平均 RTT 状态。

合理性分析:

  • 基本逻辑正确: 通过发送带有时间戳的 ping 消息,并在接收到 pong 消息后计算 RTT,这是一种常见且合理的方法。
  • 历史记录管理: 通过 rttHistory 保持最近60个 RTT 值,有助于计算平均 RTT,避免因单次异常波动影响整体统计。
  • UI 更新及时: 使用 mutableStateOf 和 mutableStateListOf 保证了 UI 的实时更新。

潜在问题与改进建议:

  • 时间戳的准确性: RTT 的发送时刻和接收时刻都由发起端的同一个时钟记录(pong 只是原样回传 ping 中的时间戳),因此 RTT 本身不依赖两端设备的系统时间是否同步。需要注意的是,System.currentTimeMillis() 可能因系统校时而发生跳变,从而使个别 RTT 样本失真;如果要进一步比较两端的绝对时间,则仍需保证双方时间同步。
  • 消息丢失处理: 如果 ping 消息或 pong 消息丢失,当前实现没有重试机制,可能导致 RTT 统计不完整。可以考虑在发送 ping 后设置超时机制,如果在预定时间内未收到 pong,则记录为丢失或重试。
  • 多重 Ping 处理: 如果在前一次 ping 未收到 pong 前发送了新的 ping,可能会混淆 RTT 计算。需要确保每次 ping 对应一个 pong,可以通过增加唯一标识符来区分不同的 ping 消息。

2. 单向时延(Latency)的实现

实现方式:

  • 时间同步请求: 在 VideoReceiverFragment 中,通过 startPeriodicTimeSync() 方法定期发送 time_sync_request 消息,包含当前时间戳 t1。
  • 时间同步响应: 接收到 time_sync_request 后,立即回复 time_sync_response 消息,包含 t1、t2(接收时刻)、t3(发送时刻)。
  • 时延计算: 发送方收到 time_sync_response 后,计算 RTT 和单向时延,利用公式:
    • RTT = t4 - t1
    • OneWayDelay = RTT / 2
    • TimeOffset = ((t2 - t1) + (t3 - t4)) / 2

合理性分析:

  • 时间同步机制: 使用类似于 NTP(网络时间协议)的时间同步方法,通过交换时间戳计算 RTT 和单向时延,这是一个合理的方法。
  • 时延计算公式正确: 公式正确地考虑了往返时间,并假设发送和接收的处理时间对称,从而估算单向时延。

潜在问题与改进建议:

  • 系统时间同步: 单向时延的准确性高度依赖于双方设备的系统时间同步。如果存在时钟漂移或不同步,计算结果会受到影响。建议在初始化时进行一次完整的时间同步,或者使用专门的时钟同步协议。
  • 处理时间的延迟: 接收和发送 time_sync_response 消息的处理时间可能引入额外的延迟,特别是在高负载或设备性能较低的情况下。可以通过优化消息处理流程,减少处理时间,提高时延计算的准确性。
  • 单向时延的波动性: 网络环境的变化可能导致单向时延的波动较大。建议在计算单向时延时,考虑使用更多的数据点进行平滑处理,以获得更稳定的统计结果。
  • 错误处理与重试机制: 如果 time_sync_request 或 time_sync_response 消息丢失,当前实现没有重试机制,可能导致时延统计不完整。可以考虑增加重试机制或超时处理,确保时延统计的连续性和准确性。

3. 综合建议

  • 优化权限请求逻辑: 确保在所有必要权限被授予后再进行 WebRTC 连接的初始化,避免因权限问题导致连接失败或统计不准确。
  • 资源释放与管理: 在 onDestroyView() 方法中,确保所有资源(如 statsJob、socket、PeerConnection、SurfaceViewRenderer、EglBase)被正确释放,避免内存泄漏。
  • 日志记录与调试: 目前代码中有大量的日志记录,这对于调试和监控非常有用。建议在生产环境中适当调整日志级别,避免过多的日志影响性能。

总结

总体而言,您的代码在实现 RTT 和单向时延统计方面采用了合理的方法,并且考虑了历史记录和实时更新。然而,为了提高统计的准确性和健壮性,建议进一步优化时间同步机制、处理潜在的消息丢失问题,并确保双方设备的时间同步。此外,适当的错误处理和资源管理也将有助于提升应用的稳定性和用户体验。

问题

目前的单向时延和往返时延的统计方案没有在同一个请求中用吗?应该改成在一个请求中既统计单向时延又统计往返时延

思考

整合单向时延和往返时延

分析代码后,用户询问是否在同一个请求中统计单向时延和往返时延,以提升同步性、效率或准确性。

探讨合并指标的好处和实施方法

我正在考虑将RTT和单向延迟统计信息合并的效率提升。这不仅可以减少消息数,还可确保同步。接着,我将阐明合并的意义和代码调整方式,包含示例代码。

回答

确实,将单向时延(Latency)和往返时延(RTT)的统计整合到同一个请求中,可以提高效率、减少消息数量,并确保统计数据的一致性和同步性。以下是对当前方案的分析以及如何进行优化的详细建议。

1. 当前方案的问题

1.1 单向时延和往返时延分开请求

  • RTT 统计:在 VideoInitiatorFragment 中,通过定期发送 ping 消息并接收 pong 消息来计算 RTT。
  • 单向时延统计:在 VideoReceiverFragment 中,通过定期发送 time_sync_request 并接收 time_sync_response 消息来计算单向时延。

1.2 潜在问题

  • 消息冗余:分别发送不同类型的消息(ping 和 time_sync_request)可能导致消息数量增加,尤其是在高频率统计的情况下。
  • 同步性问题:单独的请求可能导致统计数据的不同步,特别是在网络延迟波动较大的环境中。
  • 维护复杂性:管理多种消息类型和对应的处理逻辑增加了代码的复杂性,增加了潜在的错误风险。

2. 优化方案:整合 RTT 和单向时延统计

将 RTT 和单向时延的统计整合到同一个请求中,可以优化通信流程,提高统计的准确性和效率。具体步骤如下:

2.1 设计统一的时间同步请求

  • 统一请求类型:使用一个统一的请求类型(例如 time_sync_request),同时携带用于 RTT 和单向时延计算的必要时间戳。
  • 响应包含必要信息:在 time_sync_response 中,除了回复 t1、t2 和 t3,还可以携带 pong 信息,以便同时计算 RTT 和单向时延。

2.2 修改 VideoInitiatorFragment 和 VideoReceiverFragment

2.2.1 修改 VideoInitiatorFragment

发送时间同步请求并计算 RTT 和单向时延

/** Sends a "time_sync_request" carrying the local send time t1 every 5 seconds. */
private fun startSendingTimeSyncRequests() {
    // Stored in timeSyncJob rather than statsJob: statsJob already holds the stats-collection
    // coroutine and would otherwise be overwritten.
    // NOTE(review): if startPeriodicTimeSync() also assigns timeSyncJob, run only one of the two loops.
    timeSyncJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(5000) // Send one time-sync request every 5 seconds
            val t1 = System.currentTimeMillis()
            val syncRequest = JSONObject().apply {
                put("type", "time_sync_request")
                put("room", currentRoom)
                put("t1", t1)
            }
            socket.emit("signal", syncRequest)
            Log.d(TAG, "发送时间同步请求 at t1: $t1")
        }
    }
}

/** Handles a signaling message; on "time_sync_response" it updates both RTT and one-way-delay statistics. */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "处理信令数据: $data")
    when (data.getString("type")) {
        "time_sync_response" -> {
            val t1 = data.getLong("t1")
            val t2 = data.getLong("t2")
            val t3 = data.getLong("t3")
            val t4 = System.currentTimeMillis()

            val RTT = t4 - t1
            val oneWayDelay = RTT / 2
            val timeOffset = ((t2 - t1) + (t3 - t4)) / 2

            Log.d(TAG, "时间同步: RTT=$RTT ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

            // Update the RTT and one-way-delay states
            viewLifecycleOwner.lifecycleScope.launch {
                // RTT statistics
                latestRttState.value = RTT
                rttHistory.add(RTT)
                if (rttHistory.size > 60) {
                    rttHistory.removeAt(0)
                }
                maxRttState.value = rttHistory.maxOrNull() ?: 0L
                minRttState.value = rttHistory.minOrNull() ?: 0L
                averageRttState.value = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

                // One-way-delay statistics
                latestLatencyState.value = oneWayDelay
                latencyHistory.add(oneWayDelay)
                if (latencyHistory.size > 60) {
                    latencyHistory.removeAt(0)
                }
                maxLatencyState.value = latencyHistory.maxOrNull() ?: 0L
                minLatencyState.value = latencyHistory.minOrNull() ?: 0L
                averageLatencyState.value = if (latencyHistory.isNotEmpty()) latencyHistory.average().toLong() else 0L

                Log.d(TAG, "RTT: $RTT ms, 单向时延: $oneWayDelay ms")
            }
        }
        // Handle the other signaling types...
    }
}

解释:

  • 统一发送请求:通过 startSendingTimeSyncRequests() 方法每隔5秒发送一次 time_sync_request,包含发送时间 t1
  • 处理响应:在 handleSignalingData 方法中处理 time_sync_response,计算 RTT 和单向时延,并更新对应的统计状态。

2.2.2 修改 VideoReceiverFragment

处理时间同步请求并回复响应

/** Handles a signaling message; a "time_sync_request" is answered with a "time_sync_response". */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "处理信令数据: $data")
    when (data.getString("type")) {
        "time_sync_request" -> {
            val t1 = data.getLong("t1")
            val t2 = System.currentTimeMillis() // receive time
            val t3 = System.currentTimeMillis() // send time

            val syncResponse = JSONObject().apply {
                put("type", "time_sync_response")
                put("t1", t1)
                put("t2", t2)
                put("t3", t3)
                put("room", currentRoom)
            }
            socket.emit("signal", syncResponse)
            Log.d(TAG, "回复时间同步响应: t1=$t1, t2=$t2, t3=$t3")
        }
        // Handle the other signaling types...
    }
}

解释:

  • 统一回复响应:在接收到 time_sync_request 后,立即发送包含 t1、t2 和 t3 的 time_sync_response,用于计算 RTT 和单向时延。

2.3 移除独立的 Ping-Pong 机制

由于时间同步请求已经能够同时计算 RTT 和单向时延,因此可以移除之前单独用于 RTT 统计的 ping 和 pong 消息处理逻辑,以简化系统。

2.4 更新 UI 和状态管理

确保在 UI 中同时显示 RTT 和单向时延的统计数据。由于统计数据现在由同一个请求产生,更新逻辑也会更加统一和简洁。

3. 具体代码修改示例

以下是如何整合上述优化方案的具体代码示例。

3.1 VideoInitiatorFragment 的修改

添加统一时间同步请求的发送

// Inside LaunchedEffect, replace the former startSendingTimestamps() call
LaunchedEffect(Unit) {
    // ... initialize PeerConnectionFactory, etc.
    initializeSocketIO()
    requestPermissionsIfNeeded()
    startSendingTimeSyncRequests() // Start the unified time-sync requests
}

修改 handleSignalingData 方法

/** Handles a signaling message; on "time_sync_response" it updates both RTT and one-way-delay statistics. */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "处理信令数据: $data")
    when (data.getString("type")) {
        "time_sync_response" -> {
            val t1 = data.getLong("t1")
            val t2 = data.getLong("t2")
            val t3 = data.getLong("t3")
            val t4 = System.currentTimeMillis()

            val RTT = t4 - t1
            val oneWayDelay = RTT / 2
            val timeOffset = ((t2 - t1) + (t3 - t4)) / 2

            Log.d(TAG, "时间同步: RTT=$RTT ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

            // Update the RTT and one-way-delay states
            viewLifecycleOwner.lifecycleScope.launch {
                // RTT statistics
                latestRttState.value = RTT
                rttHistory.add(RTT)
                if (rttHistory.size > 60) {
                    rttHistory.removeAt(0)
                }
                maxRttState.value = rttHistory.maxOrNull() ?: 0L
                minRttState.value = rttHistory.minOrNull() ?: 0L
                averageRttState.value = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

                // One-way-delay statistics
                latestLatencyState.value = oneWayDelay
                latencyHistory.add(oneWayDelay)
                if (latencyHistory.size > 60) {
                    latencyHistory.removeAt(0)
                }
                maxLatencyState.value = latencyHistory.maxOrNull() ?: 0L
                minLatencyState.value = latencyHistory.minOrNull() ?: 0L
                averageLatencyState.value = if (latencyHistory.isNotEmpty()) latencyHistory.average().toLong() else 0L

                Log.d(TAG, "RTT: $RTT ms, 单向时延: $oneWayDelay ms")
            }
        }
        // Handle the other signaling types...
    }
}

移除原有的 Ping-Pong 逻辑

删除以下方法及相关调用:

  • startSendingTimestamps()
  • 处理 pong 消息的逻辑

3.2 VideoReceiverFragment 的修改

处理统一的时间同步请求

/** Handles a signaling message; a "time_sync_request" is answered with a "time_sync_response". */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "处理信令数据: $data")
    when (data.getString("type")) {
        "time_sync_request" -> {
            val t1 = data.getLong("t1")
            val t2 = System.currentTimeMillis() // receive time
            val t3 = System.currentTimeMillis() // send time

            val syncResponse = JSONObject().apply {
                put("type", "time_sync_response")
                put("t1", t1)
                put("t2", t2)
                put("t3", t3)
                put("room", currentRoom)
            }
            socket.emit("signal", syncResponse)
            Log.d(TAG, "回复时间同步响应: t1=$t1, t2=$t2, t3=$t3")
        }
        // Handle the other signaling types...
    }
}

启动统一的时间同步请求

LaunchedEffect(Unit) {
    // ... initialize PeerConnectionFactory, etc.
    initializeSocketIO()
    requestPermissionsIfNeeded()
    startPeriodicTimeSync() // Start the unified time-sync requests
}

更新 initiateCall 方法(如果需要)

确保在建立连接后,启动统一的时间同步请求,而不是独立的 RTT 请求。

4. 注意事项和优化建议

4.1 时间同步的准确性

  • 设备时钟同步:确保发送方和接收方的设备时钟尽可能同步,或者考虑使用网络时间协议(NTP)来校准设备时间,以提高单向时延计算的准确性。
  • 处理延迟:优化消息的处理逻辑,减少在接收方处理 time_sync_request 和发送 time_sync_response 的延迟。

4.2 消息丢失和错误处理

  • 超时机制:在发送 time_sync_request 后,设置超时机制。如果在预定时间内未收到 time_sync_response,则记录为丢失或触发重试机制。
  • 唯一标识符:为每个时间同步请求添加唯一标识符(如 requestId),以确保请求和响应的匹配,避免多次请求导致的数据混淆。

4.3 性能优化

  • 减少消息频率:根据实际需求调整时间同步请求的频率,避免过高的请求频率对网络和设备性能造成压力。
  • 批量处理:如果需要同时处理多种统计数据,可以考虑将它们批量处理,减少UI更新的频率,提升性能。

4.4 UI 和用户体验

  • 实时反馈:确保统计数据的实时性和准确性,通过合适的动画和图表展示数据,提升用户体验。
  • 错误提示:在统计数据异常或请求失败时,提供相应的错误提示,帮助用户了解当前网络状况。

5. 总结

通过将单向时延和往返时延的统计整合到同一个时间同步请求中,可以简化通信流程,提高统计的效率和准确性。以上优化方案不仅减少了消息的数量,还确保了统计数据的一致性和同步性。同时,注意时间同步的准确性、消息的可靠性以及性能优化,将进一步提升整体系统的稳定性和用户体验。

如果在实施过程中遇到具体问题或需要进一步的代码示例,请随时提出!

分享这个问答