问题

class VideoInitiatorFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoInitiatorFragment] whose arguments bundle carries the connection settings.
     *
     * Each value is stored under the string key that [onCreate] reads back:
     * "room", "stunUrl", "turnUrl", "turnUsername", "turnPassword", "signalingServerUrl".
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoInitiatorFragment = VideoInitiatorFragment().also { created ->
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        created.arguments = bundle
    }
}

// Class member state (original note: "class member variables remain unchanged").
// Signaling socket; assigned in initializeSocketIO().
private lateinit var socket: Socket
// The peer connection; assigned in createPeerConnection() and disposed in onDestroyView().
private var localPeer: PeerConnection? = null
// Renderer for the local camera preview; created by the AndroidView factory in WebRTCComposeLayout().
private var localView: SurfaceViewRenderer? = null
// EGL holder created in the AndroidView update block of WebRTCComposeLayout().
private var localEglBase: EglBase? = null
// Remote ICE candidates received while no remote description was set; drained after the answer is emitted.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings below are populated from the fragment arguments in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

// Logcat tag used by the log statements in this fragment.
private val TAG: String = "WebRTC-Initiator"

// RTT state variables (milliseconds) rendered by the Text rows in WebRTCComposeLayout().
private val maxRttState = mutableLongStateOf(0L)
// Starts at Long.MAX_VALUE until the first sample overwrites it.
private val minRttState = mutableLongStateOf(Long.MAX_VALUE)
private val averageRttState = mutableLongStateOf(0L)
private val latestRttState = mutableLongStateOf(0L)

// History list to compute average RTT; the writers cap it at 60 entries.
private val rttHistory = mutableStateListOf<Long>()
// System.currentTimeMillis() captured when the most recent ping was sent.
private var pingTimestamp: Long = 0

// Job for stats collection; started in startStatsCollection(), cancelled in onDestroyView().
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments, substituting the
 * built-in defaults for any key that is absent, then logs the effective setup.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // Looks up one string argument, falling back when the bundle or the key is missing.
    fun setting(key: String, fallback: String): String = arguments?.getString(key) ?: fallback

    currentRoom = setting("room", "default-room")
    signalingServerUrl = setting("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = setting("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = setting("turnUrl", "turn:turn.wstszx.us.kg:5349")
    // NOTE(review): default TURN credentials are hard-coded in source — confirm this is intended.
    turnUsername = setting("turnUsername", "wstszx")
    turnPassword = setting("turnPassword", "930379")

    val summary =
        "onCreate: 角色 = 服务器, 房间 = $currentRoom, 信令服务器 = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per permission with its outcome. [onPermissionsChecked] (which
 * announces that all permissions are granted) is only invoked when every requested
 * permission was actually granted; previously it ran even after a denial.
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        // An empty result means the request was interrupted, so it is not treated as success.
        if (permissions.isNotEmpty() && permissions.values.all { it }) {
            onPermissionsChecked()
        }
    }

/** Hosts the Compose UI: returns a [ComposeView] whose content is [WebRTCComposeLayout]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeHost = ComposeView(requireContext())
    composeHost.setContent { WebRTCComposeLayout() }
    return composeHost
}

/**
 * Root composable of the initiator screen.
 *
 * Lays out a [SurfaceViewRenderer] for the local camera preview above four text rows
 * showing the latest / max / min / average RTT state values. A `LaunchedEffect(Unit)`
 * then initializes [PeerConnectionFactory], starts local video, creates the peer
 * connection (starting stats collection in its callback), opens the signaling socket
 * and finally requests runtime permissions.
 *
 * NOTE(review): permissions are requested after camera capture has already been
 * started — confirm the capture works on a first launch without CAMERA permission.
 */
@Composable
fun WebRTCComposeLayout() {
val context = LocalContext.current
// Assigned inside the LaunchedEffect below before it is used.
lateinit var peerConnectionFactory: PeerConnectionFactory
var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {

// Local video view; takes the remaining height of the column (weight 1f).
AndroidView(
factory = {
// Keep a reference in the fragment so other methods can attach sinks and release it.
localView = SurfaceViewRenderer(it).apply {
setZOrderMediaOverlay(false)
}
localView!!
},
modifier = Modifier
.weight(1f)
.fillMaxWidth(),
update = {
// Create the EGL base and initialize the renderer only once (while localEglBase is still null).
if (localEglBase == null) {
localEglBase = EglBase.create()
it.init(localEglBase!!.eglBaseContext, null)
it.setMirror(false)
}
}
)

Spacer(modifier = Modifier.height(8.dp))

// RTT statistics display
Column(modifier = Modifier.padding(16.dp)) {
Text(
text = "最新往返时延: ${latestRttState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最大往返时延: ${maxRttState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最小往返时延: ${minRttState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "平均往返时延: ${averageRttState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 8.dp)
)
}

}

// One-time setup, keyed on Unit.
LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)

// Relies on the AndroidView update block having already created localEglBase.
val encoderFactory = DefaultVideoEncoderFactory(
localEglBase!!.eglBaseContext, true, true
)

val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

peerConnectionFactory = PeerConnectionFactory.builder()
.setVideoEncoderFactory(encoderFactory)
.setVideoDecoderFactory(decoderFactory)
.createPeerConnectionFactory()

// initLocalVideo invokes the callback before returning, so localVideoTrack is set for the next call.
initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
localVideoTrack = it
}

createPeerConnection(
context,
peerConnectionFactory,
localVideoTrack
) {
localPeer = it
startStatsCollection() // start stats collection
}

initializeSocketIO()

requestPermissionsIfNeeded()
}
}
}

/**
 * Starts a coroutine in the view lifecycle scope that, once per second, asks the
 * peer connection for a stats report and forwards it to [parseStatsReport].
 * The job is stored in [statsJob].
 */
private fun startStatsCollection() {
    val poller = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(1000) // collect stats once per second
            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
            } else {
                peer.getStats { report -> parseStatsReport(report) }
            }
        }
    }
    statsJob = poller
}

/**
 * Extracts the current ICE round-trip time from [report] and folds it into the RTT state.
 *
 * `currentRoundTripTime` (seconds) is a member of `candidate-pair` stats, not of
 * `transport` stats, so the previous lookup on `transport` entries never produced a
 * value. The pair referenced by the transport's `selectedCandidatePairId` is preferred;
 * otherwise the first nominated candidate pair is used.
 *
 * The state update is posted to the view lifecycle scope; samples that are missing or
 * not positive are ignored, and so are reports arriving after the view is destroyed.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    val statsById = report.statsMap

    val selectedPairId = statsById.values
        .firstOrNull { it.type == "transport" }
        ?.members
        ?.get("selectedCandidatePairId") as? String

    val activePair = selectedPairId?.let { statsById[it] }
        ?: statsById.values.firstOrNull { it.type == "candidate-pair" && it.members["nominated"] == true }
        ?: return

    val rttSeconds = (activePair.members["currentRoundTripTime"] as? Number)?.toDouble() ?: return
    val currentRtt = (rttSeconds * 1000).toLong()
    if (currentRtt <= 0) return

    // The stats callback can fire after onDestroyView; viewLifecycleOwner would throw then.
    if (view == null) return

    viewLifecycleOwner.lifecycleScope.launch {
        // Update the latest RTT.
        latestRttState.longValue = currentRtt

        // Keep at most the 60 most recent samples.
        rttHistory.add(currentRtt)
        if (rttHistory.size > 60) {
            rttHistory.removeAt(0)
        }

        // Recompute max, min and average over the retained history.
        val maxRtt = rttHistory.maxOrNull() ?: 0L
        val minRtt = rttHistory.minOrNull() ?: 0L
        val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

        maxRttState.longValue = maxRtt
        minRttState.longValue = minRtt
        averageRttState.longValue = averageRtt

        Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
    }
}

/**
 * Launches a loop in the view lifecycle scope that emits a "ping" signal every
 * 5 seconds, recording the send time in [pingTimestamp].
 *
 * NOTE(review): this is invoked from the socket connect handler, so each reconnect
 * appears to start an additional loop — confirm whether that is intended.
 */
private fun startSendingTimestamps() {
    viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(5000) // every 5 seconds

            // Send ping
            pingTimestamp = System.currentTimeMillis()
            val payload = JSONObject()
            payload.put("type", "ping")
            payload.put("timestamp", pingTimestamp)
            payload.put("room", currentRoom)
            socket.emit("signal", payload)
            Log.d(TAG, "发送 ping 时间: $pingTimestamp")
        }
    }
}

/**
 * Creates the signaling socket for [signalingServerUrl], registers its listeners and connects.
 *
 * On connect the socket joins [currentRoom] and starts the ping loop. Incoming
 * "signal" events carrying a [JSONObject] are passed to [handleSignalingData].
 * The payload is now inspected only after checking that one was delivered; the
 * previous code indexed `args[0]` before the emptiness check.
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = socketUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket 已连接")
            socket.emit("join", currentRoom)
            Log.d(TAG, "已加入房间: $currentRoom")
            startSendingTimestamps()
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            args.firstOrNull()?.let { error -> Log.e(TAG, "Socket 连接错误: $error") }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            args.firstOrNull()?.let { reason -> Log.d(TAG, "Socket 已断开: $reason") }
        }

        socket.on("signal") { args ->
            // firstOrNull() keeps an event without payload from throwing on the socket thread.
            val payload = args.firstOrNull()
            Log.d(TAG, "收到信令: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            }
        }

        socket.connect()
        Log.d(TAG, "正在连接到 Socket: $socketUrl...")
    } catch (e: Exception) {
        // Pass the throwable so the stack trace is logged as well as the message.
        Log.e(TAG, "连接 Socket 时出错: ${e.message}", e)
    }
}

/**
 * Requests whichever of CAMERA, RECORD_AUDIO, INTERNET and ACCESS_NETWORK_STATE are
 * not yet granted; when none are missing, calls [onPermissionsChecked] directly.
 */
private fun requestPermissionsIfNeeded() {
    val missing = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast stating that all required permissions have been granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Starts camera capture and creates the local video track.
 *
 * @param context used to create and initialize the camera capturer
 * @param localView preview renderer to attach as a sink; skipped when null
 * @param peerConnectionFactory factory used to create the video source and track
 * @param eglBase supplies the EGL context for the capture texture helper
 * @param onLocalVideoTrack invoked synchronously with the created track
 *
 * The audio source, audio track and local media stream that used to be created
 * here were never used or released, so they are no longer allocated; the peer
 * connection setup creates its own audio track.
 */
private fun initLocalVideo(
    context: Context,
    localView: SurfaceViewRenderer?,
    peerConnectionFactory: PeerConnectionFactory,
    eglBase: EglBase,
    onLocalVideoTrack: (VideoTrack) -> Unit
) {
    val videoCapturer = createCameraCapturer(context)
    val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
    val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
    videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

    // Requested capture format: 1920x1080 at 60 fps.
    videoCapturer.startCapture(1920, 1080, 60)

    val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
    if (localView != null) {
        localVideoTrack.addSink(localView)
    } else {
        // VideoTrack.addSink rejects a null sink, so the preview is skipped instead of crashing.
        Log.w(TAG, "initLocalVideo: no renderer available, local preview not attached")
    }

    onLocalVideoTrack(localVideoTrack)
}

/**
 * Creates a capturer for the first usable camera.
 *
 * Back-facing cameras are tried first, then front-facing ones, then any remaining
 * device reported by the enumerator.
 *
 * @throws IllegalStateException when no capturer can be created, including when the
 *   device reports no cameras at all (previously an ArrayIndexOutOfBoundsException).
 */
private fun createCameraCapturer(context: Context): CameraVideoCapturer {
    val enumerator = Camera2Enumerator(context)
    val deviceNames = enumerator.deviceNames

    val backFacing = deviceNames.filter { enumerator.isBackFacing(it) }
    val frontFacing = deviceNames.filter { enumerator.isFrontFacing(it) }
    // distinct() keeps the preference order while avoiding a second attempt on the same device.
    val candidates = (backFacing + frontFacing + deviceNames).distinct()

    return candidates.firstNotNullOfOrNull { deviceName -> enumerator.createCapturer(deviceName, null) }
        ?: throw IllegalStateException("无法创建摄像头捕获器")
}

/**
 * Creates the [PeerConnection], attaches the local tracks and reports it to the caller.
 *
 * @param context unused here; kept for signature compatibility
 * @param peerConnectionFactory factory used for the connection and the audio track
 * @param localVideoTrack video track to send, if one was created
 * @param onLocalPeerCreated invoked with the connection once tracks are attached
 *
 * The connection is configured with the STUN and TURN servers from the fragment
 * settings. Local ICE candidates are forwarded over the signaling socket as
 * "ice" signals. If the factory fails to create a connection, the failure is
 * logged and [onLocalPeerCreated] is not invoked (previously this crashed on `!!`).
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    localVideoTrack: VideoTrack?,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            iceCandidate?.let {
                Log.d(TAG, "ICE candidate: $it")
                // Relay the local candidate to the remote side through the signaling server.
                val signalData = JSONObject().apply {
                    put("type", "ice")
                    put("candidate", JSONObject().apply {
                        put("sdpMid", it.sdpMid)
                        put("sdpMLineIndex", it.sdpMLineIndex)
                        put("candidate", it.sdp)
                    })
                    put("room", currentRoom)
                }
                socket.emit("signal", signalData)
            }
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer: PeerConnection? = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        // The factory can return null; report it rather than crashing on a forced unwrap.
        Log.e(TAG, "createPeerConnection returned null; peer connection not created")
        return
    }

    localVideoTrack?.let { peer.addTrack(it, listOf("local_stream")) }

    val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
    val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
    peer.addTrack(localAudioTrack, listOf("local_stream"))

    onLocalPeerCreated(peer)
}

/**
 * Creates an SDP answer on [peerConnection], applies it as the local description and,
 * once that succeeds, hands the answer's SDP text to [onAnswerCreated].
 * Failures of either step are only logged.
 */
private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
    Log.d(TAG, "Creating answer...")

    val answerConstraints = MediaConstraints()
    answerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    answerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    val answerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            val answer = sessionDescription ?: return

            // Observer for the setLocalDescription call issued below.
            val localDescriptionObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    Log.d(TAG, "SetLocalDescription onSetSuccess")
                    onAnswerCreated(answer.description)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                }

                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            peerConnection.setLocalDescription(localDescriptionObserver, answer)
        }

        override fun onSetSuccess() {
            Log.d(TAG, "createAnswer onSetSuccess")
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "createAnswer onCreateFailure: $error")
        }

        override fun onSetFailure(error: String?) {}
    }

    peerConnection.createAnswer(answerObserver, answerConstraints)
}

/**
 * Dispatches one signaling message by its "type" field.
 *
 * - "offer": sets the remote description, creates and emits an answer, then applies
 *   any ICE candidates queued in [pendingIceCandidates].
 * - "ice": adds the candidate, or queues it while no remote description is set.
 * - "time_sync_request": replies with a "time_sync_response" echoing `t1` plus local `t2`/`t3`.
 * - "pong": records the time elapsed since the last ping as an RTT sample.
 *
 * The payload comes from the network, so a missing or mistyped field is logged and
 * the message dropped instead of letting a JSONException escape on the socket thread.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        // optString yields "" for a missing type, which falls through to the else branch.
        when (val type = data.optString("type")) {
            "offer" -> {
                Log.d(TAG, "Received offer")
                val sdp = SessionDescription(
                    SessionDescription.Type.OFFER,
                    data.getJSONObject("sdp").getString("sdp")
                )
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "Ignoring offer: peer connection is not available")
                    return
                }
                peer.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        Log.d(TAG, "Set remote description (offer) success")
                        createAnswer(peer) { answer ->
                            val signalData = JSONObject().apply {
                                put("type", "answer")
                                put("sdp", JSONObject().put("sdp", answer))
                                put("room", currentRoom)
                            }

                            socket.emit("signal", signalData)

                            // Apply candidates that arrived before the remote description was set.
                            pendingIceCandidates.forEach { candidate ->
                                localPeer?.addIceCandidate(candidate)
                            }
                            pendingIceCandidates.clear()
                        }
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description (offer) error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_request" -> {
                Log.d(TAG, "收到 time_sync_request")
                if (data.has("t1")) {
                    val t1 = data.getLong("t1")
                    val t2 = System.currentTimeMillis()
                    val t3 = System.currentTimeMillis()
                    val syncResponse = JSONObject().apply {
                        put("type", "time_sync_response")
                        put("t1", t1)
                        put("t2", t2)
                        put("t3", t3)
                        put("room", currentRoom)
                    }
                    socket.emit("signal", syncResponse)
                    Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")
                } else {
                    Log.e(TAG, "time_sync_request 缺少 t1")
                }
            }

            "pong" -> {
                // NOTE(review): the pong's timestamp is read (and required) but the RTT is
                // measured against the last ping sent — confirm whether the echoed value
                // should be used instead.
                val receivedTimestamp = data.getLong("timestamp")
                val currentTimestamp = System.currentTimeMillis()
                val rtt = currentTimestamp - pingTimestamp

                // A pong can arrive after onDestroyView; viewLifecycleOwner would throw then.
                if (view == null) return

                viewLifecycleOwner.lifecycleScope.launch {
                    latestRttState.longValue = rtt
                    rttHistory.add(rtt)
                    if (rttHistory.size > 60) {
                        rttHistory.removeAt(0)
                    }

                    maxRttState.longValue = rttHistory.maxOrNull() ?: 0
                    minRttState.longValue = rttHistory.minOrNull() ?: 0
                    averageRttState.longValue = rttHistory.average().toLong()

                    Log.d(TAG, "RTT: $rtt ms")
                }
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: $type")
            }
        }
    } catch (e: org.json.JSONException) {
        Log.e(TAG, "Malformed signaling message: $data", e)
    }
}

/** Emits a "time_sync_request" signal carrying the current wall-clock time as `t1`. */
private fun requestTimeSync() {
    val t1 = System.currentTimeMillis()
    val syncRequest = JSONObject()
    syncRequest.put("type", "time_sync_request")
    syncRequest.put("room", currentRoom)
    syncRequest.put("t1", t1)
    socket.emit("signal", syncRequest)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

/**
 * Tears down everything created for the view: the stats job, the signaling socket,
 * the peer connection, the renderer and the EGL base.
 *
 * - `socket` is only touched when it was initialized, because socket creation can
 *   fail and leave the lateinit property unset.
 * - Socket listeners are removed so callbacks no longer reach this fragment.
 * - Released references are cleared, so a recreated view builds a fresh EGL base and
 *   re-initializes its renderer instead of reusing released objects.
 */
override fun onDestroyView() {
    super.onDestroyView()

    statsJob?.cancel()
    statsJob = null

    if (::socket.isInitialized) {
        socket.off()
        socket.disconnect()
    }

    localPeer?.dispose()
    localPeer = null

    localView?.release()
    localView = null

    localEglBase?.release()
    localEglBase = null
}
}


class VideoReceiverFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoReceiverFragment] whose arguments bundle carries the connection settings.
     *
     * Each value is stored under the string key that [onCreate] reads back:
     * "room", "stunUrl", "turnUrl", "turnUsername", "turnPassword", "signalingServerUrl".
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoReceiverFragment = VideoReceiverFragment().also { created ->
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        created.arguments = bundle
    }
}

// Class member variables
// Signaling socket; assigned in initializeSocketIO().
private lateinit var socket: Socket
// The peer connection; assigned in createPeerConnection().
private var localPeer: PeerConnection? = null
// Renderer for the remote video; created by the AndroidView factory in WebRTCComposeLayout().
private var remoteView: SurfaceViewRenderer? = null
// EGL holder created in the AndroidView update block of WebRTCComposeLayout().
private var remoteEglBase: EglBase? = null
// Presumably remote ICE candidates queued until a remote description exists — its users are not visible in this chunk.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings below are populated from the fragment arguments in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
// Logcat tag used by the log statements in this fragment.
private val TAG: String = "WebRTC-Receiver"

// Values shown in the UI: frame rate (fps), bitrate (displayed divided by 1000 as kbps), stutter flag.
private val frameRateState = mutableDoubleStateOf(0.0)
private val bitrateState = mutableLongStateOf(0L)
private val stutteringState = mutableStateOf(false)

// Flags and value behind the extra lines shown while the stutter warning is visible.
private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableDoubleStateOf(0.0)

// Series plotted by the two LineChart calls in WebRTCComposeLayout().
private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
// Job of the periodic time-sync loop; assigned in startPeriodicTimeSync().
private var timeSyncJob: Job? = null

// History list storing the one-way latency samples.
private val latencyHistory = mutableStateListOf<Long>()

// State holders for the latest, maximum, minimum and average one-way latency shown in the UI (ms).
private val maxLatencyState = mutableLongStateOf(0L)
// Starts at Long.MAX_VALUE; where it is overwritten is not visible in this chunk.
private val minLatencyState = mutableLongStateOf(Long.MAX_VALUE)
private val averageLatencyState = mutableLongStateOf(0L)
private val latestLatencyState = mutableLongStateOf(0L)

// History variables
// NOTE(review): presumably the previous stats sample used to compute per-interval deltas — the code using them is not visible here.
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
// NOTE(review): presumably the clock offset and request time of the time-sync exchange — confirm against the sync handler.
private var timeOffset: Long = 0
private var t1: Long = 0

// Job handle for stats collection; where it is started is not visible in this chunk.
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments, substituting the
 * built-in defaults for any key that is absent, then logs the effective setup.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // Looks up one string argument, falling back when the bundle or the key is missing.
    fun setting(key: String, fallback: String): String = arguments?.getString(key) ?: fallback

    currentRoom = setting("room", "default-room")
    signalingServerUrl = setting("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = setting("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = setting("turnUrl", "turn:turn.wstszx.us.kg:5349")
    // NOTE(review): default TURN credentials are hard-coded in source — confirm this is intended.
    turnUsername = setting("turnUsername", "wstszx")
    turnPassword = setting("turnPassword", "930379")

    val summary =
        "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per permission with its outcome. [onPermissionsChecked] (which
 * announces that all permissions are granted) is only invoked when every requested
 * permission was actually granted; previously it ran even after a denial.
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        // An empty result means the request was interrupted, so it is not treated as success.
        if (permissions.isNotEmpty() && permissions.values.all { it }) {
            onPermissionsChecked()
        }
    }

/** Hosts the Compose UI: returns a [ComposeView] whose content is [WebRTCComposeLayout]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeHost = ComposeView(requireContext())
    composeHost.setContent { WebRTCComposeLayout() }
    return composeHost
}

/**
 * Root composable of the receiver screen.
 *
 * Lays out the remote video renderer, the one-way latency figures, frame-rate and
 * bitrate charts, and a stutter indicator. A `LaunchedEffect(Unit)` then initializes
 * [PeerConnectionFactory], creates the peer connection, opens the signaling socket
 * and requests runtime permissions.
 *
 * The encoder factory now shares the renderer's EGL context. It previously created
 * a second [EglBase] that nothing kept a reference to, so it could never be released.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current
    // Assigned inside the LaunchedEffect below before it is used.
    lateinit var peerConnectionFactory: PeerConnectionFactory

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Remote video view
            AndroidView(
                factory = {
                    remoteView = SurfaceViewRenderer(it).apply {
                        setZOrderMediaOverlay(false)
                    }
                    remoteView!!
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = {
                    // Create the EGL base and initialize the renderer only once.
                    if (remoteEglBase?.eglBaseContext == null) {
                        remoteEglBase = EglBase.create()
                        it.init(remoteEglBase!!.eglBaseContext, null)
                        it.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // Latest, maximum, minimum and average one-way latency
            Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                Text(
                    text = "当前单向时延: ${latestLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最大单向时延: ${maxLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最小单向时延: ${minLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "平均单向时延: ${averageLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 8.dp)
                )
            }

            // Spacer between video and charts
            Spacer(modifier = Modifier.height(8.dp))

            // Frame Rate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 8.dp)
            ) {
                Text(
                    text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                // NOTE(review): this log runs on every recomposition of this column.
                Log.d(
                    TAG,
                    "UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"
                )

                // Line Chart for Frame Rate
                LineChart(
                    data = frameRateHistory,
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Green,
                    backgroundColor = Color.Black,
                    yAxisLabel = "帧率 (fps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between charts
            Spacer(modifier = Modifier.height(8.dp))

            // Bitrate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp)
            ) {
                Text(
                    text = "码率: ${bitrateState.longValue / 1000} kbps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )

                // Line Chart for Bitrate
                LineChart(
                    data = bitrateHistory.map { it / 1000f }, // Convert to kbps
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Blue,
                    backgroundColor = Color.Black,
                    yAxisLabel = "码率 (kbps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between metrics and stuttering indicator
            Spacer(modifier = Modifier.height(16.dp))

            // Stuttering Indicator
            Row(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp),
                verticalAlignment = Alignment.CenterVertically
            ) {
                if (stutteringState.value) {
                    Icon(
                        imageVector = Icons.Default.Warning,
                        contentDescription = "卡顿警告",
                        tint = Color.Red,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Column {
                        Text(
                            text = "视频播放出现卡顿",
                            color = Color.Red,
                            style = MaterialTheme.typography.bodyMedium
                        )
                        // Additional information about which metrics are abnormal
                        if (frameRateLowState.value) {
                            Text(
                                text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                        if (packetLossHighState.value) {
                            Text(
                                text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                    }
                } else {
                    Icon(
                        imageVector = Icons.Default.CheckCircle,
                        contentDescription = "正常",
                        tint = Color.Green,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Text(
                        text = "视频播放正常",
                        color = Color.Green,
                        style = MaterialTheme.typography.bodyMedium
                    )
                }
            }
        }

        // One-time setup, keyed on Unit.
        LaunchedEffect(Unit) {
            val options = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(options)

            // Relies on the AndroidView update block having already created remoteEglBase.
            val sharedEglContext = checkNotNull(remoteEglBase) {
                "remoteEglBase must be created before the peer connection factory"
            }.eglBaseContext

            // Encoder and decoder share the renderer's EGL context; it is released with remoteEglBase.
            val encoderFactory = DefaultVideoEncoderFactory(sharedEglContext, true, true)
            val decoderFactory = DefaultVideoDecoderFactory(sharedEglContext)

            peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(encoderFactory)
                .setVideoDecoderFactory(decoderFactory)
                .createPeerConnectionFactory()

            createPeerConnection(
                context,
                peerConnectionFactory,
                remoteView!!
            ) {
                localPeer = it
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
        }
    }
}

/**
 * (Re)starts the loop that calls [requestTimeSync] every [intervalMs] milliseconds.
 *
 * This is invoked from the socket connect handler, which fires again on every
 * reconnect. Any loop that is still running is cancelled first so that only one
 * time-sync loop is active at a time.
 *
 * @param intervalMs delay between consecutive sync requests, in milliseconds
 */
private fun startPeriodicTimeSync(intervalMs: Long = 5000L) {
    timeSyncJob?.cancel()
    timeSyncJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            requestTimeSync()
            delay(intervalMs)
        }
    }
}

/**
 * Line chart with axes, tick labels and optional axis titles, drawn on a [Canvas].
 * Consecutive points are joined by straight segments and each point is marked with a dot.
 *
 * @param data values to plot, one per x position; nothing is drawn when empty
 * @param modifier layout modifier; the background colour is applied on top of it
 * @param lineColor colour of the segments and point markers
 * @param backgroundColor fill colour behind the chart
 * @param yAxisLabel title drawn rotated along the y axis; omitted when empty
 * @param xAxisLabel title drawn under the x axis; omitted when empty
 * @param minYValue lower bound of the y axis; defaults to the smallest value in [data]
 * @param maxYValue upper bound of the y axis; defaults to the largest value in [data]
 *
 * When all values are equal (zero y range) the line and a single tick label are
 * drawn at mid-height instead of at NaN positions. With fewer than five points
 * every point gets its own x tick, instead of five overlapping "0" labels.
 */
@Composable
fun LineChart(
    data: List<Float>,
    modifier: Modifier = Modifier,
    lineColor: Color = Color.Green,
    backgroundColor: Color = Color.Black,
    yAxisLabel: String = "",
    xAxisLabel: String = "",
    minYValue: Float? = null,
    maxYValue: Float? = null
) {
    Canvas(modifier = modifier.background(backgroundColor)) {
        if (data.isEmpty()) return@Canvas

        val padding = 40.dp.toPx() // Padding for axes and labels
        val plotBottom = size.height - padding
        val plotHeight = size.height - padding * 2

        val maxY = maxYValue ?: data.maxOrNull() ?: 1f
        val minY = minYValue ?: data.minOrNull() ?: 0f
        val yRange = maxY - minY
        val lastIndex = data.size - 1
        val spacing = (size.width - padding * 2) / lastIndex.coerceAtLeast(1)

        // Maps a data value to its y pixel; a flat series sits on the vertical centre.
        fun yFor(value: Float): Float =
            if (yRange == 0f) size.height / 2 else plotBottom - ((value - minY) / yRange) * plotHeight

        // One Paint per text style, created once per draw pass rather than once per label.
        fun textPaint(align: android.graphics.Paint.Align, antiAlias: Boolean) =
            android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
                textAlign = align
                if (antiAlias) isAntiAlias = true
            }

        val yTickPaint = textPaint(android.graphics.Paint.Align.RIGHT, antiAlias = false)
        val xTickPaint = textPaint(android.graphics.Paint.Align.CENTER, antiAlias = false)
        val titlePaint = textPaint(android.graphics.Paint.Align.CENTER, antiAlias = true)

        val canvas = drawContext.canvas.nativeCanvas

        val points = data.mapIndexed { index, value ->
            Offset(padding + index * spacing, yFor(value))
        }

        // Draw axes
        drawLine(
            color = Color.White,
            start = Offset(padding, padding),
            end = Offset(padding, plotBottom),
            strokeWidth = 2f
        )
        drawLine(
            color = Color.White,
            start = Offset(padding, plotBottom),
            end = Offset(size.width - padding, plotBottom),
            strokeWidth = 2f
        )

        // Draw y-axis tick labels; a flat series needs only one.
        val yLabelCount = if (yRange == 0f) 1 else 5
        for (i in 0 until yLabelCount) {
            val yValue = if (yLabelCount == 1) minY else minY + i * yRange / (yLabelCount - 1)
            canvas.drawText(
                yValue.roundToInt().toString(),
                padding - 8f,
                yFor(yValue) + yTickPaint.textSize / 2,
                yTickPaint
            )
        }

        // Draw x-axis tick labels: every point for short series, otherwise five evenly spread indices.
        val xLabelCount = 5
        val xLabelIndices = if (lastIndex < xLabelCount - 1) {
            (0..lastIndex).toList()
        } else {
            (0 until xLabelCount).map { it * lastIndex / (xLabelCount - 1) }
        }
        for (index in xLabelIndices) {
            canvas.drawText(
                index.toString(),
                padding + index * spacing,
                plotBottom + xTickPaint.textSize + 4f,
                xTickPaint
            )
        }

        // Y-axis title, rotated to read bottom-to-top
        if (yAxisLabel.isNotEmpty()) {
            canvas.save()
            canvas.rotate(-90f, padding / 2, size.height / 2)
            canvas.drawText(yAxisLabel, padding / 2, size.height / 2, titlePaint)
            canvas.restore()
        }

        // X-axis title
        if (xAxisLabel.isNotEmpty()) {
            canvas.drawText(xAxisLabel, size.width / 2, size.height - padding / 2, titlePaint)
        }

        // Draw the straight lines connecting consecutive points
        for (i in 0 until points.size - 1) {
            drawLine(
                color = lineColor,
                start = points[i],
                end = points[i + 1],
                strokeWidth = 4f,
                cap = StrokeCap.Round
            )
        }

        // Mark each data point
        points.forEach { point ->
            drawCircle(
                color = lineColor,
                radius = 4f,
                center = point
            )
        }
    }
}

/**
 * Creates the signaling socket for [signalingServerUrl], registers its listeners and connects.
 *
 * On connect the socket joins [currentRoom], initiates the call and starts the
 * periodic time sync. Incoming "signal" events carrying a [JSONObject] are passed to
 * [handleSignalingData]. The payload is now inspected only after checking that one
 * was delivered; the previous code indexed `args[0]` before the emptiness check.
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = socketUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket connected")
            socket.emit("join", currentRoom)
            Log.d(TAG, "Joined room: $currentRoom")
            initiateCall()
            startPeriodicTimeSync()
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            args.firstOrNull()?.let { error -> Log.e(TAG, "Socket connection error: $error") }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            args.firstOrNull()?.let { reason -> Log.d(TAG, "Socket disconnected: $reason") }
        }

        socket.on("signal") { args ->
            // firstOrNull() keeps an event without payload from throwing on the socket thread.
            val payload = args.firstOrNull()
            Log.d(TAG, "Received signaling: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            }
        }

        socket.connect()
        Log.d(TAG, "Connecting to Socket: $socketUrl...")
    } catch (e: Exception) {
        // Pass the throwable so the stack trace is logged as well as the message.
        Log.e(TAG, "Error connecting to Socket: ${e.message}", e)
    }
}

/**
 * Requests whichever of CAMERA, RECORD_AUDIO, INTERNET and ACCESS_NETWORK_STATE are
 * not yet granted; when none are missing, calls [onPermissionsChecked] directly.
 */
private fun requestPermissionsIfNeeded() {
    val missing = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast stating that all required permissions have been granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Builds the [PeerConnection] stored in [localPeer] and wires up its observer.
 *
 * The connection is configured with one STUN server ([stunUrl]) and one TURN server
 * ([turnUrl] with [turnUsername] / [turnPassword]). Locally gathered ICE candidates are
 * emitted as "signal" messages of type "ice" for [currentRoom]; every remote video track is
 * attached to [remoteView]. After a successful creation [onLocalPeerCreated] is invoked and
 * stats collection is started.
 *
 * If the factory returns null, the failure is logged and neither [onLocalPeerCreated] nor
 * stats collection runs (this case previously crashed on a `!!` assertion).
 *
 * @param context not used by this function.
 * @param peerConnectionFactory factory used to create the connection.
 * @param remoteView renderer that receives remote video tracks.
 * @param onLocalPeerCreated callback invoked with the newly created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    remoteView: SurfaceViewRenderer,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            // Forward the local candidate to the remote side through the signaling socket.
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", JSONObject().apply {
                    put("sdpMid", candidate.sdpMid)
                    put("sdpMLineIndex", candidate.sdpMLineIndex)
                    put("candidate", candidate.sdp)
                })
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
            // Only video tracks are rendered; other track kinds are ignored here.
            (receiver?.track() as? VideoTrack)?.addSink(remoteView)
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
            (transceiver?.receiver?.track() as? VideoTrack)?.addSink(remoteView)
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        // Creation can fail; report it instead of crashing on a non-null assertion.
        Log.e(TAG, "Failed to create PeerConnection")
        return
    }

    onLocalPeerCreated(peer)

    // Start collecting statistics
    startStatsCollection()
}

/**
 * Creates an SDP offer on [localPeer] that asks to receive both audio and video, sets it as
 * the local description and, once that succeeds, emits it as a "signal" message of type
 * "offer" for [currentRoom]. Failures of either step are only logged.
 */
private fun initiateCall() {
    Log.d(TAG, "Initiating call...")

    val offerConstraints = MediaConstraints()
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    val offerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            val offer = sessionDescription ?: return

            // Send the offer only after it has been applied as the local description.
            val localDescriptionObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    val sdpPayload = JSONObject().put("sdp", offer.description)
                    val signalData = JSONObject()
                    signalData.put("type", "offer")
                    signalData.put("sdp", sdpPayload)
                    signalData.put("room", currentRoom)
                    socket.emit("signal", signalData)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "Set local description error: $error")
                }

                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            localPeer?.setLocalDescription(localDescriptionObserver, offer)
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "Create offer error: $error")
        }

        override fun onSetSuccess() {}
        override fun onSetFailure(error: String?) {}
    }

    localPeer?.createOffer(offerObserver, offerConstraints)
}

/**
 * Dispatches one signaling message according to its "type" field.
 *
 * Handled types:
 * - "answer": applies the remote SDP answer, then adds the ICE candidates that were queued
 *   in [pendingIceCandidates] while no remote description was set.
 * - "ice": adds the remote ICE candidate, or queues it until a remote description exists.
 * - "time_sync_response": derives RTT (t4 - t1), one-way delay (RTT / 2) and the clock
 *   offset from the t1..t4 timestamps and publishes the latency statistics.
 * - "ping": echoes the received timestamp back as a "pong".
 *
 * A message with a missing or wrongly typed field is logged and dropped instead of letting
 * the resulting JSONException escape into the caller.
 *
 * @param data the JSON payload of the signaling message.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    // optString does not throw: a message without "type" ends up in the else branch.
    val type = data.optString("type")
    try {
        when (type) {
            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )

                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        // Candidates that arrived before the answer can be applied now.
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }
                        pendingIceCandidates.clear()

                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    // Queue until the remote description has been set.
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_response" -> {
                val t1 = data.getLong("t1")
                val t2 = data.getLong("t2")
                val t3 = data.getLong("t3")
                val t4 = System.currentTimeMillis()

                val rtt = t4 - t1
                val oneWayDelay = rtt / 2
                timeOffset = ((t2 - t1) + (t3 - t4)) / 2

                Log.d(TAG, "时间同步: RTT=$rtt ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

                // Update the latency states
                viewLifecycleOwner.lifecycleScope.launch {
                    // Append to the history, keeping only the 60 most recent values
                    latencyHistory.add(oneWayDelay)
                    if (latencyHistory.size > 60) {
                        latencyHistory.removeAt(0)
                    }

                    // Max, min and average one-way delay over the history
                    maxLatencyState.longValue = latencyHistory.maxOrNull() ?: 0L
                    minLatencyState.longValue = latencyHistory.minOrNull() ?: 0L
                    averageLatencyState.longValue = if (latencyHistory.isNotEmpty()) {
                        latencyHistory.average().toLong()
                    } else {
                        0L
                    }
                    latestLatencyState.longValue = oneWayDelay
                }
            }

            "ping" -> {
                val receivedTimestamp = data.getLong("timestamp")
                val pongData = JSONObject().apply {
                    put("type", "pong")
                    put("timestamp", receivedTimestamp) // echo the received timestamp back
                    put("room", currentRoom)
                }
                socket.emit("signal", pongData)
                Log.d(TAG, "发送 pong 对应时间戳: $receivedTimestamp")
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: $type")
            }
        }
    } catch (e: org.json.JSONException) {
        // A malformed message must not take down the thread that delivers socket events.
        Log.e(TAG, "Malformed signaling message: $data", e)
    }
}

/**
 * Stores the current wall-clock time in [t1] and emits a "time_sync_request" signal for
 * [currentRoom] that carries this timestamp.
 */
private fun requestTimeSync() {
    t1 = System.currentTimeMillis()

    val syncRequest = JSONObject()
    syncRequest.put("type", "time_sync_request")
    syncRequest.put("room", currentRoom)
    syncRequest.put("t1", t1)

    socket.emit("signal", syncRequest)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

/**
 * Starts a coroutine on the view lifecycle scope that requests a stats report from
 * [localPeer] once per second and passes it to [parseStatsReport].
 *
 * A collector started by an earlier call is cancelled first, so calling this function
 * repeatedly never leaves two polling loops running.
 */
private fun startStatsCollection() {
    Log.d(TAG, "Starting stats collection...")
    // statsJob is about to be overwritten; without this the old loop would keep running.
    statsJob?.cancel()
    statsJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(1000) // Collect stats every second
            Log.d(TAG, "Collecting stats...")
            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
            } else {
                peer.getStats { report ->
                    Log.d(TAG, "Stats report obtained.")
                    parseStatsReport(report)
                }
            }
        }
    }
}

/**
 * Extracts the inbound video RTP counters from [report], derives frame rate, bitrate and
 * packet loss relative to the previously seen report, and publishes them to the UI states
 * and history lists.
 *
 * The first report only seeds the `prev*` baseline values; rates are computed from the
 * second report onwards. Stuttering is flagged when the frame rate is below 24 fps or the
 * packet loss exceeds 5 %.
 *
 * @param report stats report obtained from the peer connection.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    Log.d(TAG, "Received RTCStatsReport: $report")
    for (stats in report.statsMap.values) {
        // Only inbound video RTP entries are evaluated.
        if (stats.type != "inbound-rtp") continue
        if ((stats.members["kind"] as? String) != "video") continue

        // Reads a numeric member as Double, falling back when it is absent or not a number.
        fun counter(name: String, fallback: Double = 0.0): Double =
            (stats.members[name] as? Number)?.toDouble() ?: fallback

        val framesDecoded = counter("framesDecoded")
        val framesReceived = counter("framesReceived")
        val framesDropped = counter("framesDropped")
        val bytesReceived = counter("bytesReceived")
        val packetsLost = counter("packetsLost")
        val packetsReceived = counter("packetsReceived", fallback = 1.0)
        // Guard the division: when both counters are 0 the quotient would be NaN, which
        // would silently poison the packet-loss state and the stuttering check.
        val totalPackets = packetsLost + packetsReceived
        val packetLossFraction = if (totalPackets > 0.0) packetsLost / totalPackets else 0.0
        val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

        Log.d(
            TAG,
            "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
        )

        if (prevTimestamp != 0.0) {
            val timeElapsed = timestamp - prevTimestamp
            val framesDelta = framesDecoded - prevFramesDecoded
            val bytesDelta = bytesReceived - prevBytesReceived

            val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
            val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
            val packetLoss = packetLossFraction * 100 // Convert to percentage

            Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

            // Determine stuttering based on thresholds (can be adjusted)
            val frameRateLow = frameRate < 24
            val packetLossHigh = packetLoss > 5.0
            val isStuttering = frameRateLow || packetLossHigh

            // Update states
            viewLifecycleOwner.lifecycleScope.launch {
                frameRateState.doubleValue = frameRate
                bitrateState.longValue = bitrate.toLong()
                stutteringState.value = isStuttering
                // Specific stuttering causes
                frameRateLowState.value = frameRateLow
                packetLossHighState.value = packetLossHigh
                packetLossState.doubleValue = packetLoss

                // Update history, keeping the 60 most recent samples of each series
                frameRateHistory.add(frameRate.toFloat())
                if (frameRateHistory.size > 60) {
                    frameRateHistory.removeAt(0)
                }

                bitrateHistory.add(bitrate.toLong())
                if (bitrateHistory.size > 60) {
                    bitrateHistory.removeAt(0)
                }

                Log.d(
                    TAG,
                    "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
                )
            }
        }

        // Remember this report as the baseline for the next delta computation
        prevFramesDecoded = framesDecoded
        prevBytesReceived = bytesReceived
        prevFramesReceived = framesReceived
        prevFramesDropped = framesDropped
        prevTimestamp = timestamp
    }
}

/**
 * Releases everything tied to the view: cancels the stats and time-sync jobs, disconnects
 * the signaling socket, disposes the peer connection and releases the remote renderer and
 * its EGL context.
 */
override fun onDestroyView() {
    super.onDestroyView()
    statsJob?.cancel()
    timeSyncJob?.cancel()
    // socket is only assigned inside initializeSocketIO's try block; if that failed,
    // touching the lateinit property here would throw.
    if (::socket.isInitialized) {
        socket.disconnect()
    }
    localPeer?.dispose()
    // Drop the reference so later `localPeer?.` calls cannot reach a disposed connection.
    localPeer = null
    remoteView?.release()
    remoteEglBase?.release()
}
}


目前的单向时延和往返时延的统计方案没有在同一个请求中用吗?应该改成在一个请求中既统计单向时延又统计往返时延

思考

定时发送与响应

在VideoInitiatorFragment中,每5秒发送一次“ping”,VideoReceiverFragment则回复“pong”。

探讨现有方案改进

整合机制

正在整合RTT和单向延迟统计的机制,可能把时间同步请求和Ping-Pong机制结合,或重新实施更高效的时间同步协议。调整代码以实现这一点。

整合功能

在 VideoInitiatorFragment 中,我正在处理 "pong" 来计算 RTT。这样做的目的是将 ping-pong 和时间同步合并在一个信令消息中。可以通过调整消息字段或使用综合时间同步消息来实现这些操作。

探索时间同步请求

我正在推进VideoInitiatorFragment来发送时间同步请求,而VideoReceiverFragment则在收到请求后,根据时间戳计算往返时间RTT和单向延迟。这样一来,我就能集成时间同步和pong请求,优化消息流。

回答

当前,您的 VideoInitiatorFragment 和 VideoReceiverFragment 通过两套不同的信令消息来处理单向时延(One-Way Delay, OWD)和往返时延(Round-Trip Time, RTT)。具体来说,VideoInitiatorFragment 发送 ping 消息来测量 RTT,而单向时延则通过另一套 time_sync_request / time_sync_response 消息(由 VideoReceiverFragment 应答)来测量。这种分离的方式可能导致信令复杂性增加,并且在某些情况下可能不够高效。

为了优化您的实现,建议将单向时延和往返时延的统计整合到同一个时间同步请求中。这不仅可以简化信令逻辑,还能确保两者的同步性和一致性。

以下是详细的修改建议和代码示例,以帮助您在单个请求中同时统计单向时延和往返时延。

1. 统一时间同步机制

1.1 定义统一的时间同步消息类型

使用单一的 time_sync 消息类型来处理时间同步请求和响应。通过在请求中包含必要的时间戳,接收方可以在响应中返回这些时间戳,从而允许发送方计算 RTT 和 OWD。

1.2 修改 VideoInitiatorFragment

在 VideoInitiatorFragment 中,移除现有的 ping 机制,并使用统一的时间同步请求来测量 RTT 和 OWD。

// Remove the existing ping-related code:
// private fun startSendingTimestamps() { ... }
// and the "pong" handling in handleSignalingData.

// Unified time sync: one request/response pair yields both RTT and one-way delay.
private fun startTimeSync() {
    viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(5000) // sync every 5 seconds
            requestTimeSync()
        }
    }
}

private fun requestTimeSync() {
    val t1 = System.currentTimeMillis()
    val syncRequest = JSONObject().apply {
        put("type", "time_sync_request")
        put("room", currentRoom)
        put("t1", t1)
    }
    socket.emit("signal", syncRequest)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    when (data.getString("type")) {
        // Handle the time sync response
        "time_sync_response" -> {
            val t1 = data.getLong("t1")
            val t2 = data.getLong("t2")
            val t3 = data.getLong("t3")
            val t4 = System.currentTimeMillis()

            val rtt = t4 - t1
            // One-way delay is estimated as half the round trip. The expression
            // ((t2 - t1) + (t3 - t4)) / 2 is the clock offset, not a delay.
            val oneWayDelay = rtt / 2
            val timeOffset = ((t2 - t1) + (t3 - t4)) / 2
            Log.d(TAG, "时间同步: RTT=$rtt ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

            viewLifecycleOwner.lifecycleScope.launch {
                // RTT statistics come from the RTT history, not the latency history
                latestRttState.longValue = rtt
                rttHistory.add(rtt)
                if (rttHistory.size > 60) {
                    rttHistory.removeAt(0)
                }
                val maxRtt = rttHistory.maxOrNull() ?: 0L
                val minRtt = rttHistory.minOrNull() ?: 0L
                val averageRtt = if (rttHistory.isNotEmpty()) {
                    rttHistory.average().toLong()
                } else {
                    0L
                }
                maxRttState.longValue = maxRtt
                minRttState.longValue = minRtt
                averageRttState.longValue = averageRtt

                // One-way delay statistics
                latestLatencyState.longValue = oneWayDelay
                latencyHistory.add(oneWayDelay)
                if (latencyHistory.size > 60) {
                    latencyHistory.removeAt(0)
                }
                maxLatencyState.longValue = latencyHistory.maxOrNull() ?: 0L
                minLatencyState.longValue = latencyHistory.minOrNull() ?: 0L
                averageLatencyState.longValue = if (latencyHistory.isNotEmpty()) {
                    latencyHistory.average().toLong()
                } else {
                    0L
                }

                Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
                Log.d(TAG, "OWD - Latest: $oneWayDelay ms")
            }
        }

        // Other handling logic...
    }
}

1.3 修改 VideoReceiverFragment

在 VideoReceiverFragment 中,修改时间同步的处理逻辑,使其能够响应统一的时间同步请求,并在响应中带回计算 OWD 所需的时间戳。

private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    when (data.getString("type")) {
        "time_sync_request" -> {
            Log.d(TAG, "收到 time_sync_request")
            val t1 = data.getLong("t1")
            val t2 = System.currentTimeMillis() // time the request was received
            // No artificial delay here: delay() is a suspend function and cannot be called
            // from this non-suspend handler, and it would only inflate the measured RTT.
            val t3 = System.currentTimeMillis() // time the response is sent
            val syncResponse = JSONObject().apply {
                put("type", "time_sync_response")
                put("t1", t1)
                put("t2", t2)
                put("t3", t3)
                put("room", currentRoom)
            }
            socket.emit("signal", syncResponse)
            Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")
        }

        // Handle the other signaling types...
        "offer", "answer", "ice" -> {
            // Keep the existing handling logic
            // ...
        }

        else -> {
            Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
        }
    }
}

1.4 移除不必要的代码

由于我们将 RTT 和 OWD 的统计整合到同一个时间同步请求中,您可以移除 VideoInitiatorFragment 中与 pong 相关的代码,以及 VideoReceiverFragment 中的 ping 处理逻辑。

2. 计算 RTT 和单向时延

通过上述修改,发送方在发送 time_sync_request 时记录发送时间 t1;接收方在收到请求时记录接收时间 t2,并在发出响应时记录发送时间 t3;发送方在收到响应时记录接收时间 t4。基于这四个时间戳,您可以计算 RTT、OWD 和时钟偏移量:

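  • RTT: RTT = t4 - t1
  • 单向时延: OWD ≈ RTT / 2(假设上下行链路对称;若需扣除接收方的处理时间,可用 ((t4 - t1) - (t3 - t2)) / 2)
  • 时钟偏移量: timeOffset = ((t2 - t1) + (t3 - t4)) / 2(注意这是两端时钟的偏差,而不是时延)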

这种计算方法类似于网络时间协议(NTP)的基本原理,能够有效地估计网络延迟和时钟偏移。

3. 完整的修改示例

为了确保清晰性,以下是整合后的关键代码片段。

3.1 VideoInitiatorFragment 修改后的关键部分

class VideoInitiatorFragment : Fragment() {
    // ... other member variables and methods ...

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    @Composable
    fun WebRTCComposeLayout() {
        // ... UI code ...
        LaunchedEffect(Unit) {
            // ... WebRTC initialization code ...
            initializeSocketIO()
            requestPermissionsIfNeeded()
            startTimeSync() // start the unified time sync
        }
    }

    private fun handleSignalingData(data: JSONObject) {
        // ... as described above ...
    }

    private fun startTimeSync() {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(5000) // sync every 5 seconds
                requestTimeSync()
            }
        }
    }

    private fun requestTimeSync() {
        val t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "发送时间同步请求 at t1: $t1")
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        socket.disconnect()
        localPeer?.dispose()
        localView?.release()
        localEglBase?.release()
    }
}

3.2 VideoReceiverFragment 修改后的关键部分

class VideoReceiverFragment : Fragment() {
    // ... other member variables and methods ...

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    @Composable
    fun WebRTCComposeLayout() {
        // ... UI code ...
        LaunchedEffect(Unit) {
            // ... WebRTC initialization code ...
            initializeSocketIO()
            requestPermissionsIfNeeded()
            // No separate ping-pong mechanism needs to be started
        }
    }

    private fun handleSignalingData(data: JSONObject) {
        when (data.getString("type")) {
            "time_sync_request" -> {
                val t1 = data.getLong("t1")
                val t2 = System.currentTimeMillis() // time the request was received
                // No artificial delay here: delay() is a suspend function and cannot be
                // called from this non-suspend handler.
                val t3 = System.currentTimeMillis() // time the response is sent
                val syncResponse = JSONObject().apply {
                    put("type", "time_sync_response")
                    put("t1", t1)
                    put("t2", t2)
                    put("t3", t3)
                    put("room", currentRoom)
                }
                socket.emit("signal", syncResponse)
                Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")
            }

            // Handle the other signaling types...
            else -> {
                // Keep the existing handling logic
            }
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        socket.disconnect()
        localPeer?.dispose()
        remoteView?.release()
        remoteEglBase?.release()
        timeSyncJob?.cancel()
    }
}

4. 优化和增强

4.1 增加时钟偏移量的校正

在上述计算中,我们已经计算了 timeOffset,它表示发送方和接收方之间的时钟偏移。可以使用这个偏移量来校正发送和接收的时间戳,以提高测量的准确性。

4.2 错误处理和重试机制

在实际网络环境中,时间同步请求可能会因为网络问题而失败。建议增加错误处理和重试机制,例如在发送时间同步请求后设置一个超时,如果在规定时间内未收到响应,则重新发送请求。

4.3 动态调整同步频率

根据网络状况和应用需求,动态调整时间同步的频率。例如,在网络稳定时减少同步频率,以节省带宽和计算资源;在网络波动较大时增加同步频率,以提高测量的实时性。

4.4 数据存储和可视化

将 RTT 和 OWD 的历史数据存储在更高效的数据结构中,并在 UI 中以图表形式实时展示,帮助用户更直观地了解网络状况。

5. 总结

通过将单向时延和往返时延的统计整合到同一个时间同步请求中,您可以简化信令逻辑,提高测量的效率和准确性。上述修改建议和代码示例提供了一个实现方向,您可以根据具体需求进一步优化和扩展。

如果在实施过程中遇到任何问题,欢迎进一步提问!

问题

目前单向时延和往返时延全部在VideoInitiatorFragment中计算的吗?

思考

探讨延迟计算

用户正在问,单向时延和往返时延是否都在VideoInitiatorFragment中计算。我回答说,目前它们在不同请求中计算,但可以改进统一计算。

审查代码

我正在分析VideoInitiatorFragment的startSendingTimestamps和handleSignalingData方法,处理"ping"、"pong"和"time_sync_request"/"time_sync_response"。

同步时间和处理数据

在VideoInitiatorFragment中,通过发送“ping”消息和处理“pong”来测量RTT;同时使用requestTimeSync()和handleSignalingData(),通过“time_sync_request”测量OWD。相应地,VideoReceiverFragment处理“time_sync_request”和“ping”,为RTT和OWD的计算提供响应。

确认计算方式

我正在检查每个片段如何处理RTT和OWD,确定RTT和OWD是否完全在VideoInitiatorFragment中计算。

梳理延迟计算

我现在正在确认 VideoInitiatorFragment 中确实计算了单向延迟 (OWD) 和往返时间 (RTT)。此时也在探索是否引入更高阶的流程来评估复杂度对比。

测量和分析

VideoInitiatorFragment正在发送 ping 和 time_sync_request 消息来测量RTT和OWD。收到响应消息后,计算差异并得出结果。

回答

是的,当前您的实现中,单向时延(One-Way Delay, OWD)和往返时延(Round-Trip Time, RTT)的计算都是在 VideoInitiatorFragment 中进行的。下面是详细的说明:

1. 往返时延(RTT)的计算

在 VideoInitiatorFragment 中,RTT 的计算通过以下步骤实现:

  1. 发送 Ping 消息

    • 方法 startSendingTimestamps() 每隔 5 秒发送一次 ping 消息。
    • 在发送 ping 消息之前,记录当前时间戳 pingTimestamp
    • ping 消息包含类型 ping、时间戳 timestamp 和房间 room
    private fun startSendingTimestamps() {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(5000) // every 5 seconds
                // Send a ping carrying the send timestamp
                pingTimestamp = System.currentTimeMillis()
                val pingData = JSONObject().apply {
                    put("type", "ping")
                    put("timestamp", pingTimestamp)
                    put("room", currentRoom)
                }
                socket.emit("signal", pingData)
                Log.d(TAG, "发送 ping 时间: $pingTimestamp")
            }
        }
    }
  2. 接收 Pong 消息

    • 在 handleSignalingData 方法中,处理类型为 pong 的消息。
    • RTT 的计算方式为:收到 pong 时的当前时间减去发送 ping 时记录的 pingTimestamp。
    "pong" -> { // handle the pong message
        val receivedTimestamp = data.getLong("timestamp")
        val currentTimestamp = System.currentTimeMillis()
        val rtt = currentTimestamp - pingTimestamp
        viewLifecycleOwner.lifecycleScope.launch {
            latestRttState.longValue = rtt
            rttHistory.add(rtt)
            if (rttHistory.size > 60) {
                rttHistory.removeAt(0)
            }
            maxRttState.longValue = rttHistory.maxOrNull() ?: 0
            minRttState.longValue = rttHistory.minOrNull() ?: 0
            averageRttState.longValue = rttHistory.average().toLong()
            Log.d(TAG, "RTT: $rtt ms")
        }
    }

2. 单向时延(OWD)的计算

同样在 VideoInitiatorFragment 中,OWD 的计算通过时间同步请求和响应实现:

  1. 发送时间同步请求

    • 方法 requestTimeSync() 记录发送时间 t1,并发送包含 t1 的 time_sync_request 消息。
    private fun requestTimeSync() {
        val t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "发送时间同步请求 at t1: $t1")
    }
  2. 接收时间同步响应

    • 在 handleSignalingData 方法中,处理类型为 time_sync_response 的消息。
    • 通过时间戳 t1、t2、t3 和 t4 计算 RTT 和 OWD。
    "time_sync_response" -> {
        val t1 = data.getLong("t1")
        val t2 = data.getLong("t2")
        val t3 = data.getLong("t3")
        val t4 = System.currentTimeMillis()

        val rtt = t4 - t1
        // One-way delay is estimated as half the round trip. The expression
        // ((t2 - t1) + (t3 - t4)) / 2 is the clock offset, not a delay.
        val oneWayDelay = rtt / 2
        val timeOffset = ((t2 - t1) + (t3 - t4)) / 2
        Log.d(TAG, "时间同步: RTT=$rtt ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

        viewLifecycleOwner.lifecycleScope.launch {
            // RTT statistics come from the RTT history, not the latency history
            latestRttState.longValue = rtt
            rttHistory.add(rtt)
            if (rttHistory.size > 60) {
                rttHistory.removeAt(0)
            }
            val maxRtt = rttHistory.maxOrNull() ?: 0L
            val minRtt = rttHistory.minOrNull() ?: 0L
            val averageRtt = if (rttHistory.isNotEmpty()) {
                rttHistory.average().toLong()
            } else {
                0L
            }
            maxRttState.longValue = maxRtt
            minRttState.longValue = minRtt
            averageRttState.longValue = averageRtt

            // One-way delay statistics
            latestLatencyState.longValue = oneWayDelay
            latencyHistory.add(oneWayDelay)
            if (latencyHistory.size > 60) {
                latencyHistory.removeAt(0)
            }
            maxLatencyState.longValue = latencyHistory.maxOrNull() ?: 0L
            minLatencyState.longValue = latencyHistory.minOrNull() ?: 0L
            averageLatencyState.longValue = if (latencyHistory.isNotEmpty()) {
                latencyHistory.average().toLong()
            } else {
                0L
            }

            Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
            Log.d(TAG, "OWD - Latest: $oneWayDelay ms")
        }
    }

3. 现有实现的总结

  • RTT 是通过 ping 和 pong 消息的往返来计算的,主要在 VideoInitiatorFragment 中进行。
  • OWD 是通过时间同步请求和响应来计算的,同样也在 VideoInitiatorFragment 中处理。

因此,是的,当前所有的单向时延和往返时延的统计都是在 VideoInitiatorFragment 中计算的

4. 优化建议

尽管当前的实现功能齐全,但可以考虑将 RTT 和 OWD 的统计整合到一个统一的时间同步请求中,以简化信令逻辑并减少消息数量。这不仅可以提高效率,还能确保统计数据的一致性。

4.1 统一时间同步请求

您可以设计一个统一的 time_sync 消息类型,同时处理 RTT 和 OWD 的计算。这样,发送方只需发送一种类型的同步请求,接收方也只需响应这种请求。

示例实现:

在 VideoInitiatorFragment 中:

  1. 发送统一的时间同步请求

    private fun startUnifiedTimeSync() {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(5000) // sync every 5 seconds
                sendUnifiedTimeSyncRequest()
            }
        }
    }

    private fun sendUnifiedTimeSyncRequest() {
        val t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "发送统一时间同步请求 at t1: $t1")
    }
  2. 处理时间同步响应

    private fun handleSignalingData(data: JSONObject) {
        when (data.getString("type")) {
            "time_sync_response" -> {
                val t1 = data.getLong("t1")
                val t2 = data.getLong("t2")
                val t3 = data.getLong("t3")
                val t4 = System.currentTimeMillis()

                val rtt = t4 - t1
                // One-way delay is estimated as half the round trip, with the time the
                // receiver spent between t2 and t3 removed. The expression
                // ((t2 - t1) + (t3 - t4)) / 2 would be the clock offset, not a delay.
                val oneWayDelay = (rtt - (t3 - t2)) / 2
                Log.d(TAG, "时间同步: RTT=$rtt ms, 单向时延=$oneWayDelay ms")

                viewLifecycleOwner.lifecycleScope.launch {
                    // Update RTT
                    latestRttState.longValue = rtt
                    rttHistory.add(rtt)
                    if (rttHistory.size > 60) rttHistory.removeAt(0)
                    maxRttState.longValue = rttHistory.maxOrNull() ?: 0L
                    minRttState.longValue = rttHistory.minOrNull() ?: 0L
                    averageRttState.longValue = rttHistory.average().toLong()

                    // Update OWD
                    latestLatencyState.longValue = oneWayDelay
                    latencyHistory.add(oneWayDelay)
                    if (latencyHistory.size > 60) latencyHistory.removeAt(0)
                    maxLatencyState.longValue = latencyHistory.maxOrNull() ?: 0L
                    minLatencyState.longValue = latencyHistory.minOrNull() ?: 0L
                    averageLatencyState.longValue = latencyHistory.average().toLong()

                    Log.d(TAG, "RTT - Latest: $rtt ms, Max: ${maxRttState.longValue} ms, Min: ${minRttState.longValue} ms, Average: ${averageRttState.longValue} ms")
                    Log.d(TAG, "OWD - Latest: $oneWayDelay ms")
                }
            }

            // Handle other message types...
        }
    }

在 VideoReceiverFragment 中:

  1. 处理统一的时间同步请求并响应

    private fun handleSignalingData(data: JSONObject) {
        when (data.getString("type")) {
            "time_sync" -> {
                val t1 = data.getLong("t1")
                val t2 = System.currentTimeMillis() // time the request was received
                // No artificial delay here: delay() is a suspend function and cannot be
                // called from this non-suspend handler.
                val t3 = System.currentTimeMillis() // time the response is sent
                val syncResponse = JSONObject().apply {
                    put("type", "time_sync_response")
                    put("room", currentRoom)
                    put("t1", t1)
                    put("t2", t2)
                    put("t3", t3)
                }
                socket.emit("signal", syncResponse)
                Log.d(TAG, "回复统一时间同步响应: t1=$t1, t2=$t2, t3=$t3")
            }

            // Handle other message types...
        }
    }

4.2 移除冗余的 Ping/Pong 机制

通过上述统一的时间同步机制,您可以移除原有的 ping 和 pong 相关代码,从而简化代码结构并减少网络开销。

5. 结论

目前,您的实现中确实在 VideoInitiatorFragment 中分别通过 ping/pong 和 time_sync_request/time_sync_response 两套机制计算 RTT 和 OWD。为了优化性能和简化逻辑,建议将这两种延时统计合并到一个统一的时间同步请求中,如上述示例所示。

这种优化不仅减少了信令消息的数量,还能确保 RTT 和 OWD 的计算更加同步和一致。如果您有进一步的问题或需要更详细的实现指导,欢迎随时提问!

分享这个问答