问题

class VideoInitiatorFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoInitiatorFragment] whose arguments bundle carries the
     * room name, STUN/TURN settings and signaling server URL under the keys
     * that [onCreate] reads back.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoInitiatorFragment {
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        return VideoInitiatorFragment().also { it.arguments = bundle }
    }
}

// Signaling / WebRTC state owned by this fragment.
// Socket.IO client; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection; assigned in createPeerConnection(), disposed in onDestroyView().
private var localPeer: PeerConnection? = null
// Renderer for the local camera preview; created by the AndroidView factory.
private var localView: SurfaceViewRenderer? = null
// EGL context holder; created lazily in the AndroidView update callback.
private var localEglBase: EglBase? = null
// Remote ICE candidates received while no remote description is set yet.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings; populated from the arguments bundle in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

// Logcat tag used by every log statement in this fragment.
private val TAG: String = "WebRTC-Initiator"

// RTT figures in milliseconds, updated from stats reports and time-sync responses.
private val maxRttState = mutableLongStateOf(0L)
private val minRttState = mutableLongStateOf(Long.MAX_VALUE)
private val averageRttState = mutableLongStateOf(0L)
private val latestRttState = mutableLongStateOf(0L)
// Sliding window of RTT samples; writers trim it to at most 60 entries.
private val rttHistory = mutableStateListOf<Long>()

// Periodic stats polling job started by startStatsCollection(); cancelled in onDestroyView().
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments, substituting
 * built-in defaults for any missing key, and logs the resulting configuration.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // NOTE(review): the fallbacks below embed a server address and TURN
    // credentials in the binary; consider requiring them from the caller.
    val args = arguments
    currentRoom = args?.getString("room") ?: "default-room"
    signalingServerUrl = args?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
    stunUrl = args?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
    turnUrl = args?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
    turnUsername = args?.getString("turnUsername") ?: "wstszx"
    turnPassword = args?.getString("turnPassword") ?: "930379"

    val summary =
        "onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per permission with its outcome. [onPermissionsChecked]
 * (which announces that everything was granted) is only invoked when the
 * result is non-empty and every requested permission was actually granted;
 * previously it was called even after a denial.
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        // An empty result means the request was interrupted, not that it succeeded.
        val allGranted = permissions.isNotEmpty() && permissions.values.all { it }
        if (allGranted) {
            onPermissionsChecked()
        } else {
            Log.w(TAG, "Not all permissions were granted: $permissions")
        }
    }

/** Hosts [WebRTCComposeLayout] inside a freshly created [ComposeView]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Compose UI of the initiator: a full-size local camera preview.
 *
 * A one-shot effect builds the peer connection factory, starts local video,
 * creates the peer connection, connects the signaling socket, requests
 * permissions and starts the periodic time sync, in that order.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current
    var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Local video view
            AndroidView(
                factory = { viewContext ->
                    val renderer = SurfaceViewRenderer(viewContext)
                    renderer.setZOrderMediaOverlay(false)
                    localView = renderer
                    renderer
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = { renderer ->
                    // Initialise the renderer only once, together with the EGL base.
                    if (localEglBase == null) {
                        localEglBase = EglBase.create()
                        renderer.init(localEglBase!!.eglBaseContext, null)
                        renderer.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))
        }

        LaunchedEffect(Unit) {
            val initOptions = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(initOptions)

            val videoEncoders = DefaultVideoEncoderFactory(localEglBase!!.eglBaseContext, true, true)
            val videoDecoders = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

            val factory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(videoEncoders)
                .setVideoDecoderFactory(videoDecoders)
                .createPeerConnectionFactory()

            // NOTE(review): capture is started here, before permissions are requested below.
            initLocalVideo(context, localView, factory, localEglBase!!) { track ->
                localVideoTrack = track
            }

            createPeerConnection(context, factory, localVideoTrack) { peer ->
                localPeer = peer
                startStatsCollection() // Start collecting statistics
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
            startTimeSync()
        }
    }
}

/**
 * Starts a loop in the view lifecycle scope that waits [intervalMs]
 * milliseconds and then sends a time-sync request, repeating while active.
 */
private fun startTimeSync(intervalMs: Long = 5000L) {
    val scope = viewLifecycleOwner.lifecycleScope
    scope.launch {
        while (isActive) {
            // Wait first, then send: the first request goes out after one interval.
            delay(intervalMs)
            requestTimeSync()
        }
    }
}

/**
 * Starts the once-per-second stats polling loop and keeps its job in
 * [statsJob]. Each tick asks the peer connection for a stats report, or logs
 * an error when no peer connection exists.
 */
private fun startStatsCollection() {
    statsJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(1000) // Collect stats every second
            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
            } else {
                peer.getStats { report -> parseStatsReport(report) }
            }
        }
    }
}

/**
 * Extracts the current round-trip time from a WebRTC stats report and folds
 * it into the RTT state (latest / max / min / average over the last 60 samples).
 *
 * `currentRoundTripTime` is a member of "candidate-pair" stats, not of
 * "transport" stats, so the selected pair is resolved through the transport's
 * `selectedCandidatePairId`, falling back to the first nominated pair.
 *
 * @param report stats report delivered by [PeerConnection.getStats].
 */
private fun parseStatsReport(report: RTCStatsReport) {
    val statsMap = report.statsMap
    Log.d(TAG, "RTCStatsReport: $statsMap")

    val selectedPairId = statsMap.values
        .firstOrNull { it.type == "transport" }
        ?.members?.get("selectedCandidatePairId") as? String
    val pairStats = selectedPairId?.let { statsMap[it] }
        ?: statsMap.values.firstOrNull { it.type == "candidate-pair" && it.members["nominated"] == true }
    if (pairStats == null) {
        Log.d(TAG, "No selected candidate pair in stats report yet")
        return
    }
    Log.d(TAG, "Candidate pair stats found: $pairStats")

    // The stat is reported in seconds; convert to whole milliseconds.
    val currentRtt = (pairStats.members["currentRoundTripTime"] as? Number)
        ?.toDouble()?.times(1000)?.toLong()
    if (currentRtt == null || currentRtt <= 0) return

    // The callback may arrive after the view is gone; viewLifecycleOwner would throw then.
    val owner = viewLifecycleOwnerLiveData.value ?: return
    owner.lifecycleScope.launch {
        // Update latest RTT
        latestRttState.longValue = currentRtt

        // Update RTT history, keeping at most 60 samples
        rttHistory.add(currentRtt)
        if (rttHistory.size > 60) {
            rttHistory.removeAt(0)
        }

        // Calculate max, min, and average RTT
        val maxRtt = rttHistory.maxOrNull() ?: 0L
        val minRtt = rttHistory.minOrNull() ?: 0L
        val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

        maxRttState.longValue = maxRtt
        minRttState.longValue = minRtt
        averageRttState.longValue = averageRtt

        Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
    }
}

/**
 * Creates the Socket.IO client for [signalingServerUrl], registers the
 * connect / error / disconnect / "signal" listeners and starts connecting.
 *
 * On connect the socket joins [currentRoom]. Incoming "signal" events that
 * carry a [JSONObject] are forwarded to [handleSignalingData]; empty or
 * non-JSON payloads are logged and ignored instead of indexing into an empty
 * argument array. Any exception during setup is logged and swallowed, in which
 * case [socket] stays uninitialised.
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = socketUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket connected")
            socket.emit("join", currentRoom)
            Log.d(TAG, "Joined room: $currentRoom")
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            if (args.isNotEmpty()) {
                Log.e(TAG, "Socket connection error: ${args[0]}")
            }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            if (args.isNotEmpty()) {
                Log.d(TAG, "Socket disconnected: ${args[0]}")
            }
        }

        socket.on("signal") { args ->
            // Look at the payload only after making sure there is one.
            val payload = args.firstOrNull()
            Log.d(TAG, "Received signaling: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            } else {
                Log.w(TAG, "Ignoring signaling event without a JSON payload")
            }
        }

        socket.connect()
        Log.d(TAG, "Connecting to Socket: $socketUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "Error connecting to Socket: ${e.message}")
    }
}

/**
 * Requests whichever of the camera, microphone, internet and network-state
 * permissions are not yet granted; when none are missing it goes straight to
 * [onPermissionsChecked].
 */
private fun requestPermissionsIfNeeded() {
    val hostContext = requireContext()
    val missing = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(hostContext, permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast announcing that all required permissions are granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Starts camera capture at 1920x1080 / 60 fps and produces the local video track.
 *
 * The track is attached to [localView] when a renderer is available and then
 * handed to [onLocalVideoTrack]. The earlier version also created an audio
 * source, an audio track and a media stream that were never attached to
 * anything nor disposed; those dead allocations were removed
 * ([createPeerConnection] creates the audio track that is actually sent).
 *
 * @param context context used to enumerate and initialise the camera.
 * @param localView renderer for the preview, or null to skip rendering.
 * @param peerConnectionFactory factory used to create the video source and track.
 * @param eglBase EGL base whose context backs the capture texture helper.
 * @param onLocalVideoTrack receives the created video track.
 */
private fun initLocalVideo(
    context: Context,
    localView: SurfaceViewRenderer?,
    peerConnectionFactory: PeerConnectionFactory,
    eglBase: EglBase,
    onLocalVideoTrack: (VideoTrack) -> Unit
) {
    // NOTE(review): capturer, texture helper and source are not retained, so
    // nothing can stop the capture or dispose them later — confirm intended.
    val videoCapturer = createCameraCapturer(context)
    val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
    val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
    videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

    videoCapturer.startCapture(1920, 1080, 60)

    val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
    if (localView != null) {
        localVideoTrack.addSink(localView)
    } else {
        Log.w(TAG, "No local renderer available; local preview will not be shown")
    }

    onLocalVideoTrack(localVideoTrack)
}

/**
 * Creates a capturer for the best available camera.
 *
 * Devices are tried in this order: back-facing, front-facing, then any
 * remaining device. The first one for which a capturer can be created wins.
 *
 * @throws IllegalStateException when the device list is empty or no capturer
 *   could be created (previously an empty list caused an index-out-of-bounds).
 */
private fun createCameraCapturer(context: Context): CameraVideoCapturer {
    val enumerator = Camera2Enumerator(context)
    val deviceNames = enumerator.deviceNames.toList()

    val byPreference = deviceNames.filter { enumerator.isBackFacing(it) } +
        deviceNames.filter { enumerator.isFrontFacing(it) } +
        deviceNames

    return byPreference.asSequence()
        .distinct() // do not retry a device that already failed
        .mapNotNull { deviceName -> enumerator.createCapturer(deviceName, null) }
        .firstOrNull()
        ?: throw IllegalStateException("Unable to create camera capturer")
}

/**
 * Creates the peer connection with the configured STUN/TURN servers, adds the
 * local video track (when present) and a new audio track, and reports the
 * connection through [onLocalPeerCreated].
 *
 * Local ICE candidates are sent to the room as "ice" signals. If the factory
 * fails to create a connection, the failure is logged and
 * [onLocalPeerCreated] is not invoked (previously this crashed on `!!`).
 *
 * @param context currently unused.
 * @param peerConnectionFactory factory used for the connection and audio track.
 * @param localVideoTrack video track to send, or null to send audio only.
 * @param onLocalPeerCreated receives the created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    localVideoTrack: VideoTrack?,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", JSONObject().apply {
                    put("sdpMid", candidate.sdpMid)
                    put("sdpMLineIndex", candidate.sdpMLineIndex)
                    put("candidate", candidate.sdp)
                })
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    // createPeerConnection can return null; fail softly instead of crashing.
    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        Log.e(TAG, "Failed to create PeerConnection")
        return
    }

    localVideoTrack?.let { peer.addTrack(it, listOf("local_stream")) }
    val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
    val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
    peer.addTrack(localAudioTrack, listOf("local_stream"))

    onLocalPeerCreated(peer)
}

/**
 * Creates an SDP answer on [peerConnection], applies it as the local
 * description and, once that succeeds, passes the SDP text to [onAnswerCreated].
 * Failures at either step are only logged.
 */
private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
    Log.d(TAG, "Creating answer...")

    val answerConstraints = MediaConstraints()
    with(answerConstraints.mandatory) {
        add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
        add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
    }

    // Second step: install the freshly created answer as the local description.
    fun applyLocalDescription(answer: SessionDescription) {
        val setObserver = object : SdpObserver {
            override fun onSetSuccess() {
                Log.d(TAG, "SetLocalDescription onSetSuccess")
                onAnswerCreated(answer.description)
            }

            override fun onSetFailure(error: String?) {
                Log.e(TAG, "SetLocalDescription onSetFailure: $error")
            }

            override fun onCreateSuccess(p0: SessionDescription?) {}
            override fun onCreateFailure(p0: String?) {}
        }
        peerConnection.setLocalDescription(setObserver, answer)
    }

    // First step: ask the connection to create the answer.
    val createObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            if (sessionDescription != null) {
                applyLocalDescription(sessionDescription)
            }
        }

        override fun onSetSuccess() {
            Log.d(TAG, "createAnswer onSetSuccess")
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "createAnswer onCreateFailure: $error")
        }

        override fun onSetFailure(error: String?) {}
    }
    peerConnection.createAnswer(createObserver, answerConstraints)
}

/**
 * Dispatches one signaling message by its "type" field: "offer", "ice" or
 * "time_sync_response". Unknown types and malformed messages are logged
 * rather than thrown, since this is invoked from the socket "signal" listener.
 *
 * @param data JSON payload received on the "signal" event.
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        when (val type = data.optString("type")) {
            "offer" -> handleOffer(data)
            "ice" -> handleRemoteIceCandidate(data)
            "time_sync_response" -> handleTimeSyncResponse(data)
            else -> Log.e(TAG, "Unknown signaling type: $type")
        }
    } catch (e: org.json.JSONException) {
        Log.e(TAG, "Malformed signaling message: ${e.message}")
    }
}

/** Applies a remote offer, answers it, then flushes ICE candidates queued meanwhile. */
private fun handleOffer(data: JSONObject) {
    Log.d(TAG, "Received offer")
    val peer = localPeer
    if (peer == null) {
        Log.e(TAG, "Received offer but no peer connection exists")
        return
    }
    val offer = SessionDescription(
        SessionDescription.Type.OFFER,
        data.getJSONObject("sdp").getString("sdp")
    )
    peer.setRemoteDescription(object : SdpObserver {
        override fun onSetSuccess() {
            Log.d(TAG, "Set remote description (offer) success")
            createAnswer(peer) { answer ->
                val signalData = JSONObject().apply {
                    put("type", "answer")
                    put("sdp", JSONObject().put("sdp", answer))
                    put("room", currentRoom)
                }
                socket.emit("signal", signalData)

                // The queue is also written from the "ice" branch, so guard it.
                synchronized(pendingIceCandidates) {
                    pendingIceCandidates.forEach { peer.addIceCandidate(it) }
                    pendingIceCandidates.clear()
                }
            }
        }

        override fun onSetFailure(error: String?) {
            Log.e(TAG, "Set remote description (offer) error: $error")
        }

        override fun onCreateSuccess(p0: SessionDescription?) {}
        override fun onCreateFailure(p0: String?) {}
    }, offer)
}

/** Adds a remote ICE candidate, or queues it until a remote description is set. */
private fun handleRemoteIceCandidate(data: JSONObject) {
    Log.d(TAG, "Received ICE candidate")
    val candidateData = data.getJSONObject("candidate")
    val candidate = IceCandidate(
        candidateData.getString("sdpMid"),
        candidateData.getInt("sdpMLineIndex"),
        candidateData.getString("candidate")
    )

    val peer = localPeer
    if (peer?.remoteDescription != null) {
        peer.addIceCandidate(candidate)
    } else {
        synchronized(pendingIceCandidates) { pendingIceCandidates.add(candidate) }
    }
}

/**
 * Computes RTT (`now - t1`) and one-way delay (`t2 - t1`) from a time-sync
 * response and records the RTT sample.
 */
private fun handleTimeSyncResponse(data: JSONObject) {
    Log.d(TAG, "Received time_sync_response")
    val t1 = data.getLong("t1")
    val t2 = data.getLong("t2")
    val t4 = System.currentTimeMillis()
    // NOTE(review): the message's "t3" field is not used here, so the RTT
    // includes any time the peer spent before replying — confirm intended.

    val rtt = t4 - t1
    val owd = t2 - t1 // One-Way Delay from Initiator to Receiver

    Log.d(TAG, "Time Sync: RTT=$rtt ms, OWD=$owd ms")

    // The view may already be destroyed when a late response arrives.
    val owner = viewLifecycleOwnerLiveData.value ?: return
    owner.lifecycleScope.launch {
        recordRttSample(rtt)
        Log.d(TAG, "OWD - Latest: $owd ms")
    }
}

/** Appends [rtt] to the 60-sample history and refreshes latest/max/min/average state. */
private fun recordRttSample(rtt: Long) {
    latestRttState.longValue = rtt

    rttHistory.add(rtt)
    if (rttHistory.size > 60) {
        rttHistory.removeAt(0)
    }

    val maxRtt = rttHistory.maxOrNull() ?: 0L
    val minRtt = rttHistory.minOrNull() ?: 0L
    val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L

    maxRttState.longValue = maxRtt
    minRttState.longValue = minRtt
    averageRttState.longValue = averageRtt

    Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
}

/**
 * Sends a "time_sync_request" signal carrying the current wall-clock time as
 * `t1`. Does nothing (apart from a warning) when the socket was never created,
 * which happens when socket setup failed.
 */
private fun requestTimeSync() {
    if (!::socket.isInitialized) {
        Log.w(TAG, "Skipping time sync request: socket is not initialized")
        return
    }
    val t1 = System.currentTimeMillis()
    val syncRequest = JSONObject().apply {
        put("type", "time_sync_request")
        put("room", currentRoom)
        put("t1", t1)
    }
    socket.emit("signal", syncRequest)
    Log.d(TAG, "Sent time sync request at t1: $t1")
}

/**
 * Tears down everything tied to the view: stats job, signaling socket, peer
 * connection, renderer and EGL base.
 *
 * References are cleared after release so that a recreated view builds fresh
 * objects (the layout only creates an EGL base when the field is null), and
 * the socket is only touched when it was actually created.
 */
override fun onDestroyView() {
    super.onDestroyView()

    statsJob?.cancel()
    statsJob = null

    if (::socket.isInitialized) {
        // Drop listeners first so no callback reaches the destroyed view.
        socket.off()
        socket.disconnect()
    }

    localPeer?.dispose()
    localPeer = null

    localView?.release()
    localView = null

    localEglBase?.release()
    localEglBase = null

    synchronized(pendingIceCandidates) { pendingIceCandidates.clear() }
}
}


class VideoReceiverFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoReceiverFragment] whose arguments bundle carries the
     * room name, STUN/TURN settings and signaling server URL under the keys
     * that [onCreate] reads back.
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoReceiverFragment {
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        return VideoReceiverFragment().also { it.arguments = bundle }
    }
}

// Signaling / WebRTC state owned by this fragment.
// Socket.IO client; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection; assigned in createPeerConnection().
private var localPeer: PeerConnection? = null
// Renderer for the remote video; created by the AndroidView factory.
private var remoteView: SurfaceViewRenderer? = null
// EGL context holder; created lazily in the AndroidView update callback.
private var remoteEglBase: EglBase? = null
// Remote ICE candidates held back until the remote description has been set.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Connection settings; populated from the arguments bundle in onCreate().
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
// Logcat tag used by every log statement in this fragment.
private val TAG: String = "WebRTC-Receiver"

// Playback metrics rendered by WebRTCComposeLayout (frame rate in fps; bitrate shown as value / 1000 kbps).
private val frameRateState = mutableDoubleStateOf(0.0)
private val bitrateState = mutableLongStateOf(0L)
// True while the UI should show the stuttering warning.
private val stutteringState = mutableStateOf(false)

// Flags and value selecting which stutter details the UI shows.
private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableDoubleStateOf(0.0)

// Series plotted by the two line charts.
private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
// NOTE(review): not referenced in the visible part of the class — presumably a time-sync job.
private var timeSyncJob: Job? = null

// History list storing every one-way latency sample.
private val latencyHistory = mutableStateListOf<Long>()

// State holding the maximum, minimum, average and latest one-way latency (ms, as shown by the UI).
private val maxLatencyState = mutableLongStateOf(0L)
private val minLatencyState = mutableLongStateOf(Long.MAX_VALUE)
private val averageLatencyState = mutableLongStateOf(0L)
private val latestLatencyState = mutableLongStateOf(0L)

// History variables
// NOTE(review): presumably the previous stats sample used for delta computation;
// their readers/writers are outside the visible part of the class — confirm.
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
private var timeOffset: Long = 0
private var t1: Long = 0

// NOTE(review): presumably the job of startStatsCollection(), which is not visible here.
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments, substituting
 * built-in defaults for any missing key, and logs the resulting configuration.
 */
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    // NOTE(review): the fallbacks below embed a server address and TURN
    // credentials in the binary; consider requiring them from the caller.
    val args = arguments
    currentRoom = args?.getString("room") ?: "default-room"
    signalingServerUrl = args?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
    stunUrl = args?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
    turnUrl = args?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
    turnUsername = args?.getString("turnUsername") ?: "wstszx"
    turnPassword = args?.getString("turnPassword") ?: "930379"

    val summary =
        "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
    Log.d(TAG, summary)
}

/**
 * Launcher for the runtime-permission dialog.
 *
 * Shows one toast per permission with its outcome. [onPermissionsChecked]
 * (which announces that everything was granted) is only invoked when the
 * result is non-empty and every requested permission was actually granted;
 * previously it was called even after a denial.
 */
private val requestPermissionsLauncher =
    registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
        permissions.entries.forEach { (permission, isGranted) ->
            val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
            Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
        }
        // An empty result means the request was interrupted, not that it succeeded.
        val allGranted = permissions.isNotEmpty() && permissions.values.all { it }
        if (allGranted) {
            onPermissionsChecked()
        } else {
            Log.w(TAG, "Not all permissions were granted: $permissions")
        }
    }

/** Hosts [WebRTCComposeLayout] inside a freshly created [ComposeView]. */
override fun onCreateView(
    inflater: android.view.LayoutInflater,
    container: android.view.ViewGroup?,
    savedInstanceState: Bundle?
): android.view.View {
    val composeView = ComposeView(requireContext())
    composeView.setContent { WebRTCComposeLayout() }
    return composeView
}

/**
 * Compose UI of the receiver: remote video on top, followed by one-way
 * latency figures, frame-rate and bitrate charts and a stuttering indicator.
 *
 * A one-shot effect builds the peer connection factory, creates the peer
 * connection, connects the signaling socket and requests permissions.
 *
 * Changes from the earlier version: the encoder factory reuses the
 * renderer's EGL context instead of creating a second [EglBase] that was
 * never released; the minimum latency shows "--" until a sample replaces the
 * `Long.MAX_VALUE` sentinel; missing renderer/EGL state fails with a
 * descriptive message instead of a bare null-pointer exception.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {

            // Remote video view
            AndroidView(
                factory = { viewContext ->
                    SurfaceViewRenderer(viewContext)
                        .apply { setZOrderMediaOverlay(false) }
                        .also { renderer -> remoteView = renderer }
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = { renderer ->
                    // Initialise the renderer only once, together with the EGL base.
                    if (remoteEglBase?.eglBaseContext == null) {
                        val egl = EglBase.create().also { remoteEglBase = it }
                        renderer.init(egl.eglBaseContext, null)
                        renderer.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // Latest, maximum, minimum and average one-way latency
            Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                // Long.MAX_VALUE is the "no sample yet" sentinel; do not show it raw.
                val minLatencyText = minLatencyState.longValue
                    .takeIf { it != Long.MAX_VALUE }
                    ?.toString()
                    ?: "--"
                MetricText("当前单向时延: ${latestLatencyState.longValue} ms")
                MetricText("最大单向时延: ${maxLatencyState.longValue} ms")
                MetricText("最小单向时延: $minLatencyText ms")
                MetricText("平均单向时延: ${averageLatencyState.longValue} ms", bottomPaddingDp = 8)
            }

            // Spacer between video and charts
            Spacer(modifier = Modifier.height(8.dp))

            // Frame Rate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 8.dp)
            ) {
                MetricText("帧率: ${frameRateState.doubleValue.roundToInt()} fps")
                Log.d(
                    TAG,
                    "UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"
                )

                LineChart(
                    data = frameRateHistory,
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Green,
                    backgroundColor = Color.Black,
                    yAxisLabel = "帧率 (fps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between charts
            Spacer(modifier = Modifier.height(8.dp))

            // Bitrate Section
            Column(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp)
            ) {
                MetricText("码率: ${bitrateState.longValue / 1000} kbps")

                LineChart(
                    data = bitrateHistory.map { it / 1000f }, // Convert to kbps
                    modifier = Modifier
                        .height(200.dp)
                        .fillMaxWidth()
                        .padding(vertical = 8.dp),
                    lineColor = Color.Blue,
                    backgroundColor = Color.Black,
                    yAxisLabel = "码率 (kbps)",
                    xAxisLabel = "时间 (秒)"
                )
            }

            // Spacer between metrics and stuttering indicator
            Spacer(modifier = Modifier.height(16.dp))

            // Stuttering Indicator
            Row(
                modifier = Modifier
                    .fillMaxWidth()
                    .padding(horizontal = 16.dp),
                verticalAlignment = Alignment.CenterVertically
            ) {
                if (stutteringState.value) {
                    Icon(
                        imageVector = Icons.Default.Warning,
                        contentDescription = "卡顿警告",
                        tint = Color.Red,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Column {
                        Text(
                            text = "视频播放出现卡顿",
                            color = Color.Red,
                            style = MaterialTheme.typography.bodyMedium
                        )
                        // Additional information about which metrics are abnormal
                        if (frameRateLowState.value) {
                            Text(
                                text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                        if (packetLossHighState.value) {
                            Text(
                                text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodySmall
                            )
                        }
                    }
                } else {
                    Icon(
                        imageVector = Icons.Default.CheckCircle,
                        contentDescription = "正常",
                        tint = Color.Green,
                        modifier = Modifier.size(24.dp)
                    )
                    Spacer(modifier = Modifier.width(8.dp))
                    Text(
                        text = "视频播放正常",
                        color = Color.Green,
                        style = MaterialTheme.typography.bodyMedium
                    )
                }
            }
        }

        LaunchedEffect(Unit) {
            val initOptions = PeerConnectionFactory.InitializationOptions.builder(context)
                .createInitializationOptions()
            PeerConnectionFactory.initialize(initOptions)

            val eglContext = checkNotNull(remoteEglBase) {
                "remoteEglBase must be created before the peer connection factory"
            }.eglBaseContext
            val renderer = checkNotNull(remoteView) {
                "remoteView must be created before the peer connection"
            }

            // Share the renderer's EGL context between encoder and decoder
            // instead of allocating a second, never-released EglBase.
            val peerConnectionFactory = PeerConnectionFactory.builder()
                .setVideoEncoderFactory(DefaultVideoEncoderFactory(eglContext, true, true))
                .setVideoDecoderFactory(DefaultVideoDecoderFactory(eglContext))
                .createPeerConnectionFactory()

            createPeerConnection(context, peerConnectionFactory, renderer) { peer ->
                localPeer = peer
            }

            initializeSocketIO()

            requestPermissionsIfNeeded()
        }
    }
}

/** White body-medium metric line with [bottomPaddingDp] dp of bottom padding. */
@Composable
private fun MetricText(text: String, bottomPaddingDp: Int = 4) {
    Text(
        text = text,
        color = Color.White,
        style = MaterialTheme.typography.bodyMedium,
        modifier = Modifier.padding(bottom = bottomPaddingDp.dp)
    )
}

/**
 * Line chart with axes, tick labels and optional axis titles; consecutive
 * points are joined by straight segments.
 *
 * Nothing is drawn for empty [data]. A flat series (all values equal) is
 * drawn on the vertical centre with a single y tick, instead of dividing by
 * a zero range. X ticks are spread evenly from the first to the last index
 * and never repeat.
 *
 * @param data values to plot, one per x step.
 * @param modifier layout modifier for the canvas.
 * @param lineColor colour of the line and point markers.
 * @param backgroundColor canvas background.
 * @param yAxisLabel title drawn rotated beside the y axis; omitted when empty.
 * @param xAxisLabel title drawn below the x axis; omitted when empty.
 * @param minYValue fixed lower bound of the y axis, or null to use the data minimum.
 * @param maxYValue fixed upper bound of the y axis, or null to use the data maximum.
 */
@Composable
fun LineChart(
    data: List<Float>,
    modifier: Modifier = Modifier,
    lineColor: Color = Color.Green,
    backgroundColor: Color = Color.Black,
    yAxisLabel: String = "",
    xAxisLabel: String = "",
    minYValue: Float? = null,
    maxYValue: Float? = null
) {
    Canvas(modifier = modifier.background(backgroundColor)) {
        if (data.isEmpty()) return@Canvas

        val padding = 40.dp.toPx() // Padding for axes and labels
        val axisBottom = size.height - padding
        val plotHeight = size.height - padding * 2

        val maxY = maxYValue ?: data.maxOrNull() ?: 1f
        val minY = minYValue ?: data.minOrNull() ?: 0f
        val yRange = maxY - minY
        val pointCount = data.size
        val spacing = (size.width - padding * 2) / (pointCount - 1).coerceAtLeast(1)

        // Maps a data value to its vertical pixel position.
        fun yFor(value: Float): Float =
            if (yRange == 0f) size.height / 2 else axisBottom - ((value - minY) / yRange) * plotHeight

        // Paints are built once per draw pass instead of once per label.
        fun textPaint(align: android.graphics.Paint.Align, antiAlias: Boolean = false) =
            android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
                textAlign = align
                isAntiAlias = antiAlias
            }
        val yTickPaint = textPaint(android.graphics.Paint.Align.RIGHT)
        val xTickPaint = textPaint(android.graphics.Paint.Align.CENTER)
        val titlePaint = textPaint(android.graphics.Paint.Align.CENTER, antiAlias = true)
        val canvas = drawContext.canvas.nativeCanvas

        val points = data.mapIndexed { index, value -> Offset(padding + index * spacing, yFor(value)) }

        // Draw axes
        drawLine(
            color = Color.White,
            start = Offset(padding, padding),
            end = Offset(padding, axisBottom),
            strokeWidth = 2f
        )
        drawLine(
            color = Color.White,
            start = Offset(padding, axisBottom),
            end = Offset(size.width - padding, axisBottom),
            strokeWidth = 2f
        )

        // Draw y-axis labels; a flat series only has one meaningful tick.
        val yLabelCount = 5
        val yTickValues = if (yRange == 0f) {
            listOf(minY)
        } else {
            List(yLabelCount) { i -> minY + i * yRange / (yLabelCount - 1) }
        }
        yTickValues.forEach { yValue ->
            canvas.drawText(
                yValue.roundToInt().toString(),
                padding - 8f,
                yFor(yValue) + yTickPaint.textSize / 2,
                yTickPaint
            )
        }

        // Draw x-axis labels, evenly spread over [0, pointCount - 1] without duplicates.
        val xLabelCount = 5
        val xTickIndices = List(xLabelCount) { i -> i * (pointCount - 1) / (xLabelCount - 1) }.distinct()
        xTickIndices.forEach { index ->
            canvas.drawText(
                index.toString(),
                padding + index * spacing,
                axisBottom + xTickPaint.textSize + 4f,
                xTickPaint
            )
        }

        // Y-axis title, rotated to read bottom-to-top
        if (yAxisLabel.isNotEmpty()) {
            canvas.save()
            canvas.rotate(-90f, padding / 2, size.height / 2)
            canvas.drawText(yAxisLabel, padding / 2, size.height / 2, titlePaint)
            canvas.restore()
        }

        // X-axis title
        if (xAxisLabel.isNotEmpty()) {
            canvas.drawText(xAxisLabel, size.width / 2, size.height - padding / 2, titlePaint)
        }

        // Draw the straight lines connecting points
        points.zipWithNext().forEach { (start, end) ->
            drawLine(
                color = lineColor,
                start = start,
                end = end,
                strokeWidth = 4f,
                cap = StrokeCap.Round
            )
        }

        // Draw point markers
        points.forEach { point ->
            drawCircle(color = lineColor, radius = 4f, center = point)
        }
    }
}

/**
 * Creates the Socket.IO client for [signalingServerUrl], registers the
 * connect / error / disconnect / "signal" listeners and starts connecting.
 *
 * On connect the socket joins [currentRoom] and the call is initiated.
 * Incoming "signal" events that carry a [JSONObject] are forwarded to
 * [handleSignalingData]; empty or non-JSON payloads are logged and ignored
 * instead of indexing into an empty argument array. Any exception during
 * setup is logged and swallowed, in which case [socket] stays uninitialised.
 */
private fun initializeSocketIO() {
    val socketUrl = signalingServerUrl

    val options = IO.Options().apply {
        transports = arrayOf("websocket")
        secure = socketUrl.startsWith("https")
        path = "/socket.io/"
    }

    try {
        socket = IO.socket(socketUrl, options)

        socket.on(Socket.EVENT_CONNECT) {
            Log.d(TAG, "Socket connected")
            socket.emit("join", currentRoom)
            Log.d(TAG, "Joined room: $currentRoom")
            initiateCall()
        }

        socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
            if (args.isNotEmpty()) {
                Log.e(TAG, "Socket connection error: ${args[0]}")
            }
        }

        socket.on(Socket.EVENT_DISCONNECT) { args ->
            if (args.isNotEmpty()) {
                Log.d(TAG, "Socket disconnected: ${args[0]}")
            }
        }

        socket.on("signal") { args ->
            // Look at the payload only after making sure there is one.
            val payload = args.firstOrNull()
            Log.d(TAG, "Received signaling: $payload")
            if (payload is JSONObject) {
                handleSignalingData(payload)
            } else {
                Log.w(TAG, "Ignoring signaling event without a JSON payload")
            }
        }

        socket.connect()
        Log.d(TAG, "Connecting to Socket: $socketUrl...")
    } catch (e: Exception) {
        Log.e(TAG, "Error connecting to Socket: ${e.message}")
    }
}

/**
 * Requests whichever of the camera, microphone, internet and network-state
 * permissions are not yet granted; when none are missing it goes straight to
 * [onPermissionsChecked].
 */
private fun requestPermissionsIfNeeded() {
    val hostContext = requireContext()
    val missing = listOf(
        Manifest.permission.CAMERA,
        Manifest.permission.RECORD_AUDIO,
        Manifest.permission.INTERNET,
        Manifest.permission.ACCESS_NETWORK_STATE
    ).filterNot { permission ->
        ContextCompat.checkSelfPermission(hostContext, permission) == PackageManager.PERMISSION_GRANTED
    }

    if (missing.isEmpty()) {
        onPermissionsChecked()
        return
    }
    requestPermissionsLauncher.launch(missing.toTypedArray())
}

/** Shows a short toast announcing that all required permissions are granted. */
private fun onPermissionsChecked() {
    val toast = Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT)
    toast.show()
}

/**
 * Creates the receiving peer connection with the configured STUN/TURN
 * servers, reports it through [onLocalPeerCreated] and then starts stats
 * collection.
 *
 * Local ICE candidates are sent to the room as "ice" signals, and every
 * incoming video track is rendered into [remoteView]. If the factory fails to
 * create a connection, the failure is logged and neither the callback nor
 * stats collection runs (previously this crashed on `!!`).
 *
 * @param context currently unused.
 * @param peerConnectionFactory factory used to create the connection.
 * @param remoteView renderer that receives remote video frames.
 * @param onLocalPeerCreated receives the created connection.
 */
private fun createPeerConnection(
    context: Context,
    peerConnectionFactory: PeerConnectionFactory,
    remoteView: SurfaceViewRenderer,
    onLocalPeerCreated: (PeerConnection) -> Unit
) {
    val iceServers = listOf(
        PeerConnection.IceServer.builder(stunUrl).createIceServer(),
        PeerConnection.IceServer.builder(turnUrl)
            .setUsername(turnUsername)
            .setPassword(turnPassword)
            .createIceServer()
    )

    val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
        bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
        rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
        tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
        continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    val observer = object : PeerConnection.Observer {
        // Routes a received track to the remote renderer when it carries video.
        private fun renderIfVideo(track: MediaStreamTrack?) {
            if (track is VideoTrack) {
                track.addSink(remoteView)
            }
        }

        override fun onIceCandidate(iceCandidate: IceCandidate?) {
            val candidate = iceCandidate ?: return
            Log.d(TAG, "ICE candidate: $candidate")
            val signalData = JSONObject().apply {
                put("type", "ice")
                put("candidate", JSONObject().apply {
                    put("sdpMid", candidate.sdpMid)
                    put("sdpMLineIndex", candidate.sdpMLineIndex)
                    put("candidate", candidate.sdp)
                })
                put("room", currentRoom)
            }
            socket.emit("signal", signalData)
        }

        override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
            Log.d(TAG, "ICE candidates removed")
        }

        override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
            Log.d(TAG, "Signaling state changed to: $newState")
        }

        override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
            Log.d(TAG, "ICE connection state changed to: $newState")
        }

        override fun onIceConnectionReceivingChange(receiving: Boolean) {
            Log.d(TAG, "ICE connection receiving change: $receiving")
        }

        override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
            Log.d(TAG, "ICE gathering state changed to: $newState")
        }

        override fun onAddStream(stream: MediaStream?) {
            Log.d(TAG, "Stream added")
        }

        override fun onRemoveStream(stream: MediaStream?) {
            Log.d(TAG, "Stream removed")
        }

        override fun onDataChannel(dataChannel: DataChannel?) {
            Log.d(TAG, "Data channel created")
        }

        override fun onRenegotiationNeeded() {
            Log.d(TAG, "Renegotiation needed")
        }

        override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
            Log.d(TAG, "Track added")
            renderIfVideo(receiver?.track())
        }

        override fun onTrack(transceiver: RtpTransceiver?) {
            Log.d(TAG, "onTrack called")
            renderIfVideo(transceiver?.receiver?.track())
        }

        override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
            Log.d(TAG, "Connection state changed to: $newState")
        }
    }

    // createPeerConnection can return null; fail softly instead of crashing.
    val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
    localPeer = peer
    if (peer == null) {
        Log.e(TAG, "Failed to create PeerConnection")
        return
    }

    onLocalPeerCreated(peer)

    startStatsCollection()
}

/**
 * Starts the call from this side: asks the peer connection for an SDP offer,
 * installs it as the local description and, once that succeeds, sends it to
 * the other participant as a "signal" message of type "offer" for the current room.
 *
 * Does nothing (beyond the initial log line) when [localPeer] is null.
 */
private fun initiateCall() {
    Log.d(TAG, "Initiating call...")

    // Wraps the SDP text in the envelope emitted on the "signal" event.
    fun offerMessage(description: String): JSONObject {
        val message = JSONObject()
        message.put("type", "offer")
        message.put("sdp", JSONObject().put("sdp", description))
        message.put("room", currentRoom)
        return message
    }

    val offerConstraints = MediaConstraints()
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    val offerObserver = object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            if (sessionDescription == null) return

            // Second observer: reports the outcome of setLocalDescription only.
            val localDescriptionObserver = object : SdpObserver {
                override fun onSetSuccess() {
                    socket.emit("signal", offerMessage(sessionDescription.description))
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "Set local description error: $error")
                }

                override fun onCreateSuccess(p0: SessionDescription?) {}
                override fun onCreateFailure(p0: String?) {}
            }
            localPeer?.setLocalDescription(localDescriptionObserver, sessionDescription)
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "Create offer error: $error")
        }

        override fun onSetSuccess() {}
        override fun onSetFailure(error: String?) {}
    }

    localPeer?.createOffer(offerObserver, offerConstraints)
}

/**
 * Dispatches one message received on the "signal" socket event.
 *
 * Handled types:
 *  - "answer": installs the remote SDP answer, then flushes ICE candidates that
 *    arrived before the remote description was set.
 *  - "ice": adds the remote candidate, or queues it in [pendingIceCandidates]
 *    while no remote description is set yet.
 *  - "time_sync_request": echoes the sender's `t1` together with the local
 *    receive (`t2`) and send (`t3`) timestamps as a "time_sync_response".
 *
 * A message that lacks a required field is logged and dropped instead of
 * letting the JSONException escape from the socket listener.
 *
 * @param data the decoded signaling payload; must contain a string field "type".
 */
private fun handleSignalingData(data: JSONObject) {
    Log.d(TAG, "Handling signaling data: $data")
    try {
        when (val type = data.getString("type")) {

            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )

                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        // Candidates received before the answer can only be applied now.
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }

                        pendingIceCandidates.clear()

                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

//            "time_sync_response" -> {
//                val t1 = data.getLong("t1")
//                val t2 = data.getLong("t2")
//                val t3 = data.getLong("t3")
//                val t4 = System.currentTimeMillis()
//
//                val RTT = t4 - t1
//                val oneWayDelay = RTT / 2
//                timeOffset = ((t2 - t1) + (t3 - t4)) / 2
//
//                Log.d(TAG, "时间同步: RTT=$RTT ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")
//
//                // Update latencyState
//                viewLifecycleOwner.lifecycleScope.launch {
//                    // Append to the history
//                    latencyHistory.add(oneWayDelay)
//                    if (latencyHistory.size > 60) { // keep the most recent 60 latency values
//                        latencyHistory.removeAt(0)
//                    }
//
//                    // Compute the maximum, minimum and average one-way delay
//                    val maxLatency = latencyHistory.maxOrNull() ?: 0L
//                    val minLatency = latencyHistory.minOrNull() ?: 0L
//                    val averageLatency = if (latencyHistory.isNotEmpty()) {
//                        latencyHistory.average().toLong()
//                    } else {
//                        0L
//                    }
//
//                    maxLatencyState.longValue = maxLatency
//                    minLatencyState.longValue = minLatency
//                    averageLatencyState.longValue = averageLatency
//                    latestLatencyState.longValue = oneWayDelay
//                }
//            }

            "time_sync_request" -> {
                val t1 = data.getLong("t1")
                // t2 = local receive time, t3 = local send time of the reply.
                val t2 = System.currentTimeMillis()
                val t3 = System.currentTimeMillis()
                val syncResponse = JSONObject().apply {
                    put("type", "time_sync_response")
                    put("t1", t1)
                    put("t2", t2)
                    put("t3", t3)
                    put("room", currentRoom)
                }
                socket.emit("signal", syncResponse)
                Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: $type")
            }
        }
    } catch (e: org.json.JSONException) {
        // The payload comes from the network; never let a bad message crash the listener.
        Log.e(TAG, "Malformed signaling message: $data", e)
    }
}

/**
 * Starts a coroutine on the view lifecycle scope that requests a WebRTC stats
 * report from [localPeer] once per second and forwards it to [parseStatsReport].
 *
 * Calling this again replaces the running collector: the previous [statsJob] is
 * cancelled first so that two polling loops never run side by side.
 */
private fun startStatsCollection() {
    Log.d(TAG, "Starting stats collection...")
    statsJob?.cancel()
    statsJob = viewLifecycleOwner.lifecycleScope.launch {
        while (isActive) {
            delay(1000) // Collect stats every second
            Log.d(TAG, "Collecting stats...")
            val peer = localPeer
            if (peer == null) {
                Log.e(TAG, "Failed to get stats: localPeer is null.")
                continue
            }
            peer.getStats { report ->
                Log.d(TAG, "Stats report obtained.")
                parseStatsReport(report)
            }
        }
    }
}

/**
 * Extracts the inbound video statistics from [report] and derives frame rate,
 * bitrate and packet loss from the difference to the previously seen sample.
 *
 * The first sample only seeds the `prev*` fields; from the second sample on the
 * Compose states and the 60-entry histories are updated on the view lifecycle
 * scope. Stuttering is flagged when the frame rate is below 24 fps or the packet
 * loss exceeds 5 %.
 *
 * This is invoked from the `getStats` callback, which may fire after the view is
 * gone, so the view lifecycle owner is looked up null-safely.
 *
 * @param report the stats report delivered by `PeerConnection.getStats`.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    Log.d(TAG, "Received RTCStatsReport: $report")
    for (stats in report.statsMap.values) {
        if (stats.type != "inbound-rtp") continue
        if (stats.members["kind"] as? String != "video") continue

        val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
        val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
        val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
        val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
        val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
        val packetsReceived = (stats.members["packetsReceived"] as? Number)?.toDouble() ?: 0.0
        val packetsTotal = packetsLost + packetsReceived
        // Guard the 0/0 case (no packets yet) and keep the fraction inside [0, 1].
        val packetLossFraction =
            if (packetsTotal > 0) (packetsLost / packetsTotal).coerceIn(0.0, 1.0) else 0.0
        val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

        Log.d(
            TAG,
            "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
        )

        if (prevTimestamp != 0.0) {
            val timeElapsed = timestamp - prevTimestamp
            val framesDelta = framesDecoded - prevFramesDecoded
            val bytesDelta = bytesReceived - prevBytesReceived

            // Counters can restart; never report a negative rate.
            val frameRate = if (timeElapsed > 0) (framesDelta / timeElapsed).coerceAtLeast(0.0) else 0.0
            val bitrate = if (timeElapsed > 0) ((bytesDelta * 8) / timeElapsed).coerceAtLeast(0.0) else 0.0 // bits per second
            val packetLoss = packetLossFraction * 100 // Convert to percentage

            Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

            // Determine stuttering based on thresholds (can be adjusted)
            val frameRateLow = frameRate < 24
            val packetLossHigh = packetLoss > 5.0
            val isStuttering = frameRateLow || packetLossHigh

            // `viewLifecycleOwner` throws once the view is destroyed; the LiveData
            // variant simply yields null, in which case the UI update is skipped.
            viewLifecycleOwnerLiveData.value?.lifecycleScope?.launch {
                frameRateState.doubleValue = frameRate
                bitrateState.longValue = bitrate.toLong()
                stutteringState.value = isStuttering
                frameRateLowState.value = frameRateLow
                packetLossHighState.value = packetLossHigh
                packetLossState.doubleValue = packetLoss

                // Keep only the most recent 60 samples of each series.
                frameRateHistory.add(frameRate.toFloat())
                if (frameRateHistory.size > 60) {
                    frameRateHistory.removeAt(0)
                }

                bitrateHistory.add(bitrate.toLong())
                if (bitrateHistory.size > 60) {
                    bitrateHistory.removeAt(0)
                }

                Log.d(
                    TAG,
                    "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
                )
            }
        }

        // Remember this sample as the baseline for the next delta.
        prevFramesDecoded = framesDecoded
        prevBytesReceived = bytesReceived
        prevFramesReceived = framesReceived
        prevFramesDropped = framesDropped
        prevTimestamp = timestamp
    }
}

/**
 * Tears down everything that was created for the view: background jobs, the
 * signaling socket, the peer connection and the remote renderer with its EGL
 * context.
 *
 * References are cleared after release so that late callbacks see `null`
 * instead of a disposed object, and so that a re-created view builds fresh ones.
 */
override fun onDestroyView() {
    super.onDestroyView()
    statsJob?.cancel()
    statsJob = null
    timeSyncJob?.cancel()
    timeSyncJob = null
    // `socket` stays uninitialised when creating it failed; touching it would throw.
    if (::socket.isInitialized) {
        socket.disconnect()
    }
    localPeer?.dispose()
    localPeer = null
    remoteView?.release()
    remoteView = null
    remoteEglBase?.release()
    remoteEglBase = null
}
}


单向时延和往返时延的统计方案应该用同一个请求来计算,并且将值展示在界面上,返回给我修改后的代码

思考

优化流视频调用

我正在改进两段Kotlin代码,实现了基于WebRTC的视频通话设置,增加了OWD和RTT统计,同时展示这些指标。

整合计算和展示

考量RTT和OWD计算的融合,确保计算和展示的数据一致性,我正在看这些请求是如何精确传达和落实到UI界面的。

步骤分析:

  1. 用单一时间同步请求计算RTT和OWD。
  2. 更新用户界面以显示RTT和OWD。

计算与展示

在VideoInitiatorFragment中,正在计算RTT和OWD的值。RTT = t4 - t1,OWD = t2 - t1。虽然有计算,但似乎缺少将结果显示在UI上的代码。

统一时间同步,计算RTT和OWD并展示

细致更新代码

我正在更新VideoInitiatorFragment和VideoReceiverFragment,确保它们同步计算RTT和OWD,并相应更新UI界面。

同步时间请求

让我看看,发起者发送请求,接收者收到请求后发回响应,发起者接收到响应。这样可以同步时间。

计算RTT与偏移

RTT = (t4 - t1) - (t3 - t2)

偏移 = ((t2 - t1) + (t3 - t4)) / 2

OWD是到达接收者所需的时间,可以定为(t2 - t1)。注意:t1 与 t2 分别取自两台设备的时钟,只有在两端时钟已同步时该差值才等于真实的单向时延;否则其中包含时钟偏移,此时可用 RTT / 2 作为不依赖时钟同步的近似值。

显示统计数据

我正在更新VideoInitiatorFragment和VideoReceiverFragment,确保同时显示RTT和OWD。正在处理time_sync_response和time_sync_request以计算和展示OWD。

更新代码处理

正在确保两个片段都有RTT和OWD统计,并改进代码以优化处理和展示。

回答

To implement a unified time synchronization mechanism that calculates both Round-Trip Time (RTT) and One-Way Delay (OWD) using a single request, and display these metrics in the user interface, we need to make several modifications to both the VideoInitiatorFragment and VideoReceiverFragment.

Overview of Changes

  1. Unified Time Synchronization:

    • Initiator (VideoInitiatorFragment):
      • Sends a time_sync_request with a timestamp t1.
      • Receives a time_sync_response containing t1, t2, and t3.
      • Calculates RTT and OWD based on these timestamps.
    • Receiver (VideoReceiverFragment):
      • Responds to time_sync_request with time_sync_response containing t1, t2, and t3.
  2. State Management:

    • Introduce state variables to hold RTT and OWD values.
    • Maintain histories for these metrics to compute maximum, minimum, and average values.
  3. User Interface:

    • Update the UI in both fragments to display RTT and OWD.
    • Use Jetpack Compose to present these metrics in a clear and organized manner.

Below are the detailed modifications to both fragments.


Modified VideoInitiatorFragment

Key Changes:

  • State Variables:
    • Added owdState to track One-Way Delay.
  • Time Synchronization Handling:
    • Calculated both RTT and OWD from the time synchronization response.
  • UI Updates:
    • Displayed RTT and OWD in the Compose layout.

Updated Code:

/**
 * Camera-side participant of the WebRTC session.
 *
 * The fragment captures local video, answers an incoming SDP offer over a
 * Socket.IO signaling channel and periodically sends a `time_sync_request`.
 * Round-trip time (RTT) and one-way delay (OWD) are both derived from the
 * matching `time_sync_response` and shown below the local preview.
 */
class VideoInitiatorFragment : Fragment() {

    companion object {
        /** Number of RTT/OWD samples kept for the max/min/average log line. */
        private const val HISTORY_LIMIT = 60

        /** Creates the fragment with all connection parameters stored as arguments. */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoInitiatorFragment {
            val fragment = VideoInitiatorFragment()
            val args = Bundle().apply {
                putString("room", room)
                putString("stunUrl", stunUrl)
                putString("turnUrl", turnUrl)
                putString("turnUsername", turnUsername)
                putString("turnPassword", turnPassword)
                putString("signalingServerUrl", signalingServerUrl)
            }
            fragment.arguments = args
            return fragment
        }
    }

    // Class member variables remain unchanged
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var localView: SurfaceViewRenderer? = null
    private var localEglBase: EglBase? = null
    private val pendingIceCandidates = mutableListOf<IceCandidate>()
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    private val TAG: String = "WebRTC-Initiator"

    // State variables for RTT and OWD
    private val rttState = mutableLongStateOf(0L)
    private val owdState = mutableLongStateOf(0L)
    private val rttHistory = mutableStateListOf<Long>()
    private val owdHistory = mutableStateListOf<Long>()

    private var statsJob: Job? = null
    private var timeSyncJob: Job? = null

    // Kept so that the camera can be stopped and released with the view.
    private var videoCapturer: CameraVideoCapturer? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Retrieve data from arguments
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                if (isGranted) {
                    Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
                } else {
                    Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
                }
            }
            onPermissionsChecked()
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    /** Local preview plus the RTT/OWD read-out; also kicks off the WebRTC setup once. */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current
        var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Local video view
                AndroidView(
                    factory = { viewContext ->
                        SurfaceViewRenderer(viewContext).apply {
                            setZOrderMediaOverlay(false)
                        }.also { renderer -> localView = renderer }
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth(),
                    update = { renderer ->
                        // Initialise the renderer exactly once, together with the EGL context.
                        if (localEglBase == null) {
                            val eglBase = EglBase.create()
                            localEglBase = eglBase
                            renderer.init(eglBase.eglBaseContext, null)
                            renderer.setMirror(false)
                        }
                    }
                )

                Spacer(modifier = Modifier.height(8.dp))

                // Display RTT and OWD
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    Text(
                        text = "往返时延 (RTT): ${rttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "单向时延 (OWD): ${owdState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }

                // Additional UI components can go here...
            }

            LaunchedEffect(Unit) {
                val eglBase = checkNotNull(localEglBase) {
                    "EGL context must exist before the peer connection factory is built"
                }

                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                val encoderFactory = DefaultVideoEncoderFactory(
                    eglBase.eglBaseContext,
                    true,
                    true
                )
                val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)

                // Only needed during setup, so a plain local replaces the former `lateinit var`.
                val peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(encoderFactory)
                    .setVideoDecoderFactory(decoderFactory)
                    .createPeerConnectionFactory()

                initLocalVideo(context, localView, peerConnectionFactory, eglBase) {
                    localVideoTrack = it
                }

                createPeerConnection(
                    context,
                    peerConnectionFactory,
                    localVideoTrack
                ) {
                    localPeer = it
                    startStatsCollection() // Start collecting statistics
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
                startTimeSync()
            }
        }
    }

    /**
     * Sends a `time_sync_request` every [intervalMs] milliseconds for as long as
     * the view lives. A running sync loop is replaced, never duplicated.
     */
    private fun startTimeSync(intervalMs: Long = 5000L) {
        timeSyncJob?.cancel()
        timeSyncJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(intervalMs) // Sync every 5 seconds
                requestTimeSync()
            }
        }
    }

    /** Polls the peer connection for a stats report once per second. */
    private fun startStatsCollection() {
        statsJob?.cancel()
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000) // Collect stats every second
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "Failed to get stats: localPeer is null.")
                    continue
                }
                peer.getStats { report -> parseStatsReport(report) }
            }
        }
    }

    /**
     * Logs the ICE-level round-trip time found in [report].
     *
     * `currentRoundTripTime` is a member of the "candidate-pair" stats, not of
     * "transport". The value is only logged: the RTT shown in the UI comes from
     * the time-sync exchange, so both displayed metrics share a single source.
     */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
        for (stats in report.statsMap.values) {
            Log.d(TAG, "Stats type: ${stats.type}")
            if (stats.type != "candidate-pair") continue
            val iceRttMs = (stats.members["currentRoundTripTime"] as? Number)
                ?.toDouble()
                ?.times(1000) // seconds -> milliseconds
                ?.toLong()
                ?: continue
            if (iceRttMs > 0) {
                Log.d(TAG, "ICE candidate-pair RTT: $iceRttMs ms")
            }
        }
    }

    /** Connects to the signaling server, joins [currentRoom] and routes "signal" events. */
    private fun initializeSocketIO() {
        val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
        val socketUrl = signalingServerUrl

        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = protocol == "https"
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check the payload before touching args[0]; an empty event must not throw.
                val data = args.firstOrNull() as? JSONObject
                if (data != null) {
                    Log.d(TAG, "Received signaling: $data")
                    handleSignalingData(data)
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to Socket: ${e.message}")
        }
    }

    private fun requestPermissionsIfNeeded() {
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        )

        val permissionsToRequest = permissions.filter {
            ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
        }

        if (permissionsToRequest.isNotEmpty()) {
            requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
        } else {
            onPermissionsChecked()
        }
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Starts camera capture at 1920x1080 @ 60 fps, renders it into [localView]
     * and hands the resulting track to [onLocalVideoTrack].
     *
     * The audio track is created in [createPeerConnection]; the unused media
     * stream and second audio source that used to be allocated here are gone.
     */
    private fun initLocalVideo(
        context: Context,
        localView: SurfaceViewRenderer?,
        peerConnectionFactory: PeerConnectionFactory,
        eglBase: EglBase,
        onLocalVideoTrack: (VideoTrack) -> Unit
    ) {
        val capturer = createCameraCapturer(context)
        videoCapturer = capturer
        val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
        val videoSource = peerConnectionFactory.createVideoSource(capturer.isScreencast)
        capturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

        capturer.startCapture(1920, 1080, 60)

        val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
        localVideoTrack.addSink(localView)

        onLocalVideoTrack(localVideoTrack)
    }

    /**
     * Picks a camera: back-facing first, then front-facing, then whatever is listed first.
     *
     * @throws IllegalStateException when the device reports no usable camera.
     */
    private fun createCameraCapturer(context: Context): CameraVideoCapturer {
        val camera2Enumerator = Camera2Enumerator(context)
        val deviceNames = camera2Enumerator.deviceNames

        val candidates = deviceNames.filter { camera2Enumerator.isBackFacing(it) } +
            deviceNames.filter { camera2Enumerator.isFrontFacing(it) } +
            listOfNotNull(deviceNames.firstOrNull())

        for (deviceName in candidates) {
            val capturer = camera2Enumerator.createCapturer(deviceName, null)
            if (capturer != null) {
                return capturer
            }
        }
        throw IllegalStateException("Unable to create camera capturer")
    }

    /**
     * Builds the peer connection with the configured STUN/TURN servers, attaches
     * the local video and a fresh audio track, and reports it via [onLocalPeerCreated].
     * When the factory cannot create a connection the failure is logged and the
     * callback is not invoked.
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        localVideoTrack: VideoTrack?,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        val iceServers = listOf(
            PeerConnection.IceServer.builder(stunUrl).createIceServer(),
            PeerConnection.IceServer.builder(turnUrl)
                .setUsername(turnUsername)
                .setPassword(turnPassword)
                .createIceServer()
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                iceCandidate?.let {
                    Log.d(TAG, "ICE candidate: $it")
                    val signalData = JSONObject().apply {
                        put("type", "ice")
                        put("candidate", JSONObject().apply {
                            put("sdpMid", it.sdpMid)
                            put("sdpMLineIndex", it.sdpMLineIndex)
                            put("candidate", it.sdp)
                        })
                        put("room", currentRoom)
                    }
                    socket.emit("signal", signalData)
                }
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        })

        if (peer == null) {
            Log.e(TAG, "Failed to create PeerConnection")
            return
        }
        localPeer = peer

        localVideoTrack?.let {
            peer.addTrack(it, listOf("local_stream"))
        }
        val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
        val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
        peer.addTrack(localAudioTrack, listOf("local_stream"))

        onLocalPeerCreated(peer)
    }

    /** Creates an SDP answer, installs it locally and passes its text to [onAnswerCreated]. */
    private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
        Log.d(TAG, "Creating answer...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        peerConnection.createAnswer(object : SdpObserver {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                sessionDescription?.let { sdp ->
                    peerConnection.setLocalDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            Log.d(TAG, "SetLocalDescription onSetSuccess")
                            onAnswerCreated(sdp.description)
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }
            }

            override fun onSetSuccess() {
                Log.d(TAG, "createAnswer onSetSuccess")
            }

            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "createAnswer onCreateFailure: $error")
            }

            override fun onSetFailure(error: String?) {}
        }, constraints)
    }

    /**
     * Dispatches one "signal" message: SDP offer/answer, ICE candidate or
     * `time_sync_response`. Messages with missing fields are logged and dropped.
     */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        try {
            when (val type = data.getString("type")) {
                "offer" -> {
                    Log.d(TAG, "Received offer")
                    val sdp = SessionDescription(
                        SessionDescription.Type.OFFER,
                        data.getJSONObject("sdp").getString("sdp")
                    )
                    localPeer?.setRemoteDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            Log.d(TAG, "Set remote description (offer) success")
                            // The connection may have been torn down while the offer was applied.
                            val peer = localPeer ?: return
                            createAnswer(peer) { answer ->
                                val signalData = JSONObject().apply {
                                    put("type", "answer")
                                    put("sdp", JSONObject().put("sdp", answer))
                                    put("room", currentRoom)
                                }

                                socket.emit("signal", signalData)

                                pendingIceCandidates.forEach { candidate ->
                                    localPeer?.addIceCandidate(candidate)
                                }
                                pendingIceCandidates.clear()
                            }
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "Set remote description (offer) error: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }

                "answer" -> {
                    Log.d(TAG, "Received answer")
                    val sdp = SessionDescription(
                        SessionDescription.Type.ANSWER,
                        data.getJSONObject("sdp").getString("sdp")
                    )
                    localPeer?.setRemoteDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            pendingIceCandidates.forEach { candidate ->
                                localPeer?.addIceCandidate(candidate)
                            }
                            pendingIceCandidates.clear()
                            Log.d(TAG, "Set remote description (answer) success")
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "Set remote description error: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }

                "ice" -> {
                    Log.d(TAG, "Received ICE candidate")
                    val candidateData = data.getJSONObject("candidate")
                    val candidate = IceCandidate(
                        candidateData.getString("sdpMid"),
                        candidateData.getInt("sdpMLineIndex"),
                        candidateData.getString("candidate")
                    )

                    if (localPeer?.remoteDescription != null) {
                        localPeer?.addIceCandidate(candidate)
                    } else {
                        pendingIceCandidates.add(candidate)
                    }
                }

                "time_sync_response" -> {
                    Log.d(TAG, "Received time_sync_response")
                    val t1 = data.getLong("t1")
                    val t2 = data.getLong("t2")
                    val t3 = data.getLong("t3")
                    val t4 = System.currentTimeMillis()
                    onTimeSyncSample(t1, t2, t3, t4)
                }

                else -> {
                    Log.e(TAG, "Unknown signaling type: $type")
                }
            }
        } catch (e: org.json.JSONException) {
            // Payloads come from the network; a bad one must not crash the socket thread.
            Log.e(TAG, "Malformed signaling message: $data", e)
        }
    }

    /**
     * Derives RTT and OWD from one request/response exchange and publishes them.
     *
     * [t1] and [t4] are read from this device's clock, [t2] and [t3] from the
     * peer's. Each bracket of the RTT formula subtracts timestamps of the same
     * clock, so the clock offset cancels out. A raw `t2 - t1` would include that
     * offset, therefore OWD is estimated as half the RTT (symmetric-path assumption).
     */
    private fun onTimeSyncSample(t1: Long, t2: Long, t3: Long, t4: Long) {
        val rtt = (t4 - t1) - (t3 - t2)
        val owd = rtt / 2

        Log.d(TAG, "Time Sync: RTT=$rtt ms, OWD=$owd ms")

        // Called from the socket thread; skip the UI update when the view is gone.
        viewLifecycleOwnerLiveData.value?.lifecycleScope?.launch {
            rttState.longValue = rtt
            recordSample(rttHistory, rtt)

            owdState.longValue = owd
            recordSample(owdHistory, owd)

            Log.d(
                TAG,
                "RTT - Latest: $rtt ms, Max: ${rttHistory.maxOrNull() ?: 0L} ms, Min: ${rttHistory.minOrNull() ?: 0L} ms, Average: ${rttHistory.average().toLong()} ms"
            )
            Log.d(
                TAG,
                "OWD - Latest: $owd ms, Max: ${owdHistory.maxOrNull() ?: 0L} ms, Min: ${owdHistory.minOrNull() ?: 0L} ms, Average: ${owdHistory.average().toLong()} ms"
            )
        }
    }

    /** Appends [sample] to [history], dropping the oldest entry beyond [HISTORY_LIMIT]. */
    private fun recordSample(history: MutableList<Long>, sample: Long) {
        history.add(sample)
        if (history.size > HISTORY_LIMIT) {
            history.removeAt(0)
        }
    }

    /** Emits a `time_sync_request` stamped with the local send time `t1`. */
    private fun requestTimeSync() {
        if (!::socket.isInitialized) {
            Log.e(TAG, "Cannot send time sync request: socket is not initialized")
            return
        }
        val t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        socket.emit("signal", syncRequest)
        Log.d(TAG, "Sent time sync request at t1: $t1")
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        statsJob = null
        timeSyncJob?.cancel()
        timeSyncJob = null

        // `socket` stays uninitialised when IO.socket() failed.
        if (::socket.isInitialized) {
            socket.disconnect()
        }

        // Stop the camera before releasing the objects its frames flow into.
        videoCapturer?.let { capturer ->
            try {
                capturer.stopCapture()
            } catch (e: InterruptedException) {
                Log.e(TAG, "Interrupted while stopping capture: ${e.message}")
            }
            capturer.dispose()
        }
        videoCapturer = null

        localPeer?.dispose()
        localPeer = null
        localView?.release()
        localView = null
        // Cleared so that a re-created view builds and initialises a fresh EGL context.
        localEglBase?.release()
        localEglBase = null
    }
}

Explanation of Changes:

  1. State Variables:

    • RTT (rttState): Tracks the latest Round-Trip Time.
    • OWD (owdState): Tracks the latest One-Way Delay.
    • Histories (rttHistory, owdHistory): Maintain a history of RTT and OWD values to compute maximum, minimum, and average metrics.
  2. Time Synchronization Handling:

    • In the handleSignalingData method, under the "time_sync_response" case, both RTT and OWD are calculated using the timestamps t1, t2, t3, and t4.
    • RTT is calculated as RTT = t4 - t1 - (t3 - t2).
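    • OWD is calculated as OWD = t2 - t1 (t1 and t2 come from two different devices, so this equals the true one-way delay only when both clocks are synchronized; RTT / 2 is the clock-independent estimate obtainable from the same four timestamps).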
  3. UI Updates:

    • Added Text composables to display both RTT and OWD within the WebRTCComposeLayout.
    • These values are updated in real-time as they are calculated.
  4. Continuous Time Synchronization:

    • The startTimeSync method sends a time_sync_request every 5 seconds to keep the metrics updated.

Modified VideoReceiverFragment

Key Changes:

  • Time Synchronization Handling:
    • Responds to time_sync_request with t1, t2, and t3.
  • State Management:
    • If desired, similar state variables can be added to track RTT and OWD from the receiver's perspective, but typically, OWD is primarily relevant for the initiator.
  • UI Updates:
    • If you wish to display latency metrics on the receiver side, you can add similar UI components.

Updated Code:

/**
 * Fragment for the receiving side of a WebRTC call.
 *
 * It joins a signaling room over Socket.IO, sends an SDP offer that asks to receive audio and
 * video, renders the remote video track, and once per second derives frame rate, bitrate and
 * packet loss from the peer connection's `inbound-rtp` video statistics. It also answers
 * `time_sync_request` signaling messages with a `time_sync_response`.
 */
class VideoReceiverFragment : Fragment() {

    companion object {
        /** Maximum number of samples kept in each chart history. */
        private const val MAX_HISTORY_SIZE = 60

        /** Frame rates below this value (fps) are reported as stuttering. */
        private const val MIN_SMOOTH_FRAME_RATE = 24.0

        /** Packet loss above this percentage is reported as stuttering. */
        private const val MAX_PACKET_LOSS_PERCENT = 5.0

        /** Delay between two statistics polls, in milliseconds. */
        private const val STATS_INTERVAL_MS = 1000L

        /**
         * Creates a fragment whose arguments carry the room name and the STUN/TURN/signaling
         * configuration read back in [onCreate].
         */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoReceiverFragment {
            val fragment = VideoReceiverFragment()
            val args = Bundle().apply {
                putString("room", room)
                putString("stunUrl", stunUrl)
                putString("turnUrl", turnUrl)
                putString("turnUsername", turnUsername)
                putString("turnPassword", turnPassword)
                putString("signalingServerUrl", signalingServerUrl)
            }
            fragment.arguments = args
            return fragment
        }
    }

    // Connection objects. All of them are released and reset in onDestroyView().
    private lateinit var socket: Socket
    private var peerConnectionFactory: PeerConnectionFactory? = null
    private var localPeer: PeerConnection? = null
    private var remoteView: SurfaceViewRenderer? = null
    private var remoteEglBase: EglBase? = null

    // ICE candidates that arrived before the remote description was set.
    private val pendingIceCandidates = mutableListOf<IceCandidate>()

    // Configuration read from the fragment arguments.
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    private val TAG: String = "WebRTC-Receiver"

    // RTT/OWD state shown in the UI. Nothing in this class writes to these values;
    // they keep their initial value unless receiver-side measurement is added.
    private val rttState = mutableLongStateOf(0L)
    private val owdState = mutableLongStateOf(0L)
    private val rttHistory = mutableStateListOf<Long>()
    private val owdHistory = mutableStateListOf<Long>()

    // Video quality state observed by the Compose UI.
    private val frameRateState = mutableDoubleStateOf(0.0)
    private val bitrateState = mutableLongStateOf(0L)
    private val stutteringState = mutableStateOf(false)
    private val frameRateLowState = mutableStateOf(false)
    private val packetLossHighState = mutableStateOf(false)
    private val packetLossState = mutableDoubleStateOf(0.0)
    private val frameRateHistory = mutableStateListOf<Float>()
    private val bitrateHistory = mutableStateListOf<Long>()

    // Counters of the previous stats sample, used to turn cumulative values into rates.
    private var prevFramesDecoded = 0.0
    private var prevBytesReceived = 0.0
    private var prevTimestamp = 0.0

    private var statsJob: Job? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // NOTE(review): the fallbacks below embed a real-looking server address and TURN
        // credentials in source code; consider requiring these arguments instead.
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
        stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
        turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    // Shows one toast per requested permission; the "all granted" confirmation is only
    // shown when every requested permission was actually granted.
    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
                Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
            }
            if (permissions.values.all { it }) {
                onPermissionsChecked()
            }
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    /**
     * Root UI: remote video on top, followed by RTT/OWD text, frame-rate and bitrate charts and a
     * stuttering indicator. When first composed it also creates the peer connection, connects
     * the signaling socket and requests runtime permissions.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Remote video view. The renderer is initialised exactly once, when the view is
                // created, with an EGL context that is shared with the video decoder below.
                AndroidView(
                    factory = { viewContext ->
                        val eglBase = remoteEglBase ?: EglBase.create().also { remoteEglBase = it }
                        val renderer = SurfaceViewRenderer(viewContext)
                        renderer.setZOrderMediaOverlay(false)
                        renderer.init(eglBase.eglBaseContext, null)
                        renderer.setMirror(false)
                        remoteView = renderer
                        renderer
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth()
                )

                Spacer(modifier = Modifier.height(8.dp))

                // RTT and OWD read-outs
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    Text(
                        text = "往返时延 (RTT): ${rttState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "单向时延 (OWD): ${owdState.longValue} ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }

                Spacer(modifier = Modifier.height(8.dp))

                // Frame rate section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 8.dp)
                ) {
                    Text(
                        text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    LineChart(
                        data = frameRateHistory,
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Green,
                        backgroundColor = Color.Black,
                        yAxisLabel = "帧率 (fps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                Spacer(modifier = Modifier.height(8.dp))

                // Bitrate section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp)
                ) {
                    Text(
                        text = "码率: ${bitrateState.longValue / 1000} kbps",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    LineChart(
                        data = bitrateHistory.map { it / 1000f }, // bps -> kbps
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Blue,
                        backgroundColor = Color.Black,
                        yAxisLabel = "码率 (kbps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                Spacer(modifier = Modifier.height(16.dp))

                // Stuttering indicator
                Row(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp),
                    verticalAlignment = Alignment.CenterVertically
                ) {
                    if (stutteringState.value) {
                        Icon(
                            imageVector = Icons.Default.Warning,
                            contentDescription = "卡顿警告",
                            tint = Color.Red,
                            modifier = Modifier.size(24.dp)
                        )
                        Spacer(modifier = Modifier.width(8.dp))
                        Column {
                            Text(
                                text = "视频播放出现卡顿",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodyMedium
                            )
                            // Name the metric(s) that triggered the warning.
                            if (frameRateLowState.value) {
                                Text(
                                    text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
                                    color = Color.Red,
                                    style = MaterialTheme.typography.bodySmall
                                )
                            }
                            if (packetLossHighState.value) {
                                Text(
                                    text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
                                    color = Color.Red,
                                    style = MaterialTheme.typography.bodySmall
                                )
                            }
                        }
                    } else {
                        Icon(
                            imageVector = Icons.Default.CheckCircle,
                            contentDescription = "正常",
                            tint = Color.Green,
                            modifier = Modifier.size(24.dp)
                        )
                        Spacer(modifier = Modifier.width(8.dp))
                        Text(
                            text = "视频播放正常",
                            color = Color.Green,
                            style = MaterialTheme.typography.bodyMedium
                        )
                    }
                }
            }

            LaunchedEffect(Unit) {
                val renderer = remoteView
                val eglContext = remoteEglBase?.eglBaseContext
                if (renderer == null || eglContext == null) {
                    Log.e(TAG, "Remote renderer is not initialised; skipping WebRTC setup")
                    return@LaunchedEffect
                }

                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                // Encoder and decoder share the renderer's EGL context, so no additional
                // EglBase is created that would never be released.
                val factory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(DefaultVideoEncoderFactory(eglContext, true, true))
                    .setVideoDecoderFactory(DefaultVideoDecoderFactory(eglContext))
                    .createPeerConnectionFactory()
                peerConnectionFactory = factory

                createPeerConnection(context, factory, renderer) {
                    localPeer = it
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
            }
        }
    }

    /**
     * Line chart with axes, tick labels and optional axis titles. Consecutive samples are
     * connected with straight segments and every sample is marked with a dot.
     *
     * @param data samples to plot, oldest first; nothing is drawn when empty
     * @param modifier layout modifier applied to the canvas
     * @param lineColor colour of the segments and sample dots
     * @param backgroundColor canvas background colour
     * @param yAxisLabel title drawn rotated next to the y axis; skipped when empty
     * @param xAxisLabel title drawn below the x axis; skipped when empty
     * @param minYValue lower bound of the y axis; defaults to the smallest sample
     * @param maxYValue upper bound of the y axis; defaults to the largest sample
     */
    @Composable
    fun LineChart(
        data: List<Float>,
        modifier: Modifier = Modifier,
        lineColor: Color = Color.Green,
        backgroundColor: Color = Color.Black,
        yAxisLabel: String = "",
        xAxisLabel: String = "",
        minYValue: Float? = null,
        maxYValue: Float? = null
    ) {
        Canvas(modifier = modifier.background(backgroundColor)) {
            if (data.isEmpty()) return@Canvas

            val padding = 40.dp.toPx() // room for axes and labels
            val plotBottom = size.height - padding
            val plotHeight = size.height - padding * 2

            val maxY = maxYValue ?: data.maxOrNull() ?: 1f
            val minY = minYValue ?: data.minOrNull() ?: 0f
            val yRange = maxY - minY
            // With a zero range every value maps to the vertical centre; dividing by the
            // range would yield NaN coordinates.
            val isFlat = yRange == 0f

            val lastIndex = data.size - 1
            val spacing = (size.width - padding * 2) / lastIndex.coerceAtLeast(1)

            val yFor: (Float) -> Float = { value ->
                if (isFlat) size.height / 2 else plotBottom - ((value - minY) / yRange) * plotHeight
            }
            val points = data.mapIndexed { index, value ->
                Offset(padding + index * spacing, yFor(value))
            }

            fun textPaint(align: android.graphics.Paint.Align, antiAlias: Boolean) =
                android.graphics.Paint().apply {
                    color = android.graphics.Color.WHITE
                    textSize = 24f
                    textAlign = align
                    isAntiAlias = antiAlias
                }

            val nativeCanvas = drawContext.canvas.nativeCanvas

            // Axes
            drawLine(
                color = Color.White,
                start = Offset(padding, padding),
                end = Offset(padding, plotBottom),
                strokeWidth = 2f
            )
            drawLine(
                color = Color.White,
                start = Offset(padding, plotBottom),
                end = Offset(size.width - padding, plotBottom),
                strokeWidth = 2f
            )

            // Y tick labels: five evenly spaced values, or a single one for a flat series.
            val yLabelCount = 5
            val yTickPaint = textPaint(android.graphics.Paint.Align.RIGHT, antiAlias = false)
            val yTickValues = if (isFlat) {
                listOf(minY)
            } else {
                List(yLabelCount) { i -> minY + i * yRange / (yLabelCount - 1) }
            }
            yTickValues.forEach { yValue ->
                nativeCanvas.drawText(
                    yValue.roundToInt().toString(),
                    padding - 8f,
                    yFor(yValue) + yTickPaint.textSize / 2,
                    yTickPaint
                )
            }

            // X tick labels: up to five sample indices spread over the whole series.
            // distinct() keeps short series from drawing the same label repeatedly.
            val xLabelCount = 5
            val xTickPaint = textPaint(android.graphics.Paint.Align.CENTER, antiAlias = false)
            val xTickIndices = List(xLabelCount) { i -> i * lastIndex / (xLabelCount - 1) }.distinct()
            xTickIndices.forEach { index ->
                nativeCanvas.drawText(
                    index.toString(),
                    padding + index * spacing,
                    plotBottom + xTickPaint.textSize + 4f,
                    xTickPaint
                )
            }

            val titlePaint = textPaint(android.graphics.Paint.Align.CENTER, antiAlias = true)

            // Y axis title, rotated to read bottom-to-top
            if (yAxisLabel.isNotEmpty()) {
                nativeCanvas.save()
                nativeCanvas.rotate(-90f, padding / 2, size.height / 2)
                nativeCanvas.drawText(yAxisLabel, padding / 2, size.height / 2, titlePaint)
                nativeCanvas.restore()
            }

            // X axis title
            if (xAxisLabel.isNotEmpty()) {
                nativeCanvas.drawText(xAxisLabel, size.width / 2, size.height - padding / 2, titlePaint)
            }

            // Straight segments between consecutive samples
            points.zipWithNext().forEach { (start, end) ->
                drawLine(
                    color = lineColor,
                    start = start,
                    end = end,
                    strokeWidth = 4f,
                    cap = StrokeCap.Round
                )
            }

            // Sample dots
            points.forEach { point ->
                drawCircle(color = lineColor, radius = 4f, center = point)
            }
        }
    }

    /**
     * Connects to the signaling server over WebSocket. On connect it joins [currentRoom] and
     * starts the call; incoming `signal` events are passed to [handleSignalingData].
     */
    private fun initializeSocketIO() {
        val socketUrl = signalingServerUrl
        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = socketUrl.startsWith("https")
            path = "/socket.io/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
                initiateCall()
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check the payload before touching it: an empty event must not crash the
                // socket thread.
                val payload = args.firstOrNull() as? JSONObject
                if (payload == null) {
                    Log.w(TAG, "Ignoring signaling event without a JSON payload")
                } else {
                    Log.d(TAG, "Received signaling: $payload")
                    try {
                        handleSignalingData(payload)
                    } catch (e: org.json.JSONException) {
                        Log.e(TAG, "Malformed signaling message: ${e.message}")
                    }
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to Socket: ${e.message}")
        }
    }

    /** Emits [payload] as a `signal` event, provided the socket was created successfully. */
    private fun sendSignal(payload: JSONObject) {
        if (::socket.isInitialized) {
            socket.emit("signal", payload)
        } else {
            Log.e(TAG, "Cannot send signaling message: socket is not initialised")
        }
    }

    /** Requests every listed permission that is not granted yet. */
    private fun requestPermissionsIfNeeded() {
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        )

        val permissionsToRequest = permissions.filter {
            ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
        }

        if (permissionsToRequest.isNotEmpty()) {
            requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
        } else {
            onPermissionsChecked()
        }
    }

    /** Confirms to the user that all required permissions are granted. */
    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Creates the peer connection with the configured STUN/TURN servers, forwards local ICE
     * candidates to the signaling server and attaches incoming video tracks to [remoteView].
     * Statistics collection starts once the connection exists.
     *
     * @param onLocalPeerCreated invoked with the new connection; not invoked when creation fails
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        remoteView: SurfaceViewRenderer,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        val iceServers = listOf(
            PeerConnection.IceServer.builder(stunUrl).createIceServer(),
            PeerConnection.IceServer.builder(turnUrl)
                .setUsername(turnUsername)
                .setPassword(turnPassword)
                .createIceServer()
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
            // Renders the track on the remote view when it is a video track.
            private fun renderIfVideo(track: Any?) {
                (track as? VideoTrack)?.addSink(remoteView)
            }

            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                iceCandidate?.let {
                    Log.d(TAG, "ICE candidate: $it")
                    val signalData = JSONObject().apply {
                        put("type", "ice")
                        put("candidate", JSONObject().apply {
                            put("sdpMid", it.sdpMid)
                            put("sdpMLineIndex", it.sdpMLineIndex)
                            put("candidate", it.sdp)
                        })
                        put("room", currentRoom)
                    }
                    sendSignal(signalData)
                }
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
                renderIfVideo(receiver?.track())
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
                renderIfVideo(transceiver?.receiver?.track())
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        })

        // createPeerConnection returns null on failure; report it instead of crashing.
        if (peer == null) {
            Log.e(TAG, "Failed to create PeerConnection")
            return
        }

        localPeer = peer
        onLocalPeerCreated(peer)
        startStatsCollection()
    }

    /** Creates an SDP offer requesting audio and video, applies it locally and sends it. */
    private fun initiateCall() {
        Log.d(TAG, "Initiating call...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        localPeer?.createOffer(object : SdpObserver {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                sessionDescription?.let { sdp ->
                    localPeer?.setLocalDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            val signalData = JSONObject().apply {
                                put("type", "offer")
                                put("sdp", JSONObject().put("sdp", sdp.description))
                                put("room", currentRoom)
                            }
                            sendSignal(signalData)
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "Set local description error: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }
            }

            override fun onSetSuccess() {}

            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "Create offer error: $error")
            }

            override fun onSetFailure(error: String?) {}
        }, constraints)
    }

    /**
     * Dispatches one signaling message by its `type` field: applies an `answer`, adds or queues
     * an `ice` candidate, or replies to a `time_sync_request`.
     *
     * @throws org.json.JSONException when a field required by the message type is missing
     */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")

        // optString yields "" for a missing type, which falls through to the else branch.
        when (val type = data.optString("type")) {
            "answer" -> {
                Log.d(TAG, "Received answer")
                val sdp = SessionDescription(
                    SessionDescription.Type.ANSWER,
                    data.getJSONObject("sdp").getString("sdp")
                )
                localPeer?.setRemoteDescription(object : SdpObserver {
                    override fun onSetSuccess() {
                        // Candidates queued before the answer arrived can be applied now.
                        pendingIceCandidates.forEach { candidate ->
                            localPeer?.addIceCandidate(candidate)
                        }
                        pendingIceCandidates.clear()
                        Log.d(TAG, "Set remote description (answer) success")
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set remote description error: $error")
                    }

                    override fun onCreateSuccess(p0: SessionDescription?) {}
                    override fun onCreateFailure(p0: String?) {}
                }, sdp)
            }

            "ice" -> {
                Log.d(TAG, "Received ICE candidate")
                val candidateData = data.getJSONObject("candidate")
                val candidate = IceCandidate(
                    candidateData.getString("sdpMid"),
                    candidateData.getInt("sdpMLineIndex"),
                    candidateData.getString("candidate")
                )

                if (localPeer?.remoteDescription != null) {
                    localPeer?.addIceCandidate(candidate)
                } else {
                    pendingIceCandidates.add(candidate)
                }
            }

            "time_sync_request" -> {
                // t1 is echoed back; t2 (receive time) and t3 (reply time) are both taken
                // from this device's clock.
                val t1 = data.getLong("t1")
                val t2 = System.currentTimeMillis()
                val t3 = System.currentTimeMillis()

                val syncResponse = JSONObject().apply {
                    put("type", "time_sync_response")
                    put("t1", t1)
                    put("t2", t2)
                    put("t3", t3)
                    put("room", currentRoom)
                }
                sendSignal(syncResponse)
                Log.d(TAG, "Replied to time_sync_request: t1=$t1, t2=$t2, t3=$t3")
            }

            else -> {
                Log.e(TAG, "Unknown signaling type: $type")
            }
        }
    }

    /**
     * Polls the peer connection statistics every [STATS_INTERVAL_MS] milliseconds until the view
     * lifecycle ends.
     *
     * The view's lifecycle scope is captured here, on the main thread. Reports are delivered on
     * another thread, so they are posted back through that scope: parsing then runs on the main
     * thread and is dropped silently once the view is destroyed.
     */
    private fun startStatsCollection() {
        Log.d(TAG, "Starting stats collection...")
        val scope = viewLifecycleOwner.lifecycleScope
        statsJob?.cancel()
        statsJob = scope.launch {
            while (isActive) {
                delay(STATS_INTERVAL_MS)
                Log.d(TAG, "Collecting stats...")
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "Failed to get stats: localPeer is null.")
                    continue
                }
                peer.getStats { report ->
                    Log.d(TAG, "Stats report obtained.")
                    scope.launch { parseStatsReport(report) }
                }
            }
        }
    }

    /**
     * Derives frame rate, bitrate and packet loss from the `inbound-rtp` video entries of
     * [report] and publishes them to the UI state and chart histories. Rates are computed from
     * the difference to the previous sample, so the first report only records a baseline.
     */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "Received RTCStatsReport: $report")
        for (stats in report.statsMap.values) {
            if (stats.type != "inbound-rtp") continue
            if (stats.members["kind"] as? String != "video") continue

            fun member(name: String): Double = (stats.members[name] as? Number)?.toDouble() ?: 0.0

            val framesDecoded = member("framesDecoded")
            val framesReceived = member("framesReceived")
            val framesDropped = member("framesDropped")
            val bytesReceived = member("bytesReceived")
            val packetsLost = member("packetsLost")
            val packetsReceived = member("packetsReceived")

            // Guard the division: with no packets counted yet the loss fraction is 0, not NaN.
            val totalPackets = packetsLost + packetsReceived
            val packetLossFraction =
                if (totalPackets > 0) (packetsLost / totalPackets).coerceIn(0.0, 1.0) else 0.0
            val timestamp = stats.timestampUs / 1_000_000.0 // microseconds -> seconds

            Log.d(
                TAG,
                "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
            )

            if (prevTimestamp != 0.0) {
                val timeElapsed = timestamp - prevTimestamp
                val frameRate =
                    if (timeElapsed > 0) (framesDecoded - prevFramesDecoded) / timeElapsed else 0.0
                val bitrate =
                    if (timeElapsed > 0) ((bytesReceived - prevBytesReceived) * 8) / timeElapsed else 0.0 // bits per second
                val packetLoss = packetLossFraction * 100 // percentage

                Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

                val isFrameRateLow = frameRate < MIN_SMOOTH_FRAME_RATE
                val isPacketLossHigh = packetLoss > MAX_PACKET_LOSS_PERCENT
                val isStuttering = isFrameRateLow || isPacketLossHigh

                frameRateState.doubleValue = frameRate
                bitrateState.longValue = bitrate.toLong()
                stutteringState.value = isStuttering
                frameRateLowState.value = isFrameRateLow
                packetLossHighState.value = isPacketLossHigh
                packetLossState.doubleValue = packetLoss

                // Keep only the most recent samples for the charts.
                frameRateHistory.add(frameRate.toFloat())
                if (frameRateHistory.size > MAX_HISTORY_SIZE) {
                    frameRateHistory.removeAt(0)
                }
                bitrateHistory.add(bitrate.toLong())
                if (bitrateHistory.size > MAX_HISTORY_SIZE) {
                    bitrateHistory.removeAt(0)
                }

                Log.d(
                    TAG,
                    "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
                )
            }

            // Baseline for the next sample
            prevFramesDecoded = framesDecoded
            prevBytesReceived = bytesReceived
            prevTimestamp = timestamp
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        statsJob = null

        // The socket stays uninitialised when IO.socket() failed.
        if (::socket.isInitialized) {
            socket.off()
            socket.disconnect()
        }

        localPeer?.dispose()
        localPeer = null
        peerConnectionFactory?.dispose()
        peerConnectionFactory = null
        remoteView?.release()
        remoteView = null
        remoteEglBase?.release()
        remoteEglBase = null

        // Reset per-connection bookkeeping so a re-created view starts from a clean baseline.
        pendingIceCandidates.clear()
        prevFramesDecoded = 0.0
        prevBytesReceived = 0.0
        prevTimestamp = 0.0
    }
}

Explanation of Changes:

  1. State Variables:

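    • RTT (rttState): Holds the latest Round-Trip Time shown in the receiver's UI.
    • OWD (owdState): Holds the latest One-Way Delay shown in the receiver's UI.
    • Histories (rttHistory, owdHistory): Intended to maintain a history of RTT and OWD values to compute maximum, minimum, and average metrics. In the receiver code above these variables are declared but never written to, so they keep their initial values unless receiver-side measurement is added.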
  2. Time Synchronization Handling:

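    • The RTT and OWD calculation is performed by the initiator, in its handleSignalingData method under the "time_sync_response" case, using the timestamps t1, t2, t3, and t4. The receiver's handleSignalingData only handles "time_sync_request".
    • RTT is calculated as RTT = t4 - t1 - (t3 - t2).
    • OWD is calculated as OWD = t2 - t1, which is accurate only if the initiator's and receiver's clocks are synchronized.
    • Both RTT and OWD are updated in their respective state variables and histories on the initiator.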
  3. UI Updates:

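    • Added Text composables to display both RTT and OWD within the WebRTCComposeLayout.
    • On the receiver these values remain at 0 ms, because the code above never updates rttState or owdState; they update in real time only on the side that performs the calculation (the initiator).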
  4. Continuous Time Synchronization:

    • The receiver does not need to initiate time synchronization but responds to time_sync_request by sending time_sync_response with the appropriate timestamps.
  5. Handling time_sync_request:

    • The receiver listens for time_sync_request messages and responds with a time_sync_response containing t1, t2, and t3.

Summary

By implementing the above modifications, both the Initiator and Receiver fragments can effectively participate in a unified time synchronization mechanism. The Initiator sends synchronization requests and calculates both RTT and OWD based on the responses. The Receiver responds to these requests appropriately. Both metrics are maintained over time, allowing the UI to display real-time and historical data, providing valuable insights into the network performance of the WebRTC connection.

These changes ensure that:

  • Unified Requests: Both RTT and OWD are derived from a single time synchronization request, reducing overhead and ensuring consistency.
  • Real-Time Monitoring: Users can monitor both RTT and OWD directly within the application's UI.
  • Historical Data: Maintaining histories allows for the calculation of maximum, minimum, and average values, providing a comprehensive view of the connection quality.

Feel free to further customize the UI components or adjust the synchronization intervals and thresholds based on your specific application requirements.

分享这个问答