问题

class VideoInitiatorFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoInitiatorFragment] whose arguments bundle carries the room name, the
     * STUN/TURN endpoints, the TURN credentials and the signaling server URL.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoInitiatorFragment = VideoInitiatorFragment().also { instance ->
        instance.arguments = Bundle().also { bundle ->
            bundle.putString("room", room)
            bundle.putString("stunUrl", stunUrl)
            bundle.putString("turnUrl", turnUrl)
            bundle.putString("turnUsername", turnUsername)
            bundle.putString("turnPassword", turnPassword)
            bundle.putString("signalingServerUrl", signalingServerUrl)
        }
    }
}

// Class member variables (kept unchanged).
// Socket.IO connection to the signaling server; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection created in createPeerConnection() and disposed in onDestroyView().
private var localPeer: PeerConnection? = null
// Renderer for the local camera preview; created by the AndroidView factory.
private var localView: SurfaceViewRenderer? = null
// EGL base created on first run of the AndroidView update block; released in onDestroyView().
private var localEglBase: EglBase? = null
// Remote ICE candidates received while no remote description was set yet.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings below are filled from the fragment arguments in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

// Logcat tag used by every log call in this fragment.
private val TAG: String = "WebRTC-Initiator"

// Compose state holders for RTT figures in milliseconds. They are written both by
// parseStatsReport() and by the "time_sync_response" branch of handleSignalingData().
private val maxRttState = mutableLongStateOf(0L)
private val minRttState = mutableLongStateOf(Long.MAX_VALUE)
private val averageRttState = mutableLongStateOf(0L)
private val latestRttState = mutableLongStateOf(0L)
// RTT samples appended by parseStatsReport(), which trims the list to at most 60 entries.
private val rttHistory = mutableStateListOf<Long>()

// Read by the "time_sync_response" handler to derive max/min/average values.
// NOTE(review): nothing visible in this class appends to this list — confirm intended writer.
private val latencyHistory = mutableStateListOf<Long>()

// Handle of the periodic stats coroutine launched by startStatsCollection().
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments, substituting a built-in default
 * for every missing value, and logs the resulting configuration.
 *
 * NOTE(review): the fallbacks include a TURN username and password hard-coded in source.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    val args = arguments

    // Returns the string stored under [key], or [fallback] when it is absent.
    fun argOrDefault(key: String, fallback: String): String = args?.getString(key) ?: fallback

    currentRoom = argOrDefault("room", "default-room")
    signalingServerUrl = argOrDefault("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = argOrDefault("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = argOrDefault("turnUrl", "turn:turn.wstszx.us.kg:5349")
    turnUsername = argOrDefault("turnUsername", "wstszx")
    turnPassword = argOrDefault("turnPassword", "930379")

    val summary =
        "onCreate: 角色 = 服务器, 房间 = $currentRoom, 信令服务器 = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per requested permission with its outcome. [onPermissionsChecked] announces
 * that every required permission is granted, so it is only invoked when nothing was denied
 * (previously it ran unconditionally and showed that message even after a denial).
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        if (permissions.values.all { it }) {
            onPermissionsChecked()
        }
    }

/** Hosts the Compose UI of this fragment in a newly created [ComposeView]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Root composable of the initiator screen.
 *
 * Shows the local camera preview and runs the one-time WebRTC setup in a [LaunchedEffect]:
 * factory creation, local video capture, peer connection, signaling socket, permission
 * request and periodic time sync, in that order.
 *
 * NOTE(review): capture is started before the permission request is issued — confirm that
 * this ordering is intended.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current
    lateinit var peerConnectionFactory: PeerConnectionFactory
    var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Local preview; takes all the height not used by the spacer below.
            AndroidView(
                factory = { viewContext ->
                    val renderer = SurfaceViewRenderer(viewContext)
                    renderer.setZOrderMediaOverlay(false)
                    localView = renderer
                    renderer
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = { renderer ->
                    // Create the EGL base and initialise the renderer only the first time.
                    if (localEglBase == null) {
                        localEglBase = EglBase.create()
                        renderer.init(localEglBase!!.eglBaseContext, null)
                        renderer.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))
        }

        LaunchedEffect(Unit) {
            val initOptions = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(initOptions)

            // Encoder and decoder both use the EGL context that backs the preview renderer.
            val videoEncoders = DefaultVideoEncoderFactory(localEglBase!!.eglBaseContext, true, true)
            val videoDecoders = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

            peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(videoEncoders)
                .setVideoDecoderFactory(videoDecoders)
                .createPeerConnectionFactory()

            initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) { track ->
                localVideoTrack = track
            }

            createPeerConnection(context, peerConnectionFactory, localVideoTrack) { peer ->
                localPeer = peer
                startStatsCollection() // begin periodic stats collection
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
            startTimeSync()
        }
    }
}

/**
 * Starts a coroutine in the view lifecycle scope that waits 5 seconds and then calls
 * [requestTimeSync], repeating for as long as the coroutine is active.
 */
private fun startTimeSync() {
    val viewScope = viewLifecycleOwner.lifecycleScope
    viewScope.launch {
        while (isActive) {
            // One time-sync request every 5 seconds.
            delay(5000)
            requestTimeSync()
        }
    }
}

/**
 * Starts the once-per-second stats loop and stores its job in [statsJob].
 *
 * Each tick asks [localPeer] for a stats report and forwards it to [parseStatsReport];
 * an error is logged instead when there is no peer connection.
 */
private fun startStatsCollection() {
    statsJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            // Collect statistics once per second.
            delay(1000)
            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
            } else {
                peer.getStats { report -> parseStatsReport(report) }
            }
        }
    }
}

/**
 * Extracts the current round-trip time from a WebRTC stats report and publishes the latest,
 * maximum, minimum and average RTT (in milliseconds, over the last 60 samples).
 *
 * `currentRoundTripTime` is a member of "candidate-pair" stats. The "transport" entry only
 * references the active pair through `selectedCandidatePairId`, so the value is looked up on
 * that pair, falling back to any nominated candidate pair. Reports without a positive RTT
 * are ignored.
 *
 * @param report stats report delivered by `PeerConnection.getStats`.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    val statsMap = report.statsMap

    val selectedPairId = statsMap.values
        .firstOrNull { it.type == "transport" }
        ?.members?.get("selectedCandidatePairId") as? String
    val pairStats = selectedPairId?.let { statsMap[it] }
        ?: statsMap.values.firstOrNull { it.type == "candidate-pair" && it.members["nominated"] == true }
        ?: return
    Log.d(TAG, "Candidate pair stats found: $pairStats")

    // The report expresses RTT in seconds; the UI states hold milliseconds.
    val rttSeconds = (pairStats.members["currentRoundTripTime"] as? Number)?.toDouble() ?: return
    val currentRtt = (rttSeconds * 1000).toLong()
    if (currentRtt <= 0) return

    // The callback may fire after the view is destroyed; `viewLifecycleOwner` would throw
    // then, so use the nullable live-data value and drop the sample instead.
    val viewOwner = viewLifecycleOwnerLiveData.value ?: return
    viewOwner.lifecycleScope.launch {
        latestRttState.longValue = currentRtt

        // Keep a sliding window of the most recent 60 samples.
        rttHistory.add(currentRtt)
        while (rttHistory.size > 60) {
            rttHistory.removeAt(0)
        }

        val maxRtt = rttHistory.maxOrNull() ?: 0L
        val minRtt = rttHistory.minOrNull() ?: 0L
        val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

        maxRttState.longValue = maxRtt
        minRttState.longValue = minRtt
        averageRttState.longValue = averageRtt

        Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
    }
}

/**
 * Creates the Socket.IO connection to [signalingServerUrl], registers the connect, error,
 * disconnect and "signal" listeners, and starts connecting.
 *
 * On connect the socket joins [currentRoom]. Incoming "signal" events that carry a
 * [JSONObject] are passed to [handleSignalingData]; events without a payload are only logged
 * (the payload used to be read before the emptiness check, which threw on an empty event).
 * Any exception raised while setting up the socket is logged and [socket] stays unassigned.
 */
private fun initializeSocketIO() {
    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = signalingServerUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        // Listeners capture the local reference so they never touch the lateinit property.
        val client = IO.socket(signalingServerUrl, options)
        socket = client

        client.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket 已连接")
            client.emit("join", currentRoom)
            Log.d(TAG, "已加入房间: $currentRoom")
        }

        client.on(Socket.EVENT_CONNECT_ERROR) { args ->
            if (args.isNotEmpty()) {
                Log.e(TAG, "Socket 连接错误: ${args[0]}")
            }
        }

        client.on(Socket.EVENT_DISCONNECT) { args ->
            if (args.isNotEmpty()) {
                Log.d(TAG, "Socket 已断开: ${args[0]}")
            }
        }

        client.on("signal") { args ->
            val payload = args.firstOrNull()
            Log.d(TAG, "收到信令: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            }
        }

        client.connect()
        Log.d(TAG, "正在连接到 Socket: $signalingServerUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "连接 Socket 时出错: ${e.message}")
    }
}

/**
 * Requests every listed permission that is not granted yet; when none is missing,
 * calls [onPermissionsChecked] directly.
 */
private fun requestPermissionsIfNeeded() {
    val missing = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast stating that all required permissions have been granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Starts camera capture at 1920x1080 / 60 fps and creates the local video track.
 *
 * The track is rendered into [localView] when one is supplied and then handed to
 * [onLocalVideoTrack]. The audio source, audio track and media stream that used to be
 * created here were never attached to anything or released, so they are no longer allocated.
 *
 * NOTE(review): the capturer and the SurfaceTextureHelper are not retained, so nothing
 * visible here can stop or dispose them later — confirm whether that is handled elsewhere.
 *
 * @param context context used to initialise the capturer.
 * @param localView renderer for the preview, or null to skip rendering.
 * @param peerConnectionFactory factory used to create the video source and track.
 * @param eglBase EGL base whose context backs the capture texture helper.
 * @param onLocalVideoTrack receives the created video track.
 */
private fun initLocalVideo(
    context: Context,
    localView: SurfaceViewRenderer?,
    peerConnectionFactory: PeerConnectionFactory,
    eglBase: EglBase,
    onLocalVideoTrack: (VideoTrack) -> Unit
) {
    val videoCapturer = createCameraCapturer(context)
    val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
    val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
    videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

    videoCapturer.startCapture(1920, 1080, 60)

    val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
    if (localView != null) {
        localVideoTrack.addSink(localView)
    } else {
        Log.w(TAG, "No local renderer available; the local video track will not be displayed")
    }

    onLocalVideoTrack(localVideoTrack)
}

/**
 * Creates a capturer for the first usable camera, preferring back-facing cameras, then
 * front-facing ones, then the first listed device.
 *
 * @param context context used to enumerate the cameras.
 * @return the first capturer that could be created.
 * @throws IllegalStateException when no capturer can be created, including when the device
 *   reports no cameras at all (that case used to fail with an index error instead).
 */
private fun createCameraCapturer(context: Context): CameraVideoCapturer {
    val enumerator = Camera2Enumerator(context)
    val deviceNames = enumerator.deviceNames

    // Lazily evaluated, so cameras after the first success are never opened.
    val candidates = deviceNames.asSequence().filter { enumerator.isBackFacing(it) } +
        deviceNames.asSequence().filter { enumerator.isFrontFacing(it) } +
        deviceNames.asSequence().take(1)

    return candidates.firstNotNullOfOrNull { name -> enumerator.createCapturer(name, null) }
        ?: throw IllegalStateException("无法创建摄像头捕获器")
}

/**
 * Creates the peer connection with the configured STUN/TURN servers, attaches the local
 * video track (when present) and a newly created audio track, and reports the connection
 * through [onLocalPeerCreated].
 *
 * Locally gathered ICE candidates are sent to the room over the signaling socket. When the
 * factory cannot create a connection, the failure is logged and [onLocalPeerCreated] is not
 * called (previously this crashed on a null assertion).
 *
 * @param context unused; kept so existing callers stay source-compatible.
 * @param peerConnectionFactory factory used for the connection and the audio track.
 * @param localVideoTrack video track to send, or null to send audio only.
 * @param onLocalPeerCreated receives the created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    localVideoTrack: VideoTrack?,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            // This method is called before initializeSocketIO(), and socket setup can fail.
            if (!this@VideoInitiatorFragment::socket.isInitialized) {
                Log.w(TAG, "Dropping ICE candidate: signaling socket is not initialized")
                return
            }
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", JSONObject().apply {
                    put("sdpMid", candidate.sdpMid)
                    put("sdpMLineIndex", candidate.sdpMLineIndex)
                    put("candidate", candidate.sdp)
                })
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    if (peer == null) {
        Log.e(TAG, "Failed to create PeerConnection")
        return
    }
    localPeer = peer

    localVideoTrack?.let { peer.addTrack(it, listOf("local_stream")) }

    val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
    val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
    peer.addTrack(localAudioTrack, listOf("local_stream"))

    onLocalPeerCreated(peer)
}

/**
 * Creates an SDP answer on [peerConnection], installs it as the local description and, once
 * that succeeds, passes the SDP text to [onAnswerCreated]. Failures are only logged.
 *
 * @param peerConnection connection that already holds the remote offer.
 * @param onAnswerCreated receives the answer SDP after it became the local description.
 */
private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
    Log.d(TAG, "Creating answer...")

    val answerConstraints = MediaConstraints()
    answerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    answerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    val createObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            val answer = sessionDescription ?: return

            // Observer for installing the freshly created answer as local description.
            val setLocalObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    Log.d(TAG, "SetLocalDescription onSetSuccess")
                    onAnswerCreated(answer.description)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                }

                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            peerConnection.setLocalDescription(setLocalObserver, answer)
        }

        override fun onSetSuccess() {
            Log.d(TAG, "createAnswer onSetSuccess")
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "createAnswer onCreateFailure: $error")
        }

        override fun onSetFailure(error: String?) {}
    }

    peerConnection.createAnswer(createObserver, answerConstraints)
}

/**
 * Dispatches one signaling message by its "type" field: "offer", "ice" or
 * "time_sync_response". Unknown types are logged, and a message with missing or mistyped
 * fields is logged and ignored instead of throwing out of the socket callback.
 *
 * @param data JSON payload received on the "signal" event.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        when (val type = data.getString("type")) {
            "offer" -> handleOffer(data)
            "ice" -> handleRemoteIceCandidate(data)
            "time_sync_response" -> handleTimeSyncResponse(data)
            else -> Log.e(TAG, "Unknown signaling type: $type")
        }
    } catch (e: org.json.JSONException) {
        Log.e(TAG, "Ignoring malformed signaling message: ${e.message}")
    }
}

/** Applies a remote offer, sends the answer to the room and then flushes queued ICE candidates. */
private fun handleOffer(data: JSONObject) {
    Log.d(TAG, "Received offer")
    val peer = localPeer
    if (peer == null) {
        Log.w(TAG, "Ignoring offer: no peer connection")
        return
    }
    val offer = SessionDescription(
        SessionDescription.Type.OFFER,
        data.getJSONObject("sdp").getString("sdp")
    )

    peer.setRemoteDescription(object : SdpObserver {
        override fun onSetSuccess() {
            Log.d(TAG, "Set remote description (offer) success")
            createAnswer(peer) { answer ->
                val signalData = JSONObject().apply {
                    put("type", "answer")
                    put("sdp", JSONObject().put("sdp", answer))
                    put("room", currentRoom)
                }
                socket.emit("signal", signalData)

                // Snapshot under the lock, apply outside it: the queue is also written from
                // the socket callback, and WebRTC calls must not run while holding the lock.
                val queued = synchronized(pendingIceCandidates) {
                    pendingIceCandidates.toList().also { pendingIceCandidates.clear() }
                }
                queued.forEach { candidate -> peer.addIceCandidate(candidate) }
            }
        }

        override fun onSetFailure(error: String?) {
            Log.e(TAG, "Set remote description (offer) error: $error")
        }

        override fun onCreateSuccess(p0: SessionDescription?) {}
        override fun onCreateFailure(p0: String?) {}
    }, offer)
}

/** Adds a remote ICE candidate, or queues it while no remote description is set. */
private fun handleRemoteIceCandidate(data: JSONObject) {
    Log.d(TAG, "Received ICE candidate")
    val candidateData = data.getJSONObject("candidate")
    val candidate = IceCandidate(
        candidateData.getString("sdpMid"),
        candidateData.getInt("sdpMLineIndex"),
        candidateData.getString("candidate")
    )

    val peer = localPeer
    if (peer?.remoteDescription != null) {
        peer.addIceCandidate(candidate)
    } else {
        synchronized(pendingIceCandidates) { pendingIceCandidates.add(candidate) }
    }
}

/**
 * Evaluates a time-sync reply carrying t1 (request sent), t2 (request received by the peer)
 * and t3 (reply sent by the peer); t4 is the local receive time. Publishes the RTT statistics.
 */
private fun handleTimeSyncResponse(data: JSONObject) {
    val t1 = data.getLong("t1")
    val t2 = data.getLong("t2")
    val t3 = data.getLong("t3")
    val t4 = System.currentTimeMillis()

    val rtt = t4 - t1
    // One-way delay: the round trip minus the peer's processing time (t3 - t2), halved.
    // The previous code reused the clock-offset formula here.
    val oneWayDelay = ((t4 - t1) - (t3 - t2)) / 2
    // Clock offset of the peer relative to this device.
    val timeOffset = ((t2 - t1) + (t3 - t4)) / 2

    Log.d(TAG, "时间同步: RTT=$rtt ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")

    // `viewLifecycleOwner` throws once the view is gone; drop the sample in that case.
    val viewOwner = viewLifecycleOwnerLiveData.value ?: return
    viewOwner.lifecycleScope.launch {
        latestRttState.longValue = rtt

        // Record the sample first. The history used to stay empty, so max/min/average were
        // always published as 0.
        latencyHistory.add(rtt)
        while (latencyHistory.size > 60) {
            latencyHistory.removeAt(0)
        }

        val maxRtt = latencyHistory.maxOrNull() ?: 0L
        val minRtt = latencyHistory.minOrNull() ?: 0L
        val averageRtt = if (latencyHistory.isNotEmpty()) latencyHistory.average().toLong() else 0L

        maxRttState.longValue = maxRtt
        minRttState.longValue = minRtt
        averageRttState.longValue = averageRtt

        Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
        Log.d(TAG, "OWD - Latest: $oneWayDelay ms")
    }
}

/**
 * Sends a "time_sync_request" for [currentRoom] carrying the local send time `t1`
 * (milliseconds since the epoch).
 *
 * Does nothing except log a warning when the signaling socket was never created, which
 * happens when socket setup failed; accessing the lateinit property would crash otherwise.
 */
private fun requestTimeSync() {
    if (!::socket.isInitialized) {
        Log.w(TAG, "Skipping time sync: signaling socket is not initialized")
        return
    }

    val t1 = System.currentTimeMillis()
    val syncRequest = JSONObject().apply {
        put("type", "time_sync_request")
        put("room", currentRoom)
        put("t1", t1)
    }
    socket.emit("signal", syncRequest)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

/**
 * Stops stats collection, disconnects signaling and releases the WebRTC objects owned by
 * the view.
 *
 * The socket is only touched when it was actually created. Released objects are also
 * cleared: the AndroidView update block initialises a renderer only while [localEglBase] is
 * null, so a stale reference would leave a recreated view with an uninitialised renderer.
 */
override fun onDestroyView() {
    super.onDestroyView()

    statsJob?.cancel()
    statsJob = null

    if (::socket.isInitialized) {
        socket.disconnect()
    }

    localPeer?.dispose()
    localPeer = null

    localView?.release()
    localView = null

    localEglBase?.release()
    localEglBase = null
}
}

class VideoReceiverFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoReceiverFragment] whose arguments bundle carries the room name, the
     * STUN/TURN endpoints, the TURN credentials and the signaling server URL.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoReceiverFragment = VideoReceiverFragment().also { instance ->
        instance.arguments = Bundle().also { bundle ->
            bundle.putString("room", room)
            bundle.putString("stunUrl", stunUrl)
            bundle.putString("turnUrl", turnUrl)
            bundle.putString("turnUsername", turnUsername)
            bundle.putString("turnPassword", turnPassword)
            bundle.putString("signalingServerUrl", signalingServerUrl)
        }
    }
}

// Class member variables
// Socket.IO connection to the signaling server; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection created in createPeerConnection().
private var localPeer: PeerConnection? = null
// Renderer for the remote video; created by the AndroidView factory.
private var remoteView: SurfaceViewRenderer? = null
// EGL base created on first run of the AndroidView update block.
private var remoteEglBase: EglBase? = null
// Remote ICE candidates held back until a remote description is set; flushed in the
// "answer" branch of handleSignalingData().
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings below are filled from the fragment arguments in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
// Logcat tag used by every log call in this fragment.
private val TAG: String = "WebRTC-Receiver"

// Compose state displayed by WebRTCComposeLayout(): frame rate, bitrate (shown divided by
// 1000 as kbps) and the stuttering flag.
private val frameRateState = mutableDoubleStateOf(0.0)
private val bitrateState = mutableLongStateOf(0L)
private val stutteringState = mutableStateOf(false)

// Detail flags and packet-loss value shown next to the stuttering warning.
private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableDoubleStateOf(0.0)

// Series plotted by the two line charts.
private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
// Handle of the periodic time-sync coroutine started by startPeriodicTimeSync().
private var timeSyncJob: Job? = null

// History list intended to store all one-way latency values.
private val latencyHistory = mutableStateListOf<Long>()

// State holders for the maximum, minimum, average and latest one-way latency shown in the UI.
private val maxLatencyState = mutableLongStateOf(0L)
private val minLatencyState = mutableLongStateOf(Long.MAX_VALUE)
private val averageLatencyState = mutableLongStateOf(0L)
private val latestLatencyState = mutableLongStateOf(0L)

// History variables
// NOTE(review): presumably the previous stats sample used to compute per-interval deltas;
// their readers and writers are outside the visible part of the file — confirm.
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
// NOTE(review): presumably maintained by the time-sync handling, which is not visible here.
private var timeOffset: Long = 0
private var t1: Long = 0

// NOTE(review): presumably the job of the stats loop started by startStatsCollection(),
// which is defined outside the visible part of the file — confirm.
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments, substituting a built-in default
 * for every missing value, and logs the resulting configuration.
 *
 * NOTE(review): the fallbacks include a TURN username and password hard-coded in source.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    val args = arguments

    // Returns the string stored under [key], or [fallback] when it is absent.
    fun argOrDefault(key: String, fallback: String): String = args?.getString(key) ?: fallback

    currentRoom = argOrDefault("room", "default-room")
    signalingServerUrl = argOrDefault("signalingServerUrl", "https://wstszx.us.kg")
    stunUrl = argOrDefault("stunUrl", "stun:stun.wstszx.us.kg:3478")
    turnUrl = argOrDefault("turnUrl", "turn:turn.wstszx.us.kg:5349")
    turnUsername = argOrDefault("turnUsername", "wstszx")
    turnPassword = argOrDefault("turnPassword", "930379")

    val summary =
        "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per requested permission with its outcome. [onPermissionsChecked] announces
 * that every required permission is granted, so it is only invoked when nothing was denied
 * (previously it ran unconditionally and showed that message even after a denial).
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        if (permissions.values.all { it }) {
            onPermissionsChecked()
        }
    }

/** Hosts the Compose UI of this fragment in a newly created [ComposeView]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Root composable of the receiver screen.
 *
 * Shows the remote video, the one-way latency figures, frame-rate and bitrate charts and a
 * stuttering indicator. A [LaunchedEffect] runs the one-time WebRTC setup: factory creation,
 * peer connection, signaling socket and permission request.
 *
 * The encoder factory now shares the EGL context of [remoteEglBase]; it used to allocate a
 * second `EglBase` that was never released.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Remote video view
            AndroidView(
                factory = { viewContext ->
                    val renderer = SurfaceViewRenderer(viewContext).apply {
                        setZOrderMediaOverlay(false)
                    }
                    remoteView = renderer
                    renderer
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = { renderer ->
                    // Create the EGL base and initialise the renderer only the first time.
                    if (remoteEglBase?.eglBaseContext == null) {
                        val eglBase = EglBase.create()
                        remoteEglBase = eglBase
                        renderer.init(eglBase.eglBaseContext, null)
                        renderer.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // Latest, maximum, minimum and average one-way latency
            Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                Text(
                    text = "当前单向时延: ${latestLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最大单向时延: ${maxLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最小单向时延: ${minLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "平均单向时延: ${averageLatencyState.longValue} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 8.dp)
                )
            }

            // Spacer between video and charts
            Spacer(modifier = Modifier.height(8.dp))

            // Frame rate section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 8.dp)
            ) {
                Text(
                    text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                // Runs on every recomposition of this column.
                Log.d(
                    TAG,
                    "UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"
                )

                LineChart(
                    data = frameRateHistory,
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Green,
                    backgroundColor = Color.Black,
                    yAxisLabel = "帧率 (fps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between charts
            Spacer(modifier = Modifier.height(8.dp))

            // Bitrate section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp)
            ) {
                Text(
                    text = "码率: ${bitrateState.longValue / 1000} kbps",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )

                LineChart(
                    data = bitrateHistory.map { it / 1000f }, // convert to kbps
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Blue,
                    backgroundColor = Color.Black,
                    yAxisLabel = "码率 (kbps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between metrics and stuttering indicator
            Spacer(modifier = Modifier.height(16.dp))

            // Stuttering indicator
            Row(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp),
                verticalAlignment = Alignment.CenterVertically
            ) {
                if (stutteringState.value) {
                    Icon(
                        imageVector = Icons.Default.Warning,
                        contentDescription = "卡顿警告",
                        tint = Color.Red,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Column {
                        Text(
                            text = "视频播放出现卡顿",
                            color = Color.Red,
                            style = MaterialTheme.typography.bodyMedium
                        )
                        // Name the metrics that are out of range.
                        if (frameRateLowState.value) {
                            Text(
                                text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                        if (packetLossHighState.value) {
                            Text(
                                text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                    }
                } else {
                    Icon(
                        imageVector = Icons.Default.CheckCircle,
                        contentDescription = "正常",
                        tint = Color.Green,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Text(
                        text = "视频播放正常",
                        color = Color.Green,
                        style = MaterialTheme.typography.bodyMedium
                    )
                }
            }
        }

        LaunchedEffect(Unit) {
            val initOptions = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(initOptions)

            // Both are created by the AndroidView above before this effect starts.
            val eglBase = checkNotNull(remoteEglBase) { "remoteEglBase has not been created yet" }
            val renderer = checkNotNull(remoteView) { "remoteView has not been created yet" }

            // Share the renderer's EGL context instead of allocating a second EglBase.
            val encoderFactory = DefaultVideoEncoderFactory(eglBase.eglBaseContext, true, true)
            val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)

            val peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(encoderFactory)
                .setVideoDecoderFactory(decoderFactory)
                .createPeerConnectionFactory()

            createPeerConnection(context, peerConnectionFactory, renderer) { peer ->
                localPeer = peer
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
        }
    }
}

/**
 * (Re)starts the loop that calls `requestTimeSync()` immediately and then every
 * [intervalMs] milliseconds, storing its job in [timeSyncJob].
 *
 * This is invoked from the socket connect callback, so it can run again on every reconnect
 * and after the view is gone. A previous loop is cancelled first so requests are not
 * duplicated, and nothing is started when there is no view lifecycle to bind the loop to.
 *
 * @param intervalMs pause between two requests, in milliseconds.
 */
private fun startPeriodicTimeSync(intervalMs: Long = 5000L) {
    timeSyncJob?.cancel()

    // `viewLifecycleOwner` throws without a view; the live-data value is simply null then.
    val viewOwner = viewLifecycleOwnerLiveData.value
    if (viewOwner == null) {
        Log.w(TAG, "Time sync not started: the fragment has no view")
        timeSyncJob = null
        return
    }

    timeSyncJob = viewOwner.lifecycleScope.launch {
        while (isActive) {
            requestTimeSync()
            delay(intervalMs)
        }
    }
}

/**
 * Line chart with axes, tick labels and optional axis titles; consecutive points are joined
 * by straight segments and marked with a dot.
 *
 * Fixes over the previous version: a flat series (all values equal) no longer yields NaN
 * tick positions; x ticks are spread over the whole index range and de-duplicated, so short
 * series no longer draw five stacked "0" labels; text paints are reused across labels.
 *
 * @param data values to plot, in order; only the background is drawn when empty.
 * @param modifier layout modifier; the background colour is applied on top of it.
 * @param lineColor colour of the segments and the point markers.
 * @param backgroundColor fill colour behind the chart.
 * @param yAxisLabel title drawn rotated beside the y axis; skipped when empty.
 * @param xAxisLabel title drawn below the x axis; skipped when empty.
 * @param minYValue fixed lower bound of the y axis, or null to use the smallest value.
 * @param maxYValue fixed upper bound of the y axis, or null to use the largest value.
 */
@Composable
fun LineChart(
    data: List<Float>,
    modifier: Modifier = Modifier,
    lineColor: Color = Color.Green,
    backgroundColor: Color = Color.Black,
    yAxisLabel: String = "",
    xAxisLabel: String = "",
    minYValue: Float? = null,
    maxYValue: Float? = null
) {
    val yTickPaint = remember {
        android.graphics.Paint().apply {
            color = android.graphics.Color.WHITE
            textSize = 24f
            textAlign = android.graphics.Paint.Align.RIGHT
        }
    }
    val xTickPaint = remember {
        android.graphics.Paint().apply {
            color = android.graphics.Color.WHITE
            textSize = 24f
            textAlign = android.graphics.Paint.Align.CENTER
        }
    }
    val axisTitlePaint = remember {
        android.graphics.Paint().apply {
            color = android.graphics.Color.WHITE
            textSize = 24f
            textAlign = android.graphics.Paint.Align.CENTER
            isAntiAlias = true
        }
    }

    Canvas(modifier = modifier.background(backgroundColor)) {
        if (data.isEmpty()) return@Canvas

        val padding = 40.dp.toPx() // room for axes and labels
        val plotBottom = size.height - padding
        val plotHeight = size.height - padding * 2

        val maxY = maxYValue ?: data.maxOrNull() ?: 1f
        val minY = minYValue ?: data.minOrNull() ?: 0f
        val yRange = maxY - minY
        val pointCount = data.size
        val spacing = (size.width - padding * 2) / (pointCount - 1).coerceAtLeast(1)

        // A flat series has no vertical extent: centre it instead of dividing by zero.
        fun yPositionOf(value: Float): Float =
            if (yRange == 0f) size.height / 2
            else plotBottom - ((value - minY) / yRange) * plotHeight

        val points = data.mapIndexed { index, value ->
            Offset(padding + index * spacing, yPositionOf(value))
        }

        // Axes
        drawLine(
            color = Color.White,
            start = Offset(padding, padding),
            end = Offset(padding, plotBottom),
            strokeWidth = 2f
        )
        drawLine(
            color = Color.White,
            start = Offset(padding, plotBottom),
            end = Offset(size.width - padding, plotBottom),
            strokeWidth = 2f
        )

        val nativeCanvas = drawContext.canvas.nativeCanvas

        // Y tick labels: five evenly spaced values, or a single one for a flat series.
        val yLabelCount = 5
        val yStep = yRange / (yLabelCount - 1)
        val yTickValues = if (yRange == 0f) listOf(minY) else List(yLabelCount) { i -> minY + i * yStep }
        yTickValues.forEach { yValue ->
            nativeCanvas.drawText(
                yValue.roundToInt().toString(),
                padding - 8f,
                yPositionOf(yValue) + yTickPaint.textSize / 2,
                yTickPaint
            )
        }

        // X tick labels: up to five distinct indices from the first to the last point.
        val xLabelCount = 5
        val lastIndex = pointCount - 1
        val xTickIndices = (0 until xLabelCount)
            .map { i -> i * lastIndex / (xLabelCount - 1) }
            .distinct()
        xTickIndices.forEach { index ->
            nativeCanvas.drawText(
                index.toString(),
                padding + index * spacing,
                plotBottom + xTickPaint.textSize + 4f,
                xTickPaint
            )
        }

        // Y axis title, rotated to read bottom-to-top
        if (yAxisLabel.isNotEmpty()) {
            nativeCanvas.save()
            nativeCanvas.rotate(-90f, padding / 2, size.height / 2)
            nativeCanvas.drawText(yAxisLabel, padding / 2, size.height / 2, axisTitlePaint)
            nativeCanvas.restore()
        }

        // X axis title
        if (xAxisLabel.isNotEmpty()) {
            nativeCanvas.drawText(xAxisLabel, size.width / 2, size.height - padding / 2, axisTitlePaint)
        }

        // Straight segments between consecutive points
        points.zipWithNext().forEach { (from, to) ->
            drawLine(
                color = lineColor,
                start = from,
                end = to,
                strokeWidth = 4f,
                cap = StrokeCap.Round
            )
        }

        // Point markers
        points.forEach { point ->
            drawCircle(color = lineColor, radius = 4f, center = point)
        }
    }
}

/**
 * Creates the Socket.IO connection to [signalingServerUrl], registers the connect, error,
 * disconnect and "signal" listeners, and starts connecting.
 *
 * On connect the socket joins [currentRoom], then the call is initiated and periodic time
 * sync is started. Incoming "signal" events that carry a [JSONObject] are passed to
 * [handleSignalingData]; events without a payload are only logged (the payload used to be
 * read before the emptiness check, which threw on an empty event). Any exception raised
 * while setting up the socket is logged and [socket] stays unassigned.
 */
private fun initializeSocketIO() {
    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = signalingServerUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        // Listeners capture the local reference so they never touch the lateinit property.
        val client = IO.socket(signalingServerUrl, options)
        socket = client

        client.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket connected")
            client.emit("join", currentRoom)
            Log.d(TAG, "Joined room: $currentRoom")
            initiateCall()
            startPeriodicTimeSync()
        }

        client.on(Socket.EVENT_CONNECT_ERROR) { args ->
            if (args.isNotEmpty()) {
                Log.e(TAG, "Socket connection error: ${args[0]}")
            }
        }

        client.on(Socket.EVENT_DISCONNECT) { args ->
            if (args.isNotEmpty()) {
                Log.d(TAG, "Socket disconnected: ${args[0]}")
            }
        }

        client.on("signal") { args ->
            val payload = args.firstOrNull()
            Log.d(TAG, "Received signaling: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            }
        }

        client.connect()
        Log.d(TAG, "Connecting to Socket: $signalingServerUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "Error connecting to Socket: ${e.message}")
    }
}

/**
 * Requests every listed permission that is not granted yet; when none is missing,
 * calls [onPermissionsChecked] directly.
 */
private fun requestPermissionsIfNeeded() {
    val missing = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(requireContext(), permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast stating that all required permissions have been granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Creates the receiving peer connection with the configured STUN/TURN servers, reports it
 * through [onLocalPeerCreated] and then starts stats collection.
 *
 * Remote video tracks announced through `onAddTrack` or `onTrack` are rendered into
 * [remoteView]; locally gathered ICE candidates are sent to the room over the signaling
 * socket. When the factory cannot create a connection, the failure is logged and neither
 * the callback nor stats collection runs (previously this crashed on a null assertion).
 *
 * @param context unused; kept so existing callers stay source-compatible.
 * @param peerConnectionFactory factory used to create the connection.
 * @param remoteView renderer that displays incoming video.
 * @param onLocalPeerCreated receives the created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    remoteView: SurfaceViewRenderer,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    // Shared by onAddTrack and onTrack: render the track when it is a video track.
    fun renderIfVideo(track: Any?) {
        (track as? VideoTrack)?.addSink(remoteView)
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            // This method is called before initializeSocketIO(), and socket setup can fail.
            if (!this@VideoReceiverFragment::socket.isInitialized) {
                Log.w(TAG, "Dropping ICE candidate: signaling socket is not initialized")
                return
            }
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", JSONObject().apply {
                    put("sdpMid", candidate.sdpMid)
                    put("sdpMLineIndex", candidate.sdpMLineIndex)
                    put("candidate", candidate.sdp)
                })
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
            renderIfVideo(receiver?.track())
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
            renderIfVideo(transceiver?.receiver?.track())
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    if (peer == null) {
        Log.e(TAG, "Failed to create PeerConnection")
        return
    }
    localPeer = peer

    onLocalPeerCreated(peer)

    // Start collecting statistics
    startStatsCollection()
}

/**
 * Creates an SDP offer that asks to receive both audio and video, installs it as the
 * local description, and then emits it as an "offer" signal for [currentRoom].
 *
 * Failures while creating the offer or setting the local description are only logged.
 */
private fun initiateCall() {
    Log.d(TAG, "Initiating call...")

    val offerConstraints = MediaConstraints()
    for (key in listOf("OfferToReceiveAudio", "OfferToReceiveVideo")) {
        offerConstraints.mandatory.add(MediaConstraints.KeyValuePair(key, "true"))
    }

    val offerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            val offer = sessionDescription ?: return

            // The offer is only sent once it has been applied locally.
            val localDescriptionObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    val offerSignal = JSONObject()
                    offerSignal.put("type", "offer")
                    offerSignal.put("sdp", JSONObject().put("sdp", offer.description))
                    offerSignal.put("room", currentRoom)
                    socket.emit("signal", offerSignal)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "Set local description error: $error")
                }

                // Not used for a set-description request.
                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            localPeer?.setLocalDescription(localDescriptionObserver, offer)
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "Create offer error: $error")
        }

        // Not used for a create-offer request.
        override fun onSetSuccess() {}
        override fun onSetFailure(error: String?) {}
    }

    localPeer?.createOffer(offerObserver, offerConstraints)
}

/**
 * Dispatches one signaling message according to its "type" field.
 *
 * - "answer": applies the remote SDP answer, then adds every ICE candidate that was
 *   queued in [pendingIceCandidates] while no remote description was set.
 * - "ice": adds the candidate to the peer connection, or queues it when the remote
 *   description is not set yet.
 * - "time_sync_request": echoes the sender's t1 together with local timestamps t2/t3
 *   in a "time_sync_response" signal.
 *
 * A message with a missing/unknown type or with missing fields is logged and ignored
 * instead of throwing from the caller's thread.
 *
 * Handling of "time_sync_response" is currently disabled in this class; the removed
 * draft computed RTT = t4 - t1, one-way delay = RTT / 2 and a clock offset, and pushed
 * the result into the latency state holders.
 *
 * @param data the JSON payload of a "signal" event.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    // optString never throws, so a message without "type" ends up in the else branch.
    val type = data.optString("type")
    try {
        when (type) {
            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )

                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        // Candidates that arrived before the answer can be applied now.
                        // NOTE(review): this list is also appended to from the "ice" branch;
                        // confirm both run on the same thread or add synchronization.
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }
                        pendingIceCandidates.clear()

                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_request" -> {
                val requestT1 = data.getLong("t1")
                // t2 = request received, t3 = response sent; both are taken back to back here.
                val t2 = System.currentTimeMillis()
                val t3 = System.currentTimeMillis()
                val syncResponse = JSONObject().apply {
                    put("type", "time_sync_response")
                    put("t1", requestT1)
                    put("t2", t2)
                    put("t3", t3)
                    put("room", currentRoom)
                }
                socket.emit("signal", syncResponse)
                Log.d(TAG, "回复 time_sync_response: t1=$requestT1, t2=$t2, t3=$t3")
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: $type")
            }
        }
    } catch (e: org.json.JSONException) {
        // A required field was missing or had the wrong JSON type.
        Log.e(TAG, "Malformed signaling message: $data", e)
    }
}

/**
 * Stores the current wall-clock time in [t1] and emits a "time_sync_request" signal
 * carrying that timestamp for [currentRoom].
 */
private fun requestTimeSync() {
    t1 = System.currentTimeMillis()

    val payload = JSONObject()
    payload.put("type", "time_sync_request")
    payload.put("room", currentRoom)
    payload.put("t1", t1)

    socket.emit("signal", payload)
    Log.d(TAG, "发送时间同步请求 at t1: $t1")
}

/**
 * Starts a coroutine in the view's lifecycle scope that requests a stats report from
 * [localPeer] once per second and hands each report to [parseStatsReport].
 *
 * Any poller started by an earlier call is cancelled first, so calling this more than
 * once never leaves two pollers running.
 */
private fun startStatsCollection() {
    Log.d(TAG, "Starting stats collection...")
    // Without this, the previous job would keep polling after statsJob is overwritten.
    statsJob?.cancel()
    statsJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(1000) // Collect stats every second
            Log.d(TAG, "Collecting stats...")
            // The safe call yields null only when localPeer itself is null.
            localPeer?.getStats { report ->
                Log.d(TAG, "Stats report obtained.")
                parseStatsReport(report)
            } ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
        }
    }
}

/**
 * Derives frame rate, bitrate and packet loss from the inbound video RTP entries of
 * [report] and publishes them to the UI state holders.
 *
 * Frame rate and bitrate are computed from the difference to the previously seen
 * counters, so the first report only primes the `prev*` fields. Packet loss is the
 * cumulative lost / (lost + received) ratio of the entry, clamped to 0..100 %.
 *
 * @param report the stats report delivered by the peer connection.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    Log.d(TAG, "Received RTCStatsReport: $report")
    for (stats in report.statsMap.values) {
        if (stats.type != "inbound-rtp" || stats.members["kind"] != "video") continue

        // Reads a numeric member as Double, falling back when it is absent or not a number.
        fun counter(name: String, fallback: Double = 0.0): Double =
            (stats.members[name] as? Number)?.toDouble() ?: fallback

        val framesDecoded = counter("framesDecoded")
        val framesReceived = counter("framesReceived")
        val framesDropped = counter("framesDropped")
        val bytesReceived = counter("bytesReceived")
        val packetsLost = counter("packetsLost")
        val packetsReceived = counter("packetsReceived", fallback = 1.0)

        // Guard the division explicitly: the fallback above does not help when the report
        // really contains 0 received and 0 lost packets (0.0 / 0.0 would be NaN), and
        // a negative packetsLost would otherwise produce a negative loss percentage.
        val totalPackets = packetsLost + packetsReceived
        val packetLossFraction =
            if (totalPackets > 0) (packetsLost / totalPackets).coerceIn(0.0, 1.0) else 0.0
        val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

        Log.d(
            TAG,
            "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
        )

        // prevTimestamp == 0.0 means no earlier sample exists to diff against.
        if (prevTimestamp != 0.0) {
            val timeElapsed = timestamp - prevTimestamp
            val framesDelta = framesDecoded - prevFramesDecoded
            val bytesDelta = bytesReceived - prevBytesReceived

            val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
            val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
            val packetLoss = packetLossFraction * 100 // Convert to percentage

            Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

            // Stuttering thresholds; adjust as needed.
            val frameRateLow = frameRate < 24
            val packetLossHigh = packetLoss > 5.0
            val isStuttering = frameRateLow || packetLossHigh

            // State updates are dispatched through the view's lifecycle scope.
            viewLifecycleOwner.lifecycleScope.launch {
                frameRateState.doubleValue = frameRate
                bitrateState.longValue = bitrate.toLong()
                stutteringState.value = isStuttering
                frameRateLowState.value = frameRateLow
                packetLossHighState.value = packetLossHigh
                packetLossState.doubleValue = packetLoss

                // Keep only the 60 most recent samples in each history.
                frameRateHistory.add(frameRate.toFloat())
                if (frameRateHistory.size > 60) {
                    frameRateHistory.removeAt(0)
                }

                bitrateHistory.add(bitrate.toLong())
                if (bitrateHistory.size > 60) {
                    bitrateHistory.removeAt(0)
                }

                Log.d(
                    TAG,
                    "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
                )
            }
        }

        // Remember this sample for the next delta computation.
        prevFramesDecoded = framesDecoded
        prevBytesReceived = bytesReceived
        prevFramesReceived = framesReceived
        prevFramesDropped = framesDropped
        prevTimestamp = timestamp
    }
}

/**
 * Stops the periodic jobs and releases the socket, peer connection and renderer
 * resources when the fragment's view is destroyed.
 */
override fun onDestroyView() {
    super.onDestroyView()
    // Stop the pollers first so they cannot touch resources that are being released.
    statsJob?.cancel()
    timeSyncJob?.cancel()
    // socket is lateinit; skip the disconnect if it was never assigned.
    if (::socket.isInitialized) {
        socket.disconnect()
    }
    localPeer?.dispose()
    // Drop the reference so later safe calls do not reach a disposed peer connection.
    localPeer = null
    remoteView?.release()
    remoteEglBase?.release()
}
}

目前的单向时延和往返时延的统计方案应该用同一个请求来计算,在发起端计算往返时延并展示,在接收端计算单向时延并展示,返回给我修改后的代码

思考

优化时间同步

我正在调整时间同步和RTT/OWD统计的实现,期望将发起端与接收端的时间信息合并到同一个请求中进行计算。

整合时间同步逻辑

我正在集中力量,通过发起端发送时间同步请求,接收端回复时间同步响应,来计算RTT和OWD。这将统一时间同步逻辑,使得两边的计量更为一致。

同步请求与响应

现在正在处理时间同步请求和响应。发送请求,接收响应,计算RTT和OWD。接收者根据t1计算OWD,发送t2和t3。更新代码,显示统计数据。

同步改进

VideoInitiatorFragment中记录time_sync_request的发送时间t1,收到time_sync_response后记录t4并计算RTT。VideoReceiverFragment则在收到time_sync_request时响应time_sync_response,计算并存储RTT和OWD。

协调时间同步逻辑

助理正在调整两个类,计算RTT和OWD,以确保时间同步的准确性。

回答

To achieve a unified time synchronization mechanism where the initiator calculates the Round Trip Time (RTT) and the receiver calculates the One Way Delay (OWD) using the same request, we'll refactor both the VideoInitiatorFragment and VideoReceiverFragment. Here's a comprehensive guide along with the modified code.

Overview of Changes

  1. Unified Time Sync Requests:

    • The initiator (VideoInitiatorFragment) will periodically send a time_sync_request containing the timestamp t1.
    • The receiver (VideoReceiverFragment) will respond with a time_sync_response containing t1, t2, and t3 where:
      • t2 is the time when the receiver receives the request.
      • t3 is the time when the receiver sends the response.
  2. RTT Calculation (Initiator):

    • Upon receiving the time_sync_response, the initiator captures t4 (current time).
    • RTT is calculated as t4 - t1.
  3. OWD Calculation (Receiver):

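    • The receiver calculates OWD as t3 - t2, representing the processing time on the receiver's side. Note that t3 - t2 measures only how long the receiver takes to build the response, not the network delay; a true one-way delay estimate is t2 - t1, which is meaningful only when the two devices' clocks are synchronized.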
  4. State Management:

    • Both fragments maintain separate states for RTT and OWD to display the metrics appropriately.
  5. Optimizations and Cleanup:

    • Removed redundant or commented-out code related to time synchronization.
    • Ensured thread safety and proper lifecycle management for coroutines.

Modified Code

VideoInitiatorFragment.kt

/**
 * Fragment for the side of the call that captures the local camera and microphone,
 * answers the remote SDP offer, and displays round-trip-time (RTT) metrics.
 *
 * RTT is measured by periodically sending a "time_sync_request" signal that carries the
 * send time t1 and computing `t4 - t1` when the matching "time_sync_response" arrives.
 */
class VideoInitiatorFragment : Fragment() {

    companion object {
        /** Builds a fragment whose arguments carry the room name and the STUN/TURN/signaling settings. */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoInitiatorFragment {
            val fragment = VideoInitiatorFragment()
            val args = Bundle().apply {
                putString("room", room)
                putString("stunUrl", stunUrl)
                putString("turnUrl", turnUrl)
                putString("turnUsername", turnUsername)
                putString("turnPassword", turnPassword)
                putString("signalingServerUrl", signalingServerUrl)
            }
            fragment.arguments = args
            return fragment
        }
    }

    // Signaling / WebRTC objects
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var localView: SurfaceViewRenderer? = null
    private var localEglBase: EglBase? = null
    private val pendingIceCandidates = mutableListOf<IceCandidate>()

    // Connection settings read from the fragment arguments in onCreate
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    private val TAG: String = "WebRTC-Initiator"

    // RTT metrics shown in the UI; rttHistory keeps the most recent 60 samples
    private val maxRttState = mutableLongStateOf(0L)
    private val minRttState = mutableLongStateOf(Long.MAX_VALUE)
    private val averageRttState = mutableLongStateOf(0L)
    private val latestRttState = mutableLongStateOf(0L)
    private val rttHistory = mutableStateListOf<Long>()

    private val latencyHistory = mutableStateListOf<Long>()

    private var statsJob: Job? = null

    // Time sync: t1 = last request send time, t4 = last response receive time
    private var t1: Long = 0L
    private var t4: Long = 0L

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Read the settings from the arguments, falling back to built-in defaults
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Server, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    // Shows one toast per requested permission, then continues via onPermissionsChecked()
    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                if (isGranted) {
                    Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
                } else {
                    Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
                }
            }
            onPermissionsChecked()
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    /**
     * Local camera preview with the RTT metrics below it. On first composition it also
     * sets up the peer connection factory, local media, peer connection, socket,
     * permission request and the periodic time sync.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current
        lateinit var peerConnectionFactory: PeerConnectionFactory
        var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Local video view
                AndroidView(
                    factory = {
                        localView = SurfaceViewRenderer(it).apply {
                            setZOrderMediaOverlay(false)
                        }
                        localView!!
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth(),
                    update = {
                        // The EGL context and renderer are initialised only once
                        if (localEglBase == null) {
                            localEglBase = EglBase.create()
                            it.init(localEglBase!!.eglBaseContext, null)
                            it.setMirror(false)
                        }
                    }
                )

                Spacer(modifier = Modifier.height(8.dp))

                // RTT metrics
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    Text(
                        text = "当前往返时延 (RTT): ${latestRttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "最大 RTT: ${maxRttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "最小 RTT: ${minRttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "平均 RTT: ${averageRttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }

                Spacer(modifier = Modifier.height(8.dp))
            }

            LaunchedEffect(Unit) {
                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                val encoderFactory = DefaultVideoEncoderFactory(
                    localEglBase!!.eglBaseContext, true, true
                )
                val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

                peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(encoderFactory)
                    .setVideoDecoderFactory(decoderFactory)
                    .createPeerConnectionFactory()

                initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
                    localVideoTrack = it
                }

                createPeerConnection(
                    context,
                    peerConnectionFactory,
                    localVideoTrack
                ) {
                    localPeer = it
                    startStatsCollection() // Start stats collection
                }

                initializeSocketIO()
                // NOTE(review): camera capture is started above, before permissions are
                // requested here; confirm this ordering is intended.
                requestPermissionsIfNeeded()
                startTimeSync()
            }
        }
    }

    /** Sends a time sync request every 5 seconds for as long as the view's lifecycle scope is active. */
    private fun startTimeSync() {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(5000) // Sync every 5 seconds
                requestTimeSync()
            }
        }
    }

    /**
     * Polls the peer connection for a stats report once per second. A poller started by
     * an earlier call is cancelled first so only one is ever running.
     */
    private fun startStatsCollection() {
        statsJob?.cancel()
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000) // Collect stats every second
                localPeer?.getStats { report ->
                    parseStatsReport(report)
                } ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
            }
        }
    }

    /**
     * Records an RTT sample for every "transport" entry of [report] that carries a
     * positive `currentRoundTripTime` (seconds, converted to milliseconds).
     */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
        for (stats in report.statsMap.values) {
            Log.d(TAG, "Stats type: ${stats.type}")
            // NOTE(review): in the W3C stats model currentRoundTripTime belongs to
            // "candidate-pair" entries, so this branch may never produce a sample; it also
            // feeds the same history as the time-sync RTT. Confirm which source is wanted.
            if (stats.type == "transport") {
                Log.d(TAG, "Transport Stats found: $stats")
                val currentRtt = (stats.members["currentRoundTripTime"] as? Number)
                    ?.toDouble()
                    ?.times(1000)
                    ?.toLong()
                if (currentRtt != null && currentRtt > 0) {
                    recordRtt(currentRtt)
                }
            }
        }
    }

    /** Publishes [rtt] as the latest sample and refreshes max/min/average over the last 60 samples. */
    private fun recordRtt(rtt: Long) {
        // viewLifecycleOwner throws once the view is gone; late callbacks are dropped instead.
        if (view == null) return
        viewLifecycleOwner.lifecycleScope.launch {
            latestRttState.longValue = rtt

            rttHistory.add(rtt)
            if (rttHistory.size > 60) {
                rttHistory.removeAt(0)
            }

            val maxRtt = rttHistory.maxOrNull() ?: 0L
            val minRtt = rttHistory.minOrNull() ?: 0L
            val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

            maxRttState.longValue = maxRtt
            minRttState.longValue = minRtt
            averageRttState.longValue = averageRtt

            Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
        }
    }

    /** Creates the socket, registers the connect/error/disconnect/"signal" listeners and connects. */
    private fun initializeSocketIO() {
        val socketUrl = signalingServerUrl
        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = signalingServerUrl.startsWith("https")
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check for emptiness before touching args[0]
                if (args.isNotEmpty()) {
                    Log.d(TAG, "Received signaling: ${args[0]}")
                    (args[0] as? JSONObject)?.let { handleSignalingData(it) }
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to Socket: ${e.message}")
        }
    }

    /** Requests whichever of the needed permissions are not granted yet. */
    private fun requestPermissionsIfNeeded() {
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        )

        val permissionsToRequest = permissions.filter {
            ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
        }

        if (permissionsToRequest.isNotEmpty()) {
            requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
        } else {
            onPermissionsChecked()
        }
    }

    // NOTE(review): this toast is also shown after a denial, because the launcher
    // callback calls this unconditionally.
    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Starts camera capture at 1920x1080 / 60 fps, renders it into [localView], creates
     * the local audio track and reports the video track through [onLocalVideoTrack].
     */
    private fun initLocalVideo(
        context: Context,
        localView: SurfaceViewRenderer?,
        peerConnectionFactory: PeerConnectionFactory,
        eglBase: EglBase,
        onLocalVideoTrack: (VideoTrack) -> Unit
    ) {
        val videoCapturer = createCameraCapturer(context)
        val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
        val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)

        videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)
        videoCapturer.startCapture(1920, 1080, 60)

        val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
        localVideoTrack.addSink(localView)

        val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
        val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)

        // Add audio and video tracks to the local stream
        val mediaStream = peerConnectionFactory.createLocalMediaStream("local_stream")
        mediaStream.addTrack(localAudioTrack)
        mediaStream.addTrack(localVideoTrack)

        onLocalVideoTrack(localVideoTrack)
    }

    /**
     * Returns a capturer for the first usable camera, preferring back-facing, then
     * front-facing, then the first enumerated device.
     *
     * @throws IllegalStateException when no capturer can be created, including when the
     * device reports no cameras at all.
     */
    private fun createCameraCapturer(context: Context): CameraVideoCapturer {
        val enumerator = Camera2Enumerator(context)
        val deviceNames = enumerator.deviceNames

        // take(1) is empty-safe, unlike indexing deviceNames[0]
        val candidates = deviceNames.filter { enumerator.isBackFacing(it) } +
            deviceNames.filter { enumerator.isFrontFacing(it) } +
            deviceNames.take(1)

        return candidates.firstNotNullOfOrNull { enumerator.createCapturer(it, null) }
            ?: throw IllegalStateException("无法创建摄像头捕获器")
    }

    /**
     * Creates the peer connection with the configured STUN/TURN servers, attaches the
     * local video track (if any) and a new audio track, and passes the connection to
     * [onLocalPeerCreated]. If the factory returns no connection, an error is logged and
     * the callback is not invoked.
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        localVideoTrack: VideoTrack?,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        val iceServers = listOf(
            PeerConnection.IceServer.builder(stunUrl).createIceServer(),
            PeerConnection.IceServer.builder(turnUrl)
                .setUsername(turnUsername)
                .setPassword(turnPassword)
                .createIceServer()
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                iceCandidate?.let {
                    Log.d(TAG, "ICE candidate: $it")
                    val signalData = JSONObject().apply {
                        put("type", "ice")
                        put("candidate", JSONObject().apply {
                            put("sdpMid", it.sdpMid)
                            put("sdpMLineIndex", it.sdpMLineIndex)
                            put("candidate", it.sdp)
                        })
                        put("room", currentRoom)
                    }
                    socket.emit("signal", signalData)
                }
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        })
        localPeer = peer

        // Bail out with a log instead of a bare NullPointerException from `!!`
        if (peer == null) {
            Log.e(TAG, "Failed to create PeerConnection")
            return
        }

        localVideoTrack?.let {
            peer.addTrack(it, listOf("local_stream"))
        }
        val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
        val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
        peer.addTrack(localAudioTrack, listOf("local_stream"))

        onLocalPeerCreated(peer)
    }

    /**
     * Creates an SDP answer on [peerConnection], applies it as the local description and
     * passes its SDP text to [onAnswerCreated]. Failures are only logged.
     */
    private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
        Log.d(TAG, "Creating answer...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        peerConnection.createAnswer(object : SdpObserver {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                sessionDescription?.let { sdp ->
                    peerConnection.setLocalDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            Log.d(TAG, "SetLocalDescription onSetSuccess")
                            onAnswerCreated(sdp.description)
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }
            }

            override fun onSetSuccess() {
                Log.d(TAG, "createAnswer onSetSuccess")
            }

            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "createAnswer onCreateFailure: $error")
            }

            override fun onSetFailure(error: String?) {}
        }, constraints)
    }

    /**
     * Dispatches one signaling message according to its "type" field:
     * "offer" is answered, "ice" candidates are added or queued, and
     * "time_sync_response" yields an RTT sample. Messages with a missing or unknown type
     * or with missing fields are logged and ignored.
     */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        // optString never throws, so a message without "type" ends up in the else branch.
        val type = data.optString("type")
        try {
            when (type) {
                "offer" -> {
                    Log.d(TAG, "Received offer")
                    val sdp = SessionDescription(
                        SessionDescription.Type.OFFER,
                        data.getJSONObject("sdp").getString("sdp")
                    )
                    localPeer?.setRemoteDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            Log.d(TAG, "Set remote description (offer) success")
                            // The peer may have been released while the offer was applied
                            val peer = localPeer ?: return
                            createAnswer(peer) { answer ->
                                val signalData = JSONObject().apply {
                                    put("type", "answer")
                                    put("sdp", JSONObject().put("sdp", answer))
                                    put("room", currentRoom)
                                }
                                socket.emit("signal", signalData)

                                // Candidates that arrived before the offer can be applied now
                                pendingIceCandidates.forEach { candidate ->
                                    localPeer?.addIceCandidate(candidate)
                                }
                                pendingIceCandidates.clear()
                            }
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "Set remote description (offer) error: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }

                "ice" -> {
                    Log.d(TAG, "Received ICE candidate")
                    val candidateData = data.getJSONObject("candidate")
                    val candidate = IceCandidate(
                        candidateData.getString("sdpMid"),
                        candidateData.getInt("sdpMLineIndex"),
                        candidateData.getString("candidate")
                    )

                    if (localPeer?.remoteDescription != null) {
                        localPeer?.addIceCandidate(candidate)
                    } else {
                        pendingIceCandidates.add(candidate)
                    }
                }

                "time_sync_response" -> {
                    // RTT = t4 - t1, where t1 is the echoed send time and t4 is "now".
                    // The echoed t2/t3 are not needed for this formula.
                    val responseT1 = data.getLong("t1")
                    t4 = System.currentTimeMillis()
                    val rtt = t4 - responseT1
                    Log.d(TAG, "Time Sync: RTT=$rtt ms")
                    recordRtt(rtt)
                }

                else -> {
                    Log.e(TAG, "Unknown signaling type: $type")
                }
            }
        } catch (e: org.json.JSONException) {
            // A required field was missing or had the wrong JSON type.
            Log.e(TAG, "Malformed signaling message: $data", e)
        }
    }

    /** Stores the send time in [t1] and emits a "time_sync_request" signal carrying it. */
    private fun requestTimeSync() {
        t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "Sent time_sync_request at t1: $t1")
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        // socket is lateinit and stays unassigned when IO.socket() failed
        if (::socket.isInitialized) {
            socket.disconnect()
        }
        localPeer?.dispose()
        localPeer = null
        localView?.release()
        localView = null
        // Nulling lets the layout create a fresh EGL base if the view is rebuilt
        localEglBase?.release()
        localEglBase = null
    }
}

VideoReceiverFragment.kt

/**
 * Receiving side of a WebRTC video session.
 *
 * The fragment joins a signaling room over Socket.IO, sends an SDP offer that asks to
 * receive audio/video, renders the remote video track, and shows frame rate, bitrate,
 * packet loss and latency figures in a Compose UI.
 */
class VideoReceiverFragment : Fragment() {

    companion object {
        /** Maximum number of samples kept in each history list (charts and latency statistics). */
        private const val MAX_HISTORY_SIZE = 60

        /** Frame rates below this value (fps) are reported as stuttering. */
        private const val MIN_SMOOTH_FRAME_RATE = 24.0

        /** Packet loss above this value (percent) is reported as stuttering. */
        private const val MAX_PACKET_LOSS_PERCENT = 5.0

        /** Delay between two WebRTC statistics polls. */
        private const val STATS_INTERVAL_MS = 1000L

        /** Number of tick labels drawn on each chart axis. */
        private const val AXIS_LABEL_COUNT = 5

        /**
         * Creates the fragment with its connection parameters stored in the arguments bundle.
         */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoReceiverFragment {
            val fragment = VideoReceiverFragment()
            val args = Bundle().apply {
                putString("room", room)
                putString("stunUrl", stunUrl)
                putString("turnUrl", turnUrl)
                putString("turnUsername", turnUsername)
                putString("turnPassword", turnPassword)
                putString("signalingServerUrl", signalingServerUrl)
            }
            fragment.arguments = args
            return fragment
        }
    }

    // Connection objects
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var peerConnectionFactory: PeerConnectionFactory? = null
    private var remoteView: SurfaceViewRenderer? = null
    private var remoteEglBase: EglBase? = null
    private val pendingIceCandidates = mutableListOf<IceCandidate>()

    // Connection parameters (read from the arguments bundle in onCreate)
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    private val TAG: String = "WebRTC-Receiver"

    // Video quality state observed by the UI
    private val frameRateState = mutableDoubleStateOf(0.0)
    private val bitrateState = mutableLongStateOf(0L)
    private val stutteringState = mutableStateOf(false)
    private val frameRateLowState = mutableStateOf(false)
    private val packetLossHighState = mutableStateOf(false)
    private val packetLossState = mutableDoubleStateOf(0.0)
    private val frameRateHistory = mutableStateListOf<Float>()
    private val bitrateHistory = mutableStateListOf<Long>()

    // Latency state observed by the UI
    private val latencyHistory = mutableStateListOf<Long>()
    private val maxLatencyState = mutableLongStateOf(0L)

    // Starts at 0 (not Long.MAX_VALUE) so the UI does not show a huge number before the first sample;
    // the value is always recomputed from latencyHistory, so the initial value is display-only.
    private val minLatencyState = mutableLongStateOf(0L)
    private val averageLatencyState = mutableLongStateOf(0L)
    private val latestLatencyState = mutableLongStateOf(0L)

    // Previous statistics sample, used to turn cumulative counters into rates
    private var prevFramesDecoded = 0.0
    private var prevBytesReceived = 0.0
    private var prevTimestamp = 0.0

    // Time synchronization
    private var t1: Long = 0
    private var owd: Long = 0L
    private var timeSyncJob: Job? = null
    private var statsJob: Job? = null

    /**
     * Coroutine scope of the fragment's view, or null while no view exists.
     *
     * Socket and WebRTC callbacks arrive on background threads and may fire after the view is gone;
     * reading `viewLifecycleOwner` directly would throw in that case.
     */
    private val viewScopeOrNull
        get() = viewLifecycleOwnerLiveData.value?.lifecycleScope

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Retrieve data from arguments, falling back to the built-in defaults
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
                Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
            }
            // Only announce success when nothing was denied.
            if (permissions.values.all { it }) {
                onPermissionsChecked()
            }
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    /**
     * Root UI: remote video on top, followed by latency figures, frame-rate and bitrate charts and
     * a stuttering indicator. Also starts the WebRTC/signaling setup once the layout is composed.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Remote video view. The renderer and its EGL context are created together in the
                // factory so both exist before the setup effect below runs.
                AndroidView(
                    factory = { viewContext ->
                        val eglBase = remoteEglBase ?: EglBase.create().also { remoteEglBase = it }
                        val renderer = SurfaceViewRenderer(viewContext)
                        renderer.setZOrderMediaOverlay(false)
                        renderer.init(eglBase.eglBaseContext, null)
                        renderer.setMirror(false)
                        remoteView = renderer
                        renderer
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth()
                )

                Spacer(modifier = Modifier.height(8.dp))

                // Latency (OWD) metrics
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    MetricText("当前单向时延 (OWD): ${latestLatencyState.longValue} ms")
                    MetricText("最大单向时延: ${maxLatencyState.longValue} ms")
                    MetricText("最小单向时延: ${minLatencyState.longValue} ms")
                    MetricText("平均单向时延: ${averageLatencyState.longValue} ms", bottomPaddingDp = 8)
                }

                Spacer(modifier = Modifier.height(8.dp))

                // Frame rate section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 8.dp)
                ) {
                    MetricText("帧率: ${frameRateState.doubleValue.roundToInt()} fps")

                    Log.d(
                        TAG,
                        "UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"
                    )

                    LineChart(
                        data = frameRateHistory,
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Green,
                        backgroundColor = Color.Black,
                        yAxisLabel = "帧率 (fps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                Spacer(modifier = Modifier.height(8.dp))

                // Bitrate section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp)
                ) {
                    MetricText("码率: ${bitrateState.longValue / 1000} kbps")

                    LineChart(
                        data = bitrateHistory.map { it / 1000f }, // Convert to kbps
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Blue,
                        backgroundColor = Color.Black,
                        yAxisLabel = "码率 (kbps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                Spacer(modifier = Modifier.height(16.dp))

                StutteringIndicator()
            }

            LaunchedEffect(Unit) {
                val eglContext = remoteEglBase?.eglBaseContext
                val renderer = remoteView
                if (eglContext == null || renderer == null) {
                    Log.e(TAG, "Remote renderer is not ready; skipping WebRTC setup")
                    return@LaunchedEffect
                }

                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                // Encoder and decoder share the renderer's EGL context; creating a separate EglBase
                // just for the encoder would leak it because nothing releases it.
                val factory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(DefaultVideoEncoderFactory(eglContext, true, true))
                    .setVideoDecoderFactory(DefaultVideoDecoderFactory(eglContext))
                    .createPeerConnectionFactory()
                peerConnectionFactory = factory

                createPeerConnection(context, factory, renderer) { localPeer = it }

                initializeSocketIO()
                requestPermissionsIfNeeded()
            }
        }
    }

    /** White body text used for the metric read-outs. */
    @Composable
    private fun MetricText(text: String, bottomPaddingDp: Int = 4) {
        Text(
            text = text,
            color = Color.White,
            style = MaterialTheme.typography.bodyMedium,
            modifier = Modifier.padding(bottom = bottomPaddingDp.dp)
        )
    }

    /** Row showing either a red warning with the abnormal metrics or a green "playback normal" badge. */
    @Composable
    private fun StutteringIndicator() {
        Row(
            modifier = Modifier
                .fillMaxWidth()
                .padding(horizontal = 16.dp),
            verticalAlignment = Alignment.CenterVertically
        ) {
            if (stutteringState.value) {
                Icon(
                    imageVector = Icons.Default.Warning,
                    contentDescription = "卡顿警告",
                    tint = Color.Red,
                    modifier = Modifier.size(24.dp)
                )
                Spacer(modifier = Modifier.width(8.dp))
                Column {
                    Text(
                        text = "视频播放出现卡顿",
                        color = Color.Red,
                        style = MaterialTheme.typography.bodyMedium
                    )
                    // Tell the user which metric triggered the warning
                    if (frameRateLowState.value) {
                        Text(
                            text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
                            color = Color.Red,
                            style = MaterialTheme.typography.bodySmall
                        )
                    }
                    if (packetLossHighState.value) {
                        Text(
                            text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
                            color = Color.Red,
                            style = MaterialTheme.typography.bodySmall
                        )
                    }
                }
            } else {
                Icon(
                    imageVector = Icons.Default.CheckCircle,
                    contentDescription = "正常",
                    tint = Color.Green,
                    modifier = Modifier.size(24.dp)
                )
                Spacer(modifier = Modifier.width(8.dp))
                Text(
                    text = "视频播放正常",
                    color = Color.Green,
                    style = MaterialTheme.typography.bodyMedium
                )
            }
        }
    }

    /**
     * Opens the Socket.IO connection to [signalingServerUrl] and registers the listeners.
     * On connect the fragment joins [currentRoom], sends the offer and starts periodic time sync.
     */
    private fun initializeSocketIO() {
        val socketUrl = signalingServerUrl
        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = signalingServerUrl.startsWith("https")
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
                initiateCall()
                startPeriodicTimeSync()
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                args.firstOrNull()?.let { error -> Log.e(TAG, "Socket connection error: $error") }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                args.firstOrNull()?.let { reason -> Log.d(TAG, "Socket disconnected: $reason") }
            }

            socket.on("signal") { args ->
                // Validate the payload before touching it: the listener may be called with no
                // arguments or with a non-JSON value.
                val data = args.firstOrNull() as? JSONObject
                if (data == null) {
                    Log.e(TAG, "Ignoring signaling message without a JSON payload")
                } else {
                    Log.d(TAG, "Received signaling: $data")
                    try {
                        handleSignalingData(data)
                    } catch (e: org.json.JSONException) {
                        // A malformed message from the network must not crash the socket thread.
                        Log.e(TAG, "Malformed signaling message: ${e.message}")
                    }
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to Socket: ${e.message}")
        }
    }

    /** Requests every permission from the list that has not been granted yet. */
    private fun requestPermissionsIfNeeded() {
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        )

        val permissionsToRequest = permissions.filter {
            ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
        }

        if (permissionsToRequest.isNotEmpty()) {
            requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
        } else {
            onPermissionsChecked()
        }
    }

    /** Called once all requested permissions are granted. */
    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Creates the peer connection with the configured STUN/TURN servers, hands it to
     * [onLocalPeerCreated] and starts statistics collection.
     *
     * If the factory fails to create a connection, the failure is logged and nothing else happens.
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        remoteView: SurfaceViewRenderer,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        val iceServers = listOf(
            PeerConnection.IceServer.builder(stunUrl).createIceServer(),
            PeerConnection.IceServer.builder(turnUrl)
                .setUsername(turnUsername)
                .setPassword(turnPassword)
                .createIceServer()
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, createPeerObserver(remoteView))
        if (peer == null) {
            Log.e(TAG, "Failed to create PeerConnection")
            return
        }

        localPeer = peer
        onLocalPeerCreated(peer)

        // Start collecting statistics
        startStatsCollection()
    }

    /**
     * Builds the peer connection observer: forwards local ICE candidates to the signaling server,
     * attaches incoming video tracks to [remoteView] and logs every other event.
     */
    private fun createPeerObserver(remoteView: SurfaceViewRenderer): PeerConnection.Observer =
        object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                val candidate = iceCandidate ?: return
                Log.d(TAG, "ICE candidate: $candidate")
                val signalData = JSONObject().apply {
                    put("type", "ice")
                    put("candidate", JSONObject().apply {
                        put("sdpMid", candidate.sdpMid)
                        put("sdpMLineIndex", candidate.sdpMLineIndex)
                        put("candidate", candidate.sdp)
                    })
                    put("room", currentRoom)
                }
                socket.emit("signal", signalData)
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
                (receiver?.track() as? VideoTrack)?.addSink(remoteView)
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
                (transceiver?.receiver?.track() as? VideoTrack)?.addSink(remoteView)
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        }

    /**
     * [SdpObserver] with no-op success callbacks and failure callbacks that log
     * "<operation> error: <message>", so call sites override only what they need.
     */
    private open inner class LoggingSdpObserver(private val operation: String) : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {}

        override fun onSetSuccess() {}

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "$operation error: $error")
        }

        override fun onSetFailure(error: String?) {
            Log.e(TAG, "$operation error: $error")
        }
    }

    /** Creates an offer asking to receive audio and video, applies it locally and sends it to the room. */
    private fun initiateCall() {
        Log.d(TAG, "Initiating call...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        localPeer?.createOffer(object : LoggingSdpObserver("Create offer") {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                val offer = sessionDescription ?: return
                localPeer?.setLocalDescription(object : LoggingSdpObserver("Set local description") {
                    override fun onSetSuccess() {
                        val signalData = JSONObject().apply {
                            put("type", "offer")
                            put("sdp", JSONObject().put("sdp", offer.description))
                            put("room", currentRoom)
                        }
                        socket.emit("signal", signalData)
                    }
                }, offer)
            }
        }, constraints)
    }

    /**
     * Dispatches one signaling message by its "type" field.
     *
     * @throws org.json.JSONException if a required field is missing or has the wrong type.
     */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        when (val type = data.getString("type")) {
            "answer" -> handleAnswer(data)
            "ice" -> handleRemoteIceCandidate(data)
            "time_sync_request" -> handleTimeSyncRequest(data)
            "time_sync_response" -> handleTimeSyncResponse(data)
            else -> Log.e(TAG, "Unknown signaling type: $type")
        }
    }

    /** Applies the remote answer and then flushes ICE candidates that arrived before it. */
    private fun handleAnswer(data: JSONObject) {
        Log.d(TAG, "Received answer")
        val sdp = SessionDescription(
            SessionDescription.Type.ANSWER,
            data.getJSONObject("sdp").getString("sdp")
        )
        localPeer?.setRemoteDescription(object : LoggingSdpObserver("Set remote description") {
            override fun onSetSuccess() {
                pendingIceCandidates.forEach { candidate -> localPeer?.addIceCandidate(candidate) }
                pendingIceCandidates.clear()
                Log.d(TAG, "Set remote description (answer) success")
            }
        }, sdp)
    }

    /** Adds a remote ICE candidate, or queues it until the remote description is set. */
    private fun handleRemoteIceCandidate(data: JSONObject) {
        Log.d(TAG, "Received ICE candidate")
        val candidateData = data.getJSONObject("candidate")
        val candidate = IceCandidate(
            candidateData.getString("sdpMid"),
            candidateData.getInt("sdpMLineIndex"),
            candidateData.getString("candidate")
        )
        if (localPeer?.remoteDescription != null) {
            localPeer?.addIceCandidate(candidate)
        } else {
            pendingIceCandidates.add(candidate)
        }
    }

    /**
     * Answers a peer's time_sync_request with the echoed t1 plus local receive (t2) and send (t3)
     * timestamps, and records t3 - t2 as a latency sample.
     */
    private fun handleTimeSyncRequest(data: JSONObject) {
        val requestT1 = data.getLong("t1")
        val requestT2 = System.currentTimeMillis()
        val requestT3 = System.currentTimeMillis()

        // NOTE(review): t2 and t3 are both read from the local clock back to back, so this value is
        // the local turnaround time (normally ~0 ms), not a network one-way delay. Kept as-is to
        // match the documented behavior — confirm whether an RTT-based estimate is wanted instead.
        owd = requestT3 - requestT2
        Log.d(TAG, "OWD calculated: $owd ms")

        val syncResponse = JSONObject().apply {
            put("type", "time_sync_response")
            put("t1", requestT1)
            put("t2", requestT2)
            put("t3", requestT3)
            put("room", currentRoom)
        }
        socket.emit("signal", syncResponse)
        Log.d(TAG, "Sent time_sync_response: $syncResponse")

        recordLatencySample(owd)
    }

    /** Records t3 - t2 from a peer's time_sync_response as a latency sample. */
    private fun handleTimeSyncResponse(data: JSONObject) {
        // NOTE(review): t3 - t2 is the peer's processing time between receiving the request and
        // sending the response, not a one-way delay — see handleTimeSyncRequest.
        val calculatedOWD = data.getLong("t3") - data.getLong("t2")
        Log.d(TAG, "OWD calculated: $calculatedOWD ms")
        recordLatencySample(calculatedOWD)
    }

    /**
     * Publishes [sample] as the latest latency and refreshes max/min/average over the last
     * [MAX_HISTORY_SIZE] samples. Does nothing when the view no longer exists.
     */
    private fun recordLatencySample(sample: Long) {
        viewScopeOrNull?.launch {
            latestLatencyState.longValue = sample

            latencyHistory.add(sample)
            if (latencyHistory.size > MAX_HISTORY_SIZE) {
                latencyHistory.removeAt(0)
            }

            // The history contains at least the sample just added, so the fallbacks are never used.
            val maxLatency = latencyHistory.maxOrNull() ?: 0L
            val minLatency = latencyHistory.minOrNull() ?: 0L
            val averageLatency = latencyHistory.average().toLong()

            maxLatencyState.longValue = maxLatency
            minLatencyState.longValue = minLatency
            averageLatencyState.longValue = averageLatency

            Log.d(TAG, "OWD - Latest: $sample ms, Max: $maxLatency ms, Min: $minLatency ms, Average: $averageLatency ms")
        }
    }

    /** Sends a time_sync_request every [intervalMs] milliseconds while the view exists. */
    private fun startPeriodicTimeSync(intervalMs: Long = 5000L) {
        // The socket's connect event fires again after a reconnect; cancel the previous loop so
        // requests are not sent by several loops at once.
        timeSyncJob?.cancel()
        timeSyncJob = viewScopeOrNull?.launch {
            while (isActive) {
                requestTimeSync()
                delay(intervalMs)
            }
        }
    }

    /** Sends a time_sync_request carrying the current local time as t1. */
    private fun requestTimeSync() {
        t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "Sent time_sync_request at t1: $t1")
    }

    /** Polls the peer connection statistics once per [STATS_INTERVAL_MS] while the view exists. */
    private fun startStatsCollection() {
        Log.d(TAG, "Starting stats collection...")
        statsJob?.cancel()
        statsJob = viewScopeOrNull?.launch {
            while (isActive) {
                delay(STATS_INTERVAL_MS)
                Log.d(TAG, "Collecting stats...")
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "Failed to get stats: localPeer is null.")
                } else {
                    peer.getStats { report ->
                        Log.d(TAG, "Stats report obtained.")
                        parseStatsReport(report)
                    }
                }
            }
        }
    }

    /** Reads [key] from a stats member map as a Double, or null when absent or not numeric. */
    private fun numericMember(members: Map<String, Any?>, key: String): Double? =
        (members[key] as? Number)?.toDouble()

    /**
     * Derives frame rate, bitrate and packet loss from the inbound video RTP statistics in [report]
     * and publishes them to the UI state. Rates are computed against the previous sample, so the
     * first report only seeds the previous values.
     */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "Received RTCStatsReport: $report")
        for (stats in report.statsMap.values) {
            if (stats.type != "inbound-rtp") continue
            val members = stats.members
            if (members["kind"] as? String != "video") continue

            val framesDecoded = numericMember(members, "framesDecoded") ?: 0.0
            val framesReceived = numericMember(members, "framesReceived") ?: 0.0
            val framesDropped = numericMember(members, "framesDropped") ?: 0.0
            val bytesReceived = numericMember(members, "bytesReceived") ?: 0.0
            val packetsLost = numericMember(members, "packetsLost") ?: 0.0
            val packetsReceived = numericMember(members, "packetsReceived") ?: 0.0

            // Guard the division: before any packet arrives both counters are 0 and 0/0 is NaN.
            val totalPackets = packetsLost + packetsReceived
            val packetLossFraction = if (totalPackets > 0) packetsLost / totalPackets else 0.0
            val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

            Log.d(
                TAG,
                "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
            )

            if (prevTimestamp != 0.0) {
                val timeElapsed = timestamp - prevTimestamp
                val frameRate = if (timeElapsed > 0) (framesDecoded - prevFramesDecoded) / timeElapsed else 0.0
                // bits per second
                val bitrate = if (timeElapsed > 0) ((bytesReceived - prevBytesReceived) * 8) / timeElapsed else 0.0
                val packetLoss = packetLossFraction * 100 // Convert to percentage

                Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

                publishVideoStats(frameRate, bitrate, packetLoss)
            }

            // Remember this sample for the next delta
            prevFramesDecoded = framesDecoded
            prevBytesReceived = bytesReceived
            prevTimestamp = timestamp
        }
    }

    /** Pushes one statistics sample to the UI state and chart histories. No-op without a view. */
    private fun publishVideoStats(frameRate: Double, bitrate: Double, packetLoss: Double) {
        val isFrameRateLow = frameRate < MIN_SMOOTH_FRAME_RATE
        val isPacketLossHigh = packetLoss > MAX_PACKET_LOSS_PERCENT
        val isStuttering = isFrameRateLow || isPacketLossHigh

        viewScopeOrNull?.launch {
            frameRateState.doubleValue = frameRate
            bitrateState.longValue = bitrate.toLong()

            stutteringState.value = isStuttering
            frameRateLowState.value = isFrameRateLow
            packetLossHighState.value = isPacketLossHigh
            packetLossState.doubleValue = packetLoss

            frameRateHistory.add(frameRate.toFloat())
            if (frameRateHistory.size > MAX_HISTORY_SIZE) {
                frameRateHistory.removeAt(0)
            }

            bitrateHistory.add(bitrate.toLong())
            if (bitrateHistory.size > MAX_HISTORY_SIZE) {
                bitrateHistory.removeAt(0)
            }

            Log.d(
                TAG,
                "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
            )
        }
    }

    /**
     * Line chart with axes, tick labels and optional axis titles; consecutive points are joined by
     * straight segments.
     *
     * @param data values to plot, in x order; nothing is drawn when empty.
     * @param minYValue lower bound of the y axis; defaults to the smallest value in [data].
     * @param maxYValue upper bound of the y axis; defaults to the largest value in [data].
     */
    @Composable
    fun LineChart(
        data: List<Float>,
        modifier: Modifier = Modifier,
        lineColor: Color = Color.Green,
        backgroundColor: Color = Color.Black,
        yAxisLabel: String = "",
        xAxisLabel: String = "",
        minYValue: Float? = null,
        maxYValue: Float? = null
    ) {
        Canvas(modifier = modifier.background(backgroundColor)) {
            if (data.isEmpty()) return@Canvas

            val padding = 40.dp.toPx() // Room for axes and labels
            val left = padding
            val right = size.width - padding
            val top = padding
            val bottom = size.height - padding

            val maxY = maxYValue ?: data.maxOrNull() ?: 1f
            val minY = minYValue ?: data.minOrNull() ?: 0f
            val yRange = maxY - minY

            val lastIndex = data.lastIndex
            val spacing = (right - left) / lastIndex.coerceAtLeast(1)

            // A flat series has no vertical range; it is drawn on the horizontal centre line.
            fun yFor(value: Float): Float =
                if (yRange == 0f) size.height / 2 else bottom - ((value - minY) / yRange) * (bottom - top)

            fun labelPaint(align: android.graphics.Paint.Align, antiAlias: Boolean) =
                android.graphics.Paint().apply {
                    color = android.graphics.Color.WHITE
                    textSize = 24f
                    textAlign = align
                    isAntiAlias = antiAlias
                }

            val nativeCanvas = drawContext.canvas.nativeCanvas
            val yTickPaint = labelPaint(android.graphics.Paint.Align.RIGHT, antiAlias = false)
            val xTickPaint = labelPaint(android.graphics.Paint.Align.CENTER, antiAlias = false)
            val titlePaint = labelPaint(android.graphics.Paint.Align.CENTER, antiAlias = true)

            // Axes
            drawLine(
                color = Color.White,
                start = Offset(left, top),
                end = Offset(left, bottom),
                strokeWidth = 2f
            )
            drawLine(
                color = Color.White,
                start = Offset(left, bottom),
                end = Offset(right, bottom),
                strokeWidth = 2f
            )

            // Y tick labels. With a zero range only one label is drawn; dividing by the range would
            // otherwise yield NaN positions.
            val yTickValues = if (yRange == 0f) {
                listOf(minY)
            } else {
                List(AXIS_LABEL_COUNT) { i -> minY + i * yRange / (AXIS_LABEL_COUNT - 1) }
            }
            yTickValues.forEach { value ->
                nativeCanvas.drawText(
                    value.roundToInt().toString(),
                    left - 8f,
                    yFor(value) + yTickPaint.textSize / 2,
                    yTickPaint
                )
            }

            // X tick labels, spread over the data indices. Multiplying before dividing keeps short
            // series from collapsing every label onto index 0; duplicates are dropped.
            val xTickIndices = (0 until AXIS_LABEL_COUNT)
                .map { i -> i * lastIndex / (AXIS_LABEL_COUNT - 1) }
                .distinct()
            xTickIndices.forEach { index ->
                nativeCanvas.drawText(
                    index.toString(),
                    left + index * spacing,
                    bottom + xTickPaint.textSize + 4f,
                    xTickPaint
                )
            }

            // Y axis title, rotated to read bottom-to-top
            if (yAxisLabel.isNotEmpty()) {
                nativeCanvas.save()
                nativeCanvas.rotate(-90f, padding / 2, size.height / 2)
                nativeCanvas.drawText(yAxisLabel, padding / 2, size.height / 2, titlePaint)
                nativeCanvas.restore()
            }

            // X axis title
            if (xAxisLabel.isNotEmpty()) {
                nativeCanvas.drawText(xAxisLabel, size.width / 2, size.height - padding / 2, titlePaint)
            }

            // Data: straight segments between consecutive points, plus a dot on every point
            val points = data.mapIndexed { index, value -> Offset(left + index * spacing, yFor(value)) }
            points.zipWithNext().forEach { (start, end) ->
                drawLine(
                    color = lineColor,
                    start = start,
                    end = end,
                    strokeWidth = 4f,
                    cap = StrokeCap.Round
                )
            }
            points.forEach { point ->
                drawCircle(color = lineColor, radius = 4f, center = point)
            }
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        timeSyncJob?.cancel()

        // The socket is absent when IO.socket() failed in initializeSocketIO().
        if (::socket.isInitialized) {
            socket.off() // Drop listeners so no callback touches the objects disposed below
            socket.disconnect()
        }

        // Dispose in dependency order and clear the references so a recreated view starts fresh.
        localPeer?.dispose()
        localPeer = null
        peerConnectionFactory?.dispose()
        peerConnectionFactory = null
        remoteView?.release()
        remoteView = null
        remoteEglBase?.release()
        remoteEglBase = null
    }
}

Detailed Explanation of Changes

1. Time Synchronization in Initiator (VideoInitiatorFragment)

  • Variables Added:

    • private var t1: Long = 0L
    • private var t4: Long = 0L
  • startTimeSync() Function:

    • Periodically sends time_sync_request every 5 seconds.
    • Records the timestamp t1 when the request is sent.
  • handleSignalingData() Function:

    • Handling time_sync_response:
      • Extracts t1, t2, and t3 from the response.
      • Captures t4 as the current time.
      • Calculates RTT as t4 - t1.
      • Updates the RTT state variables and history for UI display.
  • requestTimeSync() Function:

    • Constructs and sends the time_sync_request with the current t1.

2. Time Synchronization in Receiver (VideoReceiverFragment)

  • Variables Added:

    • private var owd: Long = 0L
  • handleSignalingData() Function:

    • Handling time_sync_request:

      • Extracts t1 from the request.
      • Captures t2 as the time when the request is received.
      • Captures t3 as the time when the response is sent.
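      • Calculates OWD as t3 - t2. Note that t2 and t3 are both read from the receiver's own clock, so this value is the receiver's local processing time between receiving the request and sending the response, not a network one-way delay.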
      • Constructs and sends the time_sync_response with t1, t2, and t3.
      • Updates the OWD state variables and history for UI display.
    • Handling time_sync_response:

      • The receiver does not strictly need to handle time_sync_response (RTT is calculated on the initiator), but the handler is included for symmetry and potential future use. When a response does arrive, it records t3 - t2 from the response as an OWD sample.

3. UI Updates

  • Initiator:

    • Displays RTT metrics (latest, max, min, average) in the UI.
  • Receiver:

    • Displays OWD metrics (latest, max, min, average) in the UI.

4. Cleanup

  • Ensured that all coroutines are properly canceled in the onDestroyView() method to prevent memory leaks.

5. Removed Redundant Code

  • Removed commented-out code related to time synchronization in both fragments to clean up the codebase.

Conclusion

By implementing a unified time synchronization mechanism, the initiator efficiently calculates RTT, while the receiver calculates OWD. This ensures accurate latency measurements on both ends, enhancing the overall quality monitoring of the WebRTC connection. The above modifications integrate seamlessly with your existing codebase, providing clear and concise latency metrics within your application's UI.

分享这个问答