
class VideoInitiatorFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoInitiatorFragment] with its connection settings stored
     * in the fragment arguments.
     *
     * The values are written under the keys "room", "stunUrl", "turnUrl",
     * "turnUsername", "turnPassword" and "signalingServerUrl".
     *
     * @param room name of the signaling room to join
     * @param stunUrl STUN server URL
     * @param turnUrl TURN server URL
     * @param turnUsername TURN account name
     * @param turnPassword TURN account password
     * @param signalingServerUrl URL of the signaling server
     * @return a new fragment instance carrying the arguments bundle
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoInitiatorFragment {
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        return VideoInitiatorFragment().also { it.arguments = bundle }
    }
}

// ---- Fragment state ----
// Signaling socket; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection; assigned in createPeerConnection() and read by the stats loop.
private var localPeer: PeerConnection? = null
// Renderer for the local camera preview; created in the Compose view factory.
private var localView: SurfaceViewRenderer? = null
// EGL context holder; created on first use in the Compose layout.
private var localEglBase: EglBase? = null
// ICE candidates held by the fragment; iterated after the remote offer is applied.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Signaling room name; read from the fragment arguments in onCreate().
private var currentRoom: String? = null
// Connection settings; all populated from the fragment arguments in onCreate().
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

// Log tag used by every Log call in this fragment.
private val TAG: String = "WebRTC-Initiator"

// Round-trip-time figures in milliseconds, held as Compose state.
// minRttState starts at Long.MAX_VALUE until the first measurement overwrites it.
private val maxRttState = mutableLongStateOf(0L)
private val minRttState = mutableLongStateOf(Long.MAX_VALUE)
private val averageRttState = mutableLongStateOf(0L)
private val latestRttState = mutableLongStateOf(0L)
// RTT samples that the max/min/average values are derived from.
private val rttHistory = mutableStateListOf<Long>()

// Handle of the coroutine started by startStatsCollection().
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments into the
 * corresponding properties, substituting defaults for missing values.
 */
override fun onCreate(savedInstanceState: Bundle?) {

// Retrieve data from arguments
currentRoom = arguments?.getString("room") ?: "default-room"
// The signaling, STUN and TURN URLs fall back to an empty string when absent.
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: ""
stunUrl = arguments?.getString("stunUrl") ?: ""
turnUrl = arguments?.getString("turnUrl") ?: ""
// NOTE(review): the TURN username/password fall back to literal credentials
// embedded in the source. Consider removing these defaults and failing
// explicitly (or skipping TURN) when the arguments are missing.
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"

// NOTE(review): the string below stands alone in this view; it looks like the
// message of a logging call whose surrounding lines are not visible — confirm.
"onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"

// Activity-result launcher for a multi-permission request. For every
// permission in the result map it shows a short toast saying whether that
// permission was granted or denied.
private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()

/**
 * Builds the fragment's view as a [ComposeView] whose content is set through
 * `setContent`.
 *
 * NOTE(review): the body of the content lambda is not visible in this view.
 */
override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {

/**
 * Layout for the initiator screen: a black surface with a column hosting the
 * local video renderer, plus a one-shot effect that sets up the WebRTC
 * factory, local video capture and the peer connection.
 *
 * NOTE(review): several call heads and closing lines of this function are
 * missing from this view, so the comments below cover only the visible lines.
 */
fun WebRTCComposeLayout() {
val context = LocalContext.current
// NOTE(review): this is a plain local, not wrapped in `remember`; it is assigned
// inside the LaunchedEffect below. Confirm it is never read after a recomposition.
lateinit var peerConnectionFactory: PeerConnectionFactory
var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {

// Local video view
factory = {
// Keep the renderer in a fragment property so initLocalVideo() can use it.
localView = SurfaceViewRenderer(it).apply {
modifier = Modifier
update = {
// Create the EGL base once, on first update, and initialise the renderer with it.
if (localEglBase == null) {
localEglBase = EglBase.create()
it.init(localEglBase!!.eglBaseContext, null)

Spacer(modifier = Modifier.height(8.dp))


// Keyed on Unit, so the effect is not restarted by recomposition.
LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)

// NOTE(review): `localEglBase!!` assumes the renderer's update block above has
// already run and created the EGL base before this effect executes — confirm
// the ordering, otherwise this throws a NullPointerException.
val encoderFactory = DefaultVideoEncoderFactory(
localEglBase!!.eglBaseContext, true, true

val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

peerConnectionFactory = PeerConnectionFactory.builder()

// Start camera capture; the callback stores the resulting video track in Compose state.
initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
localVideoTrack = it

) {
// Callback receiving the created peer connection: keep it and start polling stats.
localPeer = it
startStatsCollection() // Start collecting statistics



/**
 * Launches a coroutine on the view lifecycle scope that loops while active,
 * pausing [intervalMs] milliseconds on each pass.
 *
 * NOTE(review): the action performed on each pass is not visible in this
 * view; presumably it sends a time-sync request — confirm.
 *
 * @param intervalMs delay between passes, in milliseconds (default 5000)
 */
private fun startTimeSync(intervalMs: Long = 5000L) {
viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(intervalMs) // Sync every 5 seconds

/**
 * Starts a coroutine on the view lifecycle scope, stored in [statsJob], that
 * requests a stats report from [localPeer] once per second. An error is
 * logged whenever [localPeer] is null at polling time.
 *
 * NOTE(review): the body of the stats callback is not visible in this view.
 */
private fun startStatsCollection() {
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // Collect stats every second
localPeer?.getStats { report ->
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")

/**
 * Scans a stats report for entries of type "transport", reads their
 * "currentRoundTripTime" member, converts it to milliseconds and publishes
 * the latest/max/min/average RTT to the Compose state holders.
 *
 * NOTE(review): in the W3C WebRTC stats model `currentRoundTripTime` is
 * defined on candidate-pair stats; confirm that "transport" entries actually
 * carry this member, otherwise `currentRtt` is always null and nothing is
 * published.
 *
 * @param report stats report delivered by the peer connection
 */
private fun parseStatsReport(report: RTCStatsReport) {
Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
for (stats in report.statsMap.values) {
Log.d(TAG, "Stats type: ${stats.type}")
if (stats.type == "transport") {
Log.d(TAG, "Transport Stats found: $stats")
// The member is multiplied by 1000 and truncated to a whole number of milliseconds.
val currentRtt = (stats.members["currentRoundTripTime"] as? Number)?.toDouble()?.times(1000)?.toLong()
// Missing and non-positive measurements are ignored.
if (currentRtt != null && currentRtt > 0) {
// State is updated from a coroutine on the view lifecycle scope.
viewLifecycleOwner.lifecycleScope.launch {
// Update latest RTT
latestRttState.longValue = currentRtt

// Update RTT history
// NOTE(review): the statements that append to and trim rttHistory are not
// visible in this view; only the size check (more than 60 entries) remains.
if (rttHistory.size > 60) {

// Calculate max, min, and average RTT
val maxRtt = rttHistory.maxOrNull() ?: 0L
val minRtt = rttHistory.minOrNull() ?: 0L
// NOTE(review): both branch values of this expression are missing from this view.
val averageRtt = if (rttHistory.isNotEmpty()) {
} else {

maxRttState.longValue = maxRtt
minRttState.longValue = minRtt
averageRttState.longValue = averageRtt

Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")

/**
 * Creates the socket.io client for [signalingServerUrl] (websocket transport
 * only, path "/") and registers handlers for connect, connect-error,
 * disconnect and "signal" events. Any exception thrown during setup is
 * caught and logged.
 *
 * NOTE(review): no call that opens the connection and no dispatch of the
 * received "signal" payload are visible in this view.
 */
private fun initializeSocketIO() {
// TLS is enabled only when the configured URL starts with "https".
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl

val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/"

try {
socket = IO.socket(socketUrl, options)

// On connect: join the configured room.
socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket connected")
socket.emit("join", currentRoom)
// Logged right after emitting; no server acknowledgement is awaited here.
Log.d(TAG, "Joined room: $currentRoom")

socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket connection error: $error")

socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket disconnected: $reason")

socket.on("signal") { args ->
// NOTE(review): args[0] is read for logging BEFORE the isNotEmpty() check on the
// next line; an event with no payload would throw
// ArrayIndexOutOfBoundsException here. Move the log inside the guard.
Log.d(TAG, "Received signaling: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject

Log.d(TAG, "Connecting to Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "Error connecting to Socket: ${e.message}")

/**
 * Works out which of the required permissions are not yet granted and
 * branches on whether any remain.
 *
 * NOTE(review): the list of required permissions and the bodies of both
 * branches are not visible in this view.
 */
private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(

// Keep only the permissions that are not currently granted.
val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED

if (permissionsToRequest.isNotEmpty()) {
} else {

/**
 * Shows a short toast telling the user that all required permissions have
 * been granted.
 */
private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()

/**
 * Sets up local media: creates a camera capturer, starts capturing, and
 * creates the local video track, an audio track and a local media stream.
 *
 * NOTE(review): the end of this function (attaching the tracks, rendering to
 * [localView], invoking [onLocalVideoTrack]) is not visible in this view.
 *
 * @param context context used to enumerate and initialise the camera
 * @param localView renderer intended for the local preview
 * @param peerConnectionFactory factory used to create sources, tracks and the stream
 * @param eglBase EGL base whose context backs the capture texture helper
 * @param onLocalVideoTrack callback for the created video track
 */
private fun initLocalVideo(
context: Context,
localView: SurfaceViewRenderer?,
peerConnectionFactory: PeerConnectionFactory,
eglBase: EglBase,
onLocalVideoTrack: (VideoTrack) -> Unit
) {
val videoCapturer = createCameraCapturer(context)
val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

// Capture is requested at a fixed 1920x1080 at 60 fps.
// NOTE(review): not every camera supports this format — consider choosing a
// supported format or making it configurable.
videoCapturer.startCapture(1920, 1080, 60)

val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)

// NOTE(review): createPeerConnection() also creates an audio source and a track
// with the same id "audio_track"; confirm which of the two is meant to be used.
val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)

// Add audio and video tracks to local stream
val mediaStream = peerConnectionFactory.createLocalMediaStream("local_stream")


/**
 * Creates a camera capturer, preferring a back-facing camera, then a
 * front-facing one, and finally the first enumerated device.
 *
 * @param context context used to enumerate the device's cameras
 * @return a capturer for the first camera that can be opened in the
 *   preference order above
 * @throws IllegalStateException if the device reports no cameras or no
 *   capturer can be created for any candidate
 */
private fun createCameraCapturer(context: Context): CameraVideoCapturer {
    val camera2Enumerator = Camera2Enumerator(context)
    val deviceNames = camera2Enumerator.deviceNames

    // Without this guard the last-resort `deviceNames[0]` below would fail with
    // an ArrayIndexOutOfBoundsException on devices that expose no camera.
    check(deviceNames.isNotEmpty()) { "Unable to create camera capturer: no camera devices found" }

    // Returns the first capturer that can be created among the devices accepted by [matches].
    fun firstCapturer(matches: (String) -> Boolean): CameraVideoCapturer? =
        deviceNames.asSequence()
            .filter(matches)
            .firstNotNullOfOrNull { camera2Enumerator.createCapturer(it, null) }

    return firstCapturer { camera2Enumerator.isBackFacing(it) } // Prefer back-facing camera
        ?: firstCapturer { camera2Enumerator.isFrontFacing(it) } // Fallback to front-facing camera
        ?: camera2Enumerator.createCapturer(deviceNames.first(), null) // Fallback to first available camera
        ?: throw IllegalStateException("Unable to create camera capturer")
}

/**
 * Creates the peer connection with a unified-plan configuration, stores it
 * in [localPeer], and adds the local video track and a new audio track to it
 * under the stream id "local_stream".
 *
 * The observer forwards every local ICE candidate to the signaling server as
 * a "signal" message of type "ice"; all other callbacks only log.
 *
 * NOTE(review): the ICE server list contents and the invocation of
 * [onLocalPeerCreated] are not visible in this view.
 *
 * @param context Android context (not used by the visible lines)
 * @param peerConnectionFactory factory used to create the connection and the audio track
 * @param localVideoTrack local video track to send, if available
 * @param onLocalPeerCreated callback for the created peer connection
 */
private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
localVideoTrack: VideoTrack?,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(

val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN

localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
// Serialise each local candidate and send it to the room through the signaling socket.
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
put("room", currentRoom)
socket.emit("signal", signalData)

// The remaining observer callbacks only log the event they receive.
override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")

override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")

override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")

override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")

override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")

override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")

override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")

override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")

override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")

override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")

override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")

// Attach local media to the connection.
localVideoTrack?.let {
localPeer?.addTrack(it, listOf("local_stream"))
// NOTE(review): initLocalVideo() also creates an audio source and a track with
// the id "audio_track"; confirm that creating a second one here is intended.
val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
localPeer?.addTrack(localAudioTrack, listOf("local_stream"))


/**
 * Asks [peerConnection] to create an SDP answer (requesting both audio and
 * video) and, on success, applies it as the local description. Failures of
 * either step are logged.
 *
 * NOTE(review): no invocation of [onAnswerCreated] is visible in this view;
 * presumably it is called once the local description is set — confirm.
 *
 * @param peerConnection connection that already holds the remote offer
 * @param onAnswerCreated callback intended to receive the answer SDP text
 */
private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
Log.d(TAG, "Creating answer...")
val constraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

// Outer observer: reacts to the result of createAnswer.
peerConnection.createAnswer(object : SdpObserver {
override fun onCreateSuccess(sessionDescription: SessionDescription?) {
sessionDescription?.let { sdp ->
// Inner observer: reacts to the result of setLocalDescription.
peerConnection.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "SetLocalDescription onSetSuccess")

override fun onSetFailure(error: String?) {
Log.e(TAG, "SetLocalDescription onSetFailure: $error")

// Create callbacks are unused for a set operation.
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)

override fun onSetSuccess() {
Log.d(TAG, "createAnswer onSetSuccess")

override fun onCreateFailure(error: String?) {
Log.e(TAG, "createAnswer onCreateFailure: $error")

override fun onSetFailure(error: String?) {}
}, constraints)

/**
 * Dispatches an incoming signaling message on its "type" field:
 * - "offer": applies the remote description, then creates and sends an answer;
 * - "ice": parses a remote ICE candidate;
 * - "time_sync_response": derives round-trip and one-way delay figures and
 *   publishes them to the RTT state holders;
 * - anything else is logged as an error.
 *
 * NOTE(review): `data.getString("type")` throws a JSONException when the
 * field is absent; no handler for that is visible here.
 *
 * @param data signaling message received from the socket
 */
private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {
"offer" -> {
Log.d(TAG, "Received offer")
// NOTE(review): the SessionDescription constructor arguments are not visible in this view.
val sdp = SessionDescription(
localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "Set remote description (offer) success")
// `localPeer!!` is reached only from a callback registered via `localPeer?.`,
// but the property is mutable — NOTE(review): confirm it cannot be cleared in between.
createAnswer(localPeer!!) { answer ->
val signalData = JSONObject().apply {
put("type", "answer")
put("sdp", JSONObject().put("sdp", answer))
put("room", currentRoom)

socket.emit("signal", signalData)

// Walk the stored candidates now that a remote description exists.
// NOTE(review): the loop body is not visible in this view.
pendingIceCandidates.forEach { candidate ->

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description (offer) error: $error")

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)

"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(

// Branches on whether a remote description is already set.
// NOTE(review): both branch bodies are missing from this view.
if (localPeer?.remoteDescription != null) {
} else {

"time_sync_response" -> {
Log.d(TAG, "Received time_sync_response")
// t1..t3 come from the message; t4 is the local receive time.
val t1 = data.getLong("t1")
val t2 = data.getLong("t2")
// NOTE(review): t3 is read but not used by any visible line.
val t3 = data.getLong("t3")
val t4 = System.currentTimeMillis()

val RTT = t4 - t1
// NOTE(review): t2 is stamped with the responder's System.currentTimeMillis(),
// so t2 - t1 mixes two device clocks and includes their offset; it is a true
// one-way delay only if both clocks are synchronised.
val OWD = t2 - t1 // One-Way Delay from Initiator to Receiver

Log.d(TAG, "Time Sync: RTT=$RTT ms, OWD=$OWD ms")

viewLifecycleOwner.lifecycleScope.launch {
// Update RTT state
latestRttState.longValue = RTT

// Update RTT history
// NOTE(review): the statements that append to and trim rttHistory are not
// visible in this view; only the size check (more than 60 entries) remains.
if (rttHistory.size > 60) {

// Calculate max, min, and average RTT
val maxRtt = rttHistory.maxOrNull() ?: 0L
val minRtt = rttHistory.minOrNull() ?: 0L
// NOTE(review): both branch values of this expression are missing from this view.
val averageRtt = if (rttHistory.isNotEmpty()) {
} else {

maxRttState.longValue = maxRtt
minRttState.longValue = minRtt
averageRttState.longValue = averageRtt

Log.d(TAG, "RTT - Latest: $RTT ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
Log.d(TAG, "OWD - Latest: $OWD ms")

else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")

/**
 * Sends a "time_sync_request" signaling message to the current room,
 * stamped with the local send time in milliseconds under the key "t1".
 */
private fun requestTimeSync() {
    val t1 = System.currentTimeMillis()

    val message = JSONObject()
    message.put("type", "time_sync_request")
    message.put("room", currentRoom)
    message.put("t1", t1)

    socket.emit("signal", message)
    Log.d(TAG, "Sent time sync request at t1: $t1")
}

// View teardown hook.
// NOTE(review): the body is not visible in this view; confirm that it cancels
// statsJob and releases the socket, peer connection, renderer and EGL base.
override fun onDestroyView() {

class VideoReceiverFragment : Fragment() {

companion object {
    /**
     * Creates a [VideoReceiverFragment] with its connection settings stored
     * in the fragment arguments.
     *
     * The values are written under the keys "room", "stunUrl", "turnUrl",
     * "turnUsername", "turnPassword" and "signalingServerUrl".
     *
     * @param room name of the signaling room to join
     * @param stunUrl STUN server URL
     * @param turnUrl TURN server URL
     * @param turnUsername TURN account name
     * @param turnPassword TURN account password
     * @param signalingServerUrl URL of the signaling server
     * @return a new fragment instance carrying the arguments bundle
     */
    fun newInstance(
        room: String,
        stunUrl: String,
        turnUrl: String,
        turnUsername: String,
        turnPassword: String,
        signalingServerUrl: String
    ): VideoReceiverFragment {
        val bundle = Bundle()
        bundle.putString("room", room)
        bundle.putString("stunUrl", stunUrl)
        bundle.putString("turnUrl", turnUrl)
        bundle.putString("turnUsername", turnUsername)
        bundle.putString("turnPassword", turnPassword)
        bundle.putString("signalingServerUrl", signalingServerUrl)
        return VideoReceiverFragment().also { it.arguments = bundle }
    }
}

// ---- Fragment state ----
// Signaling socket; assigned in initializeSocketIO().
private lateinit var socket: Socket
// Peer connection; assigned in createPeerConnection() and read by the stats loop.
private var localPeer: PeerConnection? = null
// Renderer for the remote video; created in the Compose view factory.
private var remoteView: SurfaceViewRenderer? = null
// EGL context holder; created on first use in the Compose layout.
private var remoteEglBase: EglBase? = null
// ICE candidates held by the fragment; iterated after the remote answer is applied.
private val pendingIceCandidates = mutableListOf<IceCandidate>()
// Signaling room name; read from the fragment arguments in onCreate().
private var currentRoom: String? = null
// Connection settings; all populated from the fragment arguments in onCreate().
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
// Log tag used by every Log call in this fragment.
private val TAG: String = "WebRTC-Receiver"

// Current playback metrics shown in the UI: frame rate (fps), bitrate (bits per
// second; the UI divides by 1000 to show kbps) and the overall stuttering flag.
private val frameRateState = mutableDoubleStateOf(0.0)
private val bitrateState = mutableLongStateOf(0L)
private val stutteringState = mutableStateOf(false)

// Flags and value used by the UI to explain why playback is stuttering.
private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableDoubleStateOf(0.0)

// Sample series; the frame-rate series is plotted by the line chart in the UI.
private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
private var timeSyncJob: Job? = null

// History of all recorded one-way latency values.
private val latencyHistory = mutableStateListOf<Long>()

// Maximum, minimum, average and latest one-way latency, in milliseconds.
// NOTE(review): the only code in view that updates these is commented out in
// handleSignalingData(), so the UI would keep showing the initial values —
// including Long.MAX_VALUE for the minimum. Confirm whether they are updated elsewhere.
private val maxLatencyState = mutableLongStateOf(0L)
private val minLatencyState = mutableLongStateOf(Long.MAX_VALUE)
private val averageLatencyState = mutableLongStateOf(0L)
private val latestLatencyState = mutableLongStateOf(0L)

// Previous stats sample, used to compute per-interval deltas in parseStatsReport().
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
// NOTE(review): timeOffset and t1 are referenced only by commented-out code in this view.
private var timeOffset: Long = 0
private var t1: Long = 0

// Handle of the coroutine started by startStatsCollection().
private var statsJob: Job? = null

/**
 * Reads the connection settings from the fragment arguments into the
 * corresponding properties, substituting defaults for missing values.
 */
override fun onCreate(savedInstanceState: Bundle?) {

// Retrieve data from arguments
currentRoom = arguments?.getString("room") ?: "default-room"
// The signaling, STUN and TURN URLs fall back to an empty string when absent.
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: ""
stunUrl = arguments?.getString("stunUrl") ?: ""
turnUrl = arguments?.getString("turnUrl") ?: ""
// NOTE(review): the TURN username/password fall back to literal credentials
// embedded in the source. Consider removing these defaults and failing
// explicitly (or skipping TURN) when the arguments are missing.
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"

// NOTE(review): the string below stands alone in this view; it looks like the
// message of a logging call whose surrounding lines are not visible — confirm.
"onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"

// Activity-result launcher for a multi-permission request. For every
// permission in the result map it shows a short toast saying whether that
// permission was granted or denied.
private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()

/**
 * Builds the fragment's view as a [ComposeView] whose content is set through
 * `setContent`.
 *
 * NOTE(review): the body of the content lambda is not visible in this view.
 */
override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {

/**
 * Layout for the receiver screen: the remote video renderer followed by
 * one-way latency figures, frame-rate and bitrate sections with line charts,
 * and a stuttering indicator; plus a one-shot effect that sets up the WebRTC
 * factory and the peer connection.
 *
 * NOTE(review): many call heads (for example the composables that take the
 * `text = …` / `imageVector = …` arguments) and closing lines are missing
 * from this view, so the comments below cover only the visible lines.
 */
fun WebRTCComposeLayout() {
val context = LocalContext.current
// NOTE(review): this is a plain local, not wrapped in `remember`; it is assigned
// inside the LaunchedEffect below. Confirm it is never read after a recomposition.
lateinit var peerConnectionFactory: PeerConnectionFactory

Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {

// Remote video view
factory = {
// Keep the renderer in a fragment property.
remoteView = SurfaceViewRenderer(it).apply {
modifier = Modifier
update = {
// Create the EGL base once, on first update, and initialise the renderer with it.
if (remoteEglBase?.eglBaseContext == null) {
remoteEglBase = EglBase.create()
it.init(remoteEglBase!!.eglBaseContext, null)

Spacer(modifier = Modifier.height(8.dp))

// Show the current, maximum, minimum and average one-way latency.
Column(modifier = Modifier.padding(horizontal = 16.dp)) {
text = "当前单向时延: ${latestLatencyState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
text = "最大单向时延: ${maxLatencyState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
text = "最小单向时延: ${minLatencyState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
text = "平均单向时延: ${averageLatencyState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 8.dp)

// Spacer between video and charts
Spacer(modifier = Modifier.height(8.dp))

// Frame Rate Section
modifier = Modifier
.padding(horizontal = 8.dp)
) {
// Frame Rate Text in Chinese
text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
// NOTE(review): this string stands alone in this view; it looks like the message
// of a logging call whose surrounding lines are not visible — confirm.
"UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"

// Line Chart for Frame Rate
data = frameRateHistory,
modifier = Modifier
.padding(vertical = 8.dp),
lineColor = Color.Green,
backgroundColor = Color.Black,
yAxisLabel = "帧率 (fps)",
xAxisLabel = "时间 (秒)"

// Spacer between charts
Spacer(modifier = Modifier.height(8.dp))

// Bitrate Section
modifier = Modifier
.padding(horizontal = 16.dp)
) {
// Bitrate Text in Chinese
text = "码率: ${bitrateState.longValue / 1000} kbps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)

// Line Chart for Bitrate
data = { it / 1000f }, // Convert to kbps
modifier = Modifier
.padding(vertical = 8.dp),
lineColor = Color.Blue,
backgroundColor = Color.Black,
yAxisLabel = "码率 (kbps)",
xAxisLabel = "时间 (秒)"

// Spacer between metrics and stuttering indicator
Spacer(modifier = Modifier.height(16.dp))

// Stuttering Indicator
modifier = Modifier
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
// Red warning with details while stuttering, green check mark otherwise.
if (stutteringState.value) {
imageVector = Icons.Default.Warning,
contentDescription = "卡顿警告",
tint = Color.Red,
modifier = Modifier.size(24.dp)
Spacer(modifier = Modifier.width(8.dp))
Column {
text = "视频播放出现卡顿",
color = Color.Red,
style = MaterialTheme.typography.bodyMedium
// Additional information about which metrics are abnormal
if (frameRateLowState.value) {
text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
if (packetLossHighState.value) {
text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
} else {
imageVector = Icons.Default.CheckCircle,
contentDescription = "正常",
tint = Color.Green,
modifier = Modifier.size(24.dp)
Spacer(modifier = Modifier.width(8.dp))
text = "视频播放正常",
color = Color.Green,
style = MaterialTheme.typography.bodyMedium

// Keyed on Unit, so the effect is not restarted by recomposition.
LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)

// NOTE(review): the encoder factory gets a brand-new EglBase that is not stored
// anywhere, so it cannot be released later and differs from the one used by the
// decoder and renderer. Consider passing remoteEglBase's context instead.
val encoderFactory = DefaultVideoEncoderFactory(
EglBase.create().eglBaseContext, true, true

// NOTE(review): `remoteEglBase!!` assumes the renderer's update block above has
// already run and created the EGL base before this effect executes — confirm
// the ordering, otherwise this throws a NullPointerException.
val decoderFactory = DefaultVideoDecoderFactory(remoteEglBase!!.eglBaseContext)

peerConnectionFactory = PeerConnectionFactory.builder()

) {
// Callback receiving the created peer connection: keep it in the fragment.
localPeer = it



/**
 * Enhanced line chart composable with axes and labels.
 * Consecutive points are connected with straight line segments.
 */
// Draws [data] on a Canvas as a line chart: a filled background, white x and y
// axes inset by a 40.dp padding, five tick labels per axis, optional axis
// titles, straight segments between consecutive points and a dot on each point.
// Nothing is drawn when [data] is empty.
//
// Parameters:
//   data            - values to plot, in order; the list index is the x position
//   modifier        - modifier applied to the Canvas before the background
//   lineColor       - colour of the segments and point markers
//   backgroundColor - canvas background colour
//   yAxisLabel      - title for the y axis; skipped when empty
//   xAxisLabel      - title for the x axis; skipped when empty
//   minYValue       - lower bound of the y scale; defaults to the smallest value in data
//   maxYValue       - upper bound of the y scale; defaults to the largest value in data
//
// NOTE(review): several draw-call heads and paint set-up lines are missing from
// this view, so the comments below cover only the visible lines.
fun LineChart(
data: List<Float>,
modifier: Modifier = Modifier,
lineColor: Color = Color.Green,
backgroundColor: Color = Color.Black,
yAxisLabel: String = "",
xAxisLabel: String = "",
minYValue: Float? = null,
maxYValue: Float? = null
) {
Canvas(modifier = modifier.background(backgroundColor)) {
val padding = 40.dp.toPx() // Padding for axes and labels

if (data.isEmpty()) return@Canvas

val maxY = maxYValue ?: data.maxOrNull() ?: 1f
val minY = minYValue ?: data.minOrNull() ?: 0f
val yRange = maxY - minY
val pointCount = data.size
// Horizontal distance between points; the divisor is clamped to 1 for a single point.
val spacing = (size.width - padding * 2) / (pointCount - 1).coerceAtLeast(1)

// Map each value to canvas coordinates; a flat series (yRange == 0) is centred vertically.
val points = data.mapIndexed { index, value ->
val x = padding + index * spacing
val y = if (yRange == 0f) size.height / 2 else (size.height - padding) - ((value - minY) / yRange) * (size.height - padding * 2)
Offset(x, y)

// Draw axes
color = Color.White,
start = Offset(padding, padding),
end = Offset(padding, size.height - padding),
strokeWidth = 2f
color = Color.White,
start = Offset(padding, size.height - padding),
end = Offset(size.width - padding, size.height - padding),
strokeWidth = 2f

// Draw y-axis labels
val yLabelCount = 5
val yStep = yRange / (yLabelCount - 1)
for (i in 0 until yLabelCount) {
val yValue = minY + i * yStep
// NOTE(review): unlike the point mapping above, this division by yRange has no
// zero guard; when every value is equal it evaluates 0f / 0f = NaN and the
// labels get an invalid position. Apply the same `yRange == 0f` handling here.
val yPos = (size.height - padding) - ((yValue - minY) / yRange) * (size.height - padding * 2)

drawContext.canvas.nativeCanvas.apply {
val label = yValue.roundToInt().toString()
val textPaint = {
color =
textSize = 24f
textAlign =
padding - 8f,
yPos + textPaint.textSize / 2,

// Draw x-axis labels
val xLabelCount = 5
// NOTE(review): this is integer division, so with fewer than five points the
// step is 0 and all five labels are drawn at index 0.
val xStep = (pointCount - 1).coerceAtLeast(1) / (xLabelCount - 1).coerceAtLeast(1)
for (i in 0 until xLabelCount) {
val index = i * xStep
val xPos = padding + index * spacing

drawContext.canvas.nativeCanvas.apply {
val label = index.toString()
val textPaint = {
color =
textSize = 24f
textAlign =
size.height - padding + textPaint.textSize + 4f,

// Optionally, draw axis labels
// Y-Axis Label
if (yAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = {
color =
textSize = 24f
textAlign =
isAntiAlias = true
// Rotate for vertical text
// NOTE(review): no matching save/restore of the canvas is visible in this view;
// confirm the rotation does not leak into the drawing that follows.
rotate(-90f, padding / 2, size.height / 2)
padding / 2,
size.height / 2,

// X-Axis Label
if (xAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = {
color =
textSize = 24f
textAlign =
isAntiAlias = true
size.width / 2,
size.height - padding / 2,

// Draw the straight lines connecting points
if (points.size >= 2) {
for (i in 0 until points.size - 1) {
color = lineColor,
start = points[i],
end = points[i + 1],
strokeWidth = 4f,
cap = StrokeCap.Round

// Optionally, draw points
points.forEach { point ->
color = lineColor,
radius = 4f,
center = point

/**
 * Creates the socket.io client for [signalingServerUrl] (websocket transport
 * only, path "/") and registers handlers for connect, connect-error,
 * disconnect and "signal" events. Any exception thrown during setup is
 * caught and logged.
 *
 * NOTE(review): no call that opens the connection and no dispatch of the
 * received "signal" payload are visible in this view.
 */
private fun initializeSocketIO() {
// TLS is enabled only when the configured URL starts with "https".
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl

val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/"

try {
socket = IO.socket(socketUrl, options)

// On connect: join the configured room.
socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket connected")
socket.emit("join", currentRoom)
// Logged right after emitting; no server acknowledgement is awaited here.
Log.d(TAG, "Joined room: $currentRoom")

socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket connection error: $error")

socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket disconnected: $reason")

socket.on("signal") { args ->
// NOTE(review): args[0] is read for logging BEFORE the isNotEmpty() check on the
// next line; an event with no payload would throw
// ArrayIndexOutOfBoundsException here. Move the log inside the guard.
Log.d(TAG, "Received signaling: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject

Log.d(TAG, "Connecting to Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "Error connecting to Socket: ${e.message}")

/**
 * Works out which of the required permissions are not yet granted and
 * branches on whether any remain.
 *
 * NOTE(review): the list of required permissions and the bodies of both
 * branches are not visible in this view.
 */
private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(

// Keep only the permissions that are not currently granted.
val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED

if (permissionsToRequest.isNotEmpty()) {
} else {

/**
 * Shows a short toast telling the user that all required permissions have
 * been granted.
 */
private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()

/**
 * Creates the peer connection with a unified-plan configuration and stores
 * it in [localPeer].
 *
 * The observer forwards every local ICE candidate to the signaling server as
 * a "signal" message of type "ice", inspects incoming tracks for video, and
 * logs all other events.
 *
 * NOTE(review): the ICE server list contents, what is done with a received
 * video track, and the invocation of [onLocalPeerCreated] are not visible in
 * this view.
 *
 * @param context Android context (not used by the visible lines)
 * @param peerConnectionFactory factory used to create the connection
 * @param remoteView renderer intended to display the remote video
 * @param onLocalPeerCreated callback for the created peer connection
 */
private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
remoteView: SurfaceViewRenderer,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(

val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN

localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
// Serialise each local candidate and send it to the room through the signaling socket.
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
put("room", currentRoom)
socket.emit("signal", signalData)

// The following callbacks only log the event they receive.
override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")

override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")

override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")

override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")

override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")

override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")

override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")

override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")

override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")

// Both onAddTrack and onTrack check whether the received track is a video track.
// NOTE(review): if both handle the same track identically, the work may be done
// twice — confirm against the missing bodies.
override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")
receiver?.track()?.let { track ->
if (track is VideoTrack) {

override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")
transceiver?.receiver?.track()?.let { track ->
if (track is VideoTrack) {

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")



/**
 * Creates an SDP offer on [localPeer] (requesting both audio and video),
 * applies it as the local description and, once that succeeds, sends it to
 * the current room as a "signal" message of type "offer".
 *
 * Failures to create the offer or to set the local description are logged.
 * Nothing happens when [localPeer] is null.
 */
private fun initiateCall() {
    Log.d(TAG, "Initiating call...")

    val offerConstraints = MediaConstraints()
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
    offerConstraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))

    localPeer?.createOffer(object : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {
            // Nothing to apply when no description was produced.
            val sdp = sessionDescription ?: return

            localPeer?.setLocalDescription(object : SdpObserver {
                override fun onSetSuccess() {
                    // The offer is sent only after it has been applied locally.
                    val offerMessage = JSONObject()
                    offerMessage.put("type", "offer")
                    offerMessage.put("sdp", JSONObject().put("sdp", sdp.description))
                    offerMessage.put("room", currentRoom)
                    socket.emit("signal", offerMessage)
                }

                override fun onSetFailure(error: String?) {
                    Log.e(TAG, "Set local description error: $error")
                }

                // Create callbacks are unused for a set operation.
                override fun onCreateSuccess(p0: SessionDescription?) = Unit
                override fun onCreateFailure(p0: String?) = Unit
            }, sdp)
        }

        override fun onCreateFailure(error: String?) {
            Log.e(TAG, "Create offer error: $error")
        }

        // Set callbacks are unused for a create operation.
        override fun onSetSuccess() = Unit
        override fun onSetFailure(error: String?) = Unit
    }, offerConstraints)
}

/**
 * Dispatches an incoming signaling message on its "type" field:
 * - "answer": applies the remote description and walks the stored ICE candidates;
 * - "ice": parses a remote ICE candidate;
 * - "time_sync_request": replies with a "time_sync_response" that echoes t1
 *   and adds the local timestamps t2 and t3;
 * - anything else is logged as an error.
 *
 * NOTE(review): `data.getString("type")` throws a JSONException when the
 * field is absent; no handler for that is visible here.
 *
 * @param data signaling message received from the socket
 */
private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {

"answer" -> {
Log.d(TAG, "Received answer")
// NOTE(review): the SessionDescription constructor arguments are not visible in this view.
val sdp = SessionDescription(

localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
// Walk the stored candidates now that a remote description exists.
// NOTE(review): the loop body is not visible in this view.
pendingIceCandidates.forEach { candidate ->


Log.d(TAG, "Set remote description (answer) success")

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description error: $error")

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)

"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(

// Branches on whether a remote description is already set.
// NOTE(review): both branch bodies are missing from this view.
if (localPeer?.remoteDescription != null) {
} else {

// Disabled handler for "time_sync_response", kept for reference. It derived the
// one-way delay as RTT / 2 and a clock offset from t1..t4, then refreshed the
// latency history and the max/min/average/latest latency state.
// "time_sync_response" -> {
// val t1 = data.getLong("t1")
// val t2 = data.getLong("t2")
// val t3 = data.getLong("t3")
// val t4 = System.currentTimeMillis()
// val RTT = t4 - t1
// val oneWayDelay = RTT / 2
// timeOffset = ((t2 - t1) + (t3 - t4)) / 2
// Log.d(TAG, "时间同步: RTT=$RTT ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")
// // Update the latency state
// viewLifecycleOwner.lifecycleScope.launch {
// // Append to the history
// latencyHistory.add(oneWayDelay)
// if (latencyHistory.size > 60) { // Keep the most recent 60 latency values
// latencyHistory.removeAt(0)
// }
// // Compute the maximum, minimum and average one-way latency
// val maxLatency = latencyHistory.maxOrNull() ?: 0L
// val minLatency = latencyHistory.minOrNull() ?: 0L
// val averageLatency = if (latencyHistory.isNotEmpty()) {
// latencyHistory.average().toLong()
// } else {
// 0L
// }
// maxLatencyState.longValue = maxLatency
// minLatencyState.longValue = minLatency
// averageLatencyState.longValue = averageLatency
// latestLatencyState.longValue = oneWayDelay
// }
// }

"time_sync_request" -> {
// Echo the sender's t1 and add this device's receive (t2) and send (t3) times.
val t1 = data.getLong("t1")
// t2 and t3 are taken back-to-back, so they are practically identical here.
val t2 = System.currentTimeMillis()
val t3 = System.currentTimeMillis()
val syncResponse = JSONObject().apply {
put("type", "time_sync_response")
put("t1", t1)
put("t2", t2)
put("t3", t3)
put("room", currentRoom)
socket.emit("signal", syncResponse)
Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")

else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")

/**
 * Starts a coroutine on the view lifecycle scope, stored in [statsJob], that
 * requests a stats report from [localPeer] once per second. An error is
 * logged whenever [localPeer] is null at polling time.
 *
 * NOTE(review): apart from the log line, the body of the stats callback is
 * not visible in this view.
 */
private fun startStatsCollection() {
Log.d(TAG, "Starting stats collection...")
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // Collect stats every second
Log.d(TAG, "Collecting stats...")
localPeer?.getStats { report ->
Log.d(TAG, "Stats report obtained.")
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")

/**
 * Derives receive-side video quality metrics from one WebRTC stats report.
 *
 * Only `inbound-rtp` entries whose `kind` is `video` are used. Frame rate and bitrate are
 * computed from the difference between this report and the previous one (tracked in the
 * `prev*` fields), so the first report only seeds those fields. The results are published to
 * the Compose state holders on the view lifecycle scope.
 *
 * @param report the report delivered by `PeerConnection.getStats`.
 */
private fun parseStatsReport(report: RTCStatsReport) {
    Log.d(TAG, "Received RTCStatsReport: $report")
    for (stats in report.statsMap.values) {
        if (stats.type != "inbound-rtp") continue
        if ((stats.members["kind"] as? String) != "video") continue

        // Reads a numeric member as Double, falling back to [default] when absent or non-numeric.
        fun number(key: String, default: Double = 0.0): Double =
            (stats.members[key] as? Number)?.toDouble() ?: default

        val framesDecoded = number("framesDecoded")
        val framesReceived = number("framesReceived")
        val framesDropped = number("framesDropped")
        val bytesReceived = number("bytesReceived")
        val packetsLost = number("packetsLost")
        val packetsReceived = number("packetsReceived", default = 1.0)

        // The 1.0 default only covers a missing member; a reported 0 with no losses would still
        // divide 0 by 0 and publish NaN, so guard the denominator explicitly.
        val totalPackets = packetsLost + packetsReceived
        val packetLossFraction =
            if (totalPackets > 0) (packetsLost / totalPackets).coerceIn(0.0, 1.0) else 0.0
        val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

        Log.d(
            TAG,
            "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
        )

        if (prevTimestamp != 0.0) {
            val timeElapsed = timestamp - prevTimestamp
            val framesDelta = framesDecoded - prevFramesDecoded
            val bytesDelta = bytesReceived - prevBytesReceived
            val framesReceivedDelta = framesReceived - prevFramesReceived
            val framesDroppedDelta = framesDropped - prevFramesDropped

            val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
            val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
            val packetLoss = packetLossFraction * 100 // Convert to percentage

            Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

            // Determine stuttering based on thresholds
            val isStuttering = frameRate < 24 || packetLoss > 5.0 // Thresholds can be adjusted

            // Update states
            viewLifecycleOwner.lifecycleScope.launch {
                frameRateState.doubleValue = frameRate
                bitrateState.longValue = bitrate.toLong()
                // Update stuttering state
                stutteringState.value = isStuttering
                // Update specific stuttering causes
                frameRateLowState.value = frameRate < 24
                packetLossHighState.value = packetLoss > 5.0
                packetLossState.doubleValue = packetLoss

                // Update history
                // NOTE(review): the append/trim statements are reconstructed (only the size
                // checks were present); the element types follow the history declarations
                // (Float / Long). Confirm the stored unit for bitrate (bps assumed).
                frameRateHistory.add(frameRate.toFloat())
                if (frameRateHistory.size > 60) {
                    frameRateHistory.removeAt(0)
                }

                bitrateHistory.add(bitrate.toLong())
                if (bitrateHistory.size > 60) {
                    bitrateHistory.removeAt(0)
                }

                Log.d(
                    TAG,
                    "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
                )
            }
        }

        // Update previous values. This must happen outside the delta branch: prevTimestamp
        // starts at 0.0, so updating it only inside the branch would never seed it.
        prevFramesDecoded = framesDecoded
        prevBytesReceived = bytesReceived
        prevFramesReceived = framesReceived
        prevFramesDropped = framesDropped
        prevTimestamp = timestamp
    }
}

// Lifecycle override for the point where the fragment's view is torn down.
// NOTE(review): no statements follow the opening brace in this copy of the file — confirm the
// cleanup body (and the closing brace) was not lost.
override fun onDestroyView() {


















  • 处理问题



更新了 VideoInitiatorFragment 和 VideoReceiverFragment，添加了用于显示 RTT 与单向时延（OWD）数值的 UI 元素。


To implement a unified time synchronization mechanism that calculates both One-Way Delay (OWD) and Round-Trip Time (RTT) using the same request, and display these metrics in the UI, we need to make the following modifications to both VideoInitiatorFragment and VideoReceiverFragment:

  1. Unified Time Synchronization Request:

    • The initiator (VideoInitiatorFragment) sends a time_sync_request containing the current timestamp (t1).
    • The receiver (VideoReceiverFragment) responds with a time_sync_response containing t1, its receive time (t2), and its send time (t3).
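    • The initiator records its own receive time (t4) and derives RTT and OWD from t1–t4. Because t1/t4 and t2/t3 are read from two different device clocks, only same-clock differences (t4 − t1 and t3 − t2) are free of clock offset; a raw t2 − t1 is a valid one-way delay only if the two clocks are synchronized.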
  2. UI Enhancements:

    • Display both RTT and OWD in the initiator's UI.
    • Optionally, display relevant metrics in the receiver's UI if needed.
  3. Code Refactoring:

    • Ensure that both fragments handle the time synchronization appropriately.
    • Maintain synchronization intervals and handle state updates efficiently.

Below is the complete modified code for both VideoInitiatorFragment and VideoReceiverFragment with the necessary changes:


/**
 * Sending ("initiator") side of the WebRTC video call.
 *
 * The fragment captures the local camera, renders it in a [SurfaceViewRenderer], answers offers
 * that arrive over the socket.io signaling channel, and periodically sends `time_sync_request`
 * messages so it can display round-trip time (RTT) and one-way delay (OWD) statistics.
 *
 * All connection parameters are passed through the fragment arguments; see [newInstance].
 */
class VideoInitiatorFragment : Fragment() {

    companion object {
        private const val ARG_ROOM = "room"
        private const val ARG_STUN_URL = "stunUrl"
        private const val ARG_TURN_URL = "turnUrl"
        private const val ARG_TURN_USERNAME = "turnUsername"
        private const val ARG_TURN_PASSWORD = "turnPassword"
        private const val ARG_SIGNALING_SERVER_URL = "signalingServerUrl"

        /** Number of samples kept in each delay history. */
        private const val MAX_HISTORY_SIZE = 60

        /** Creates the fragment with its connection parameters stored as arguments. */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoInitiatorFragment {
            val fragment = VideoInitiatorFragment()
            fragment.arguments = Bundle().apply {
                putString(ARG_ROOM, room)
                putString(ARG_STUN_URL, stunUrl)
                putString(ARG_TURN_URL, turnUrl)
                putString(ARG_TURN_USERNAME, turnUsername)
                putString(ARG_TURN_PASSWORD, turnPassword)
                putString(ARG_SIGNALING_SERVER_URL, signalingServerUrl)
            }
            return fragment
        }
    }

    // Signaling and WebRTC handles.
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var localView: SurfaceViewRenderer? = null
    private var localEglBase: EglBase? = null
    // Kept so the camera can be stopped and released when the view goes away.
    private var videoCapturer: CameraVideoCapturer? = null
    private var surfaceTextureHelper: SurfaceTextureHelper? = null
    // ICE candidates that arrived before the remote description was set.
    private val pendingIceCandidates = mutableListOf<IceCandidate>()

    // Connection parameters read from the fragment arguments in onCreate.
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    private val TAG: String = "WebRTC-Initiator"

    // RTT state observed by the Compose UI.
    private val maxRttState = mutableLongStateOf(0L)
    private val minRttState = mutableLongStateOf(Long.MAX_VALUE)
    private val averageRttState = mutableLongStateOf(0L)
    private val latestRttState = mutableLongStateOf(0L)
    private val rttHistory = mutableStateListOf<Long>()

    // OWD state observed by the Compose UI.
    private val maxOwdState = mutableLongStateOf(0L)
    private val minOwdState = mutableLongStateOf(Long.MAX_VALUE)
    private val averageOwdState = mutableLongStateOf(0L)
    private val latestOwdState = mutableLongStateOf(0L)
    private val owdHistory = mutableStateListOf<Long>()

    private var statsJob: Job? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Retrieve data from arguments
        val args = arguments
        currentRoom = args?.getString(ARG_ROOM) ?: "default-room"
        signalingServerUrl = args?.getString(ARG_SIGNALING_SERVER_URL) ?: ""
        stunUrl = args?.getString(ARG_STUN_URL) ?: ""
        turnUrl = args?.getString(ARG_TURN_URL) ?: ""
        // NOTE(review): these fallback TURN credentials are hard-coded in the binary; consider
        // requiring the caller to supply them instead.
        turnUsername = args?.getString(ARG_TURN_USERNAME) ?: "wstszx"
        turnPassword = args?.getString(ARG_TURN_PASSWORD) ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
            permissions.entries.forEach { (permission, isGranted) ->
                val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
                Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
            }
            // Only announce success when nothing was denied.
            if (permissions.values.all { it }) {
                onPermissionsChecked()
            }
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    /**
     * Root UI: the local camera preview with the RTT/OWD read-outs below it.
     *
     * The first composition also boots the WebRTC stack, the signaling socket, the permission
     * request and the periodic time synchronisation.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current

        // Reading the snapshot-backed holders here subscribes this composable to their changes.
        // Until the first sample arrives the read-outs show "N/A" rather than the initial
        // sentinel values (e.g. Long.MAX_VALUE for the minimum).
        val hasRtt = rttHistory.isNotEmpty()
        val hasOwd = owdHistory.isNotEmpty()
        val latestRtt = formatMetric(latestRttState.longValue, hasRtt)
        val maxRtt = formatMetric(maxRttState.longValue, hasRtt)
        val minRtt = formatMetric(minRttState.longValue, hasRtt)
        val averageRtt = formatMetric(averageRttState.longValue, hasRtt)
        val latestOwd = formatMetric(latestOwdState.longValue, hasOwd)
        val maxOwd = formatMetric(maxOwdState.longValue, hasOwd)
        val minOwd = formatMetric(minOwdState.longValue, hasOwd)
        val averageOwd = formatMetric(averageOwdState.longValue, hasOwd)

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Local video view
                AndroidView(
                    factory = { viewContext ->
                        val eglBase = obtainEglBase()
                        SurfaceViewRenderer(viewContext).also { renderer ->
                            renderer.setZOrderMediaOverlay(false)
                            renderer.init(eglBase.eglBaseContext, null)
                            renderer.setMirror(false)
                            localView = renderer
                        }
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth()
                )

                Spacer(modifier = Modifier.height(8.dp))

                // Display RTT and OWD Metrics
                Column(modifier = Modifier.padding(16.dp)) {
                    MetricLine(text = "往返时延 (RTT): $latestRtt ms", bottomPadding = 4)
                    MetricLine(text = "最大 RTT: $maxRtt ms", bottomPadding = 4)
                    MetricLine(text = "最小 RTT: $minRtt ms", bottomPadding = 4)
                    MetricLine(text = "平均 RTT: $averageRtt ms", bottomPadding = 8)
                    MetricLine(text = "单向时延 (OWD): $latestOwd ms", bottomPadding = 4)
                    MetricLine(text = "最大 OWD: $maxOwd ms", bottomPadding = 4)
                    MetricLine(text = "最小 OWD: $minOwd ms", bottomPadding = 4)
                    MetricLine(text = "平均 OWD: $averageOwd ms", bottomPadding = 8)
                }

                // Spacer between video and charts
                Spacer(modifier = Modifier.height(8.dp))
            }

            LaunchedEffect(Unit) {
                val eglBase = obtainEglBase()

                val options = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(options)

                val encoderFactory = DefaultVideoEncoderFactory(eglBase.eglBaseContext, true, true)
                val decoderFactory = DefaultVideoDecoderFactory(eglBase.eglBaseContext)
                val peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(encoderFactory)
                    .setVideoDecoderFactory(decoderFactory)
                    .createPeerConnectionFactory()

                // NOTE(review): capture is started before the runtime permission request below;
                // on a first launch the camera may not be accessible yet — confirm the intended
                // order.
                var localVideoTrack: VideoTrack? = null
                initLocalVideo(context, localView, peerConnectionFactory, eglBase) {
                    localVideoTrack = it
                }

                createPeerConnection(context, peerConnectionFactory, localVideoTrack) {
                    localPeer = it
                    startStatsCollection() // Start collecting statistics
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
                startTimeSync()
            }
        }
    }

    /** One white line of text in the metrics panel. */
    @Composable
    private fun MetricLine(text: String, bottomPadding: Int) {
        Text(
            text = text,
            color = Color.White,
            style = MaterialTheme.typography.bodyMedium,
            modifier = Modifier.padding(bottom = bottomPadding.dp)
        )
    }

    /** Returns [value] as text, or "N/A" while no sample has been recorded. */
    private fun formatMetric(value: Long, available: Boolean): String =
        if (available) value.toString() else "N/A"

    /** Returns the shared EGL base, creating it on first use. */
    private fun obtainEglBase(): EglBase =
        localEglBase ?: EglBase.create().also { localEglBase = it }

    /**
     * Runs [block] on the view lifecycle scope, or drops it when the view no longer exists.
     *
     * Signaling and stats callbacks arrive on library threads and may fire after
     * [onDestroyView]; `viewLifecycleOwner` throws in that state, so the nullable live-data
     * accessor is used instead.
     */
    private fun launchOnViewScope(block: () -> Unit) {
        val owner = viewLifecycleOwnerLiveData.value
        if (owner == null) {
            Log.w(TAG, "View already destroyed; ignoring late callback")
            return
        }
        owner.lifecycleScope.launch { block() }
    }

    /** Sends [payload] on the "signal" channel if the socket has been created. */
    private fun emitSignal(payload: JSONObject) {
        if (!::socket.isInitialized) {
            Log.e(TAG, "Signaling socket is not initialized; dropping message")
            return
        }
        socket.emit("signal", payload)
    }

    /** Appends [sample] to [history], keeping at most [MAX_HISTORY_SIZE] entries. */
    private fun pushSample(history: MutableList<Long>, sample: Long) {
        history.add(sample)
        if (history.size > MAX_HISTORY_SIZE) {
            history.removeAt(0)
        }
    }

    /** Records one RTT sample and refreshes the latest/max/min/average RTT states. */
    private fun recordRtt(rtt: Long) {
        latestRttState.longValue = rtt
        pushSample(rttHistory, rtt)

        val maxRtt = rttHistory.maxOrNull() ?: 0L
        val minRtt = rttHistory.minOrNull() ?: 0L
        val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L
        maxRttState.longValue = maxRtt
        minRttState.longValue = minRtt
        averageRttState.longValue = averageRtt

        Log.d(TAG, "RTT - Latest: $rtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
    }

    /** Records one OWD sample and refreshes the latest/max/min/average OWD states. */
    private fun recordOwd(owd: Long) {
        latestOwdState.longValue = owd
        pushSample(owdHistory, owd)

        val maxOwd = owdHistory.maxOrNull() ?: 0L
        val minOwd = owdHistory.minOrNull() ?: 0L
        val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L
        maxOwdState.longValue = maxOwd
        minOwdState.longValue = minOwd
        averageOwdState.longValue = averageOwd

        Log.d(TAG, "OWD - Latest: $owd ms, Max: $maxOwd ms, Min: $minOwd ms, Average: $averageOwd ms")
    }

    /** Sends a time-sync request every [intervalMs] milliseconds while the view is alive. */
    private fun startTimeSync(intervalMs: Long = 5000L) {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(intervalMs)
                requestTimeSync()
            }
        }
    }

    /** Polls the peer connection for a stats report once per second. */
    private fun startStatsCollection() {
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000) // Collect stats every second
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "Failed to get stats: localPeer is null.")
                } else {
                    peer.getStats { report -> parseStatsReport(report) }
                }
            }
        }
    }

    /** Feeds any `currentRoundTripTime` found on "transport" stats into the RTT history. */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
        for (stats in report.statsMap.values) {
            Log.d(TAG, "Stats type: ${stats.type}")
            // NOTE(review): in the WebRTC statistics model `currentRoundTripTime` belongs to
            // "candidate-pair" entries, not "transport", so this branch probably never records
            // anything. Switching the type would also mix ICE RTT samples with the signaling
            // time-sync RTT samples in the same history — decide which source the UI should
            // show before changing it.
            if (stats.type != "transport") continue

            Log.d(TAG, "Transport Stats found: $stats")
            val currentRtt = (stats.members["currentRoundTripTime"] as? Number)
                ?.toDouble()
                ?.times(1000)
                ?.toLong()
            if (currentRtt != null && currentRtt > 0) {
                launchOnViewScope { recordRtt(currentRtt) }
            }
        }
    }

    /** Connects to the signaling server and registers the socket event handlers. */
    private fun initializeSocketIO() {
        val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
        val socketUrl = signalingServerUrl

        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = protocol == "https"
            path = "/"
        }

        try {
            socket = IO.socket(socketUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                socket.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
            }

            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                if (args.isNotEmpty()) {
                    val error = args[0]
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            socket.on(Socket.EVENT_DISCONNECT) { args ->
                if (args.isNotEmpty()) {
                    val reason = args[0]
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            socket.on("signal") { args ->
                // Check for a payload before touching args[0]; an empty event must not crash.
                val payload = args.firstOrNull()
                Log.d(TAG, "Received signaling: $payload")
                if (payload is JSONObject) {
                    try {
                        handleSignalingData(payload)
                    } catch (e: org.json.JSONException) {
                        // The message comes from the network; a missing field must not take
                        // down the socket thread.
                        Log.e(TAG, "Malformed signaling message: ${e.message}")
                    }
                }
            }

            socket.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "Error connecting to Socket: ${e.message}")
        }
    }

    /** Requests whichever of the required permissions are not granted yet. */
    private fun requestPermissionsIfNeeded() {
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        )

        val permissionsToRequest = permissions.filter {
            ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
        }

        if (permissionsToRequest.isNotEmpty()) {
            requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
        } else {
            onPermissionsChecked()
        }
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Starts camera capture and creates the local video track.
     *
     * The capturer and its texture helper are stored on the fragment so [onDestroyView] can
     * release them. The audio track is created in [createPeerConnection], where it is actually
     * attached to the connection.
     */
    private fun initLocalVideo(
        context: Context,
        localView: SurfaceViewRenderer?,
        peerConnectionFactory: PeerConnectionFactory,
        eglBase: EglBase,
        onLocalVideoTrack: (VideoTrack) -> Unit
    ) {
        val capturer = createCameraCapturer(context)
        val textureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
        val videoSource = peerConnectionFactory.createVideoSource(capturer.isScreencast)

        capturer.initialize(textureHelper, context, videoSource.capturerObserver)
        capturer.startCapture(1920, 1080, 60)
        videoCapturer = capturer
        surfaceTextureHelper = textureHelper

        val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
        localView?.let { localVideoTrack.addSink(it) }

        onLocalVideoTrack(localVideoTrack)
    }

    /**
     * Picks a camera: back-facing first, then front-facing, then the first listed device.
     *
     * @throws IllegalStateException when no capturer can be created (including when the device
     * reports no cameras at all).
     */
    private fun createCameraCapturer(context: Context): CameraVideoCapturer {
        val camera2Enumerator = Camera2Enumerator(context)
        val deviceNames = camera2Enumerator.deviceNames

        val candidates = deviceNames.filter { camera2Enumerator.isBackFacing(it) } +
            deviceNames.filter { camera2Enumerator.isFrontFacing(it) } +
            deviceNames.take(1)

        return candidates.firstNotNullOfOrNull { camera2Enumerator.createCapturer(it, null) }
            ?: throw IllegalStateException("Unable to create camera capturer")
    }

    /**
     * Creates the peer connection, attaches the local tracks and reports it via
     * [onLocalPeerCreated]. Nothing is reported when the connection cannot be created.
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        localVideoTrack: VideoTrack?,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        // The URLs default to "" in onCreate; skip blank entries instead of handing them to
        // the ICE server builder.
        val iceServers = listOfNotNull(
            stunUrl.takeIf { it.isNotBlank() }?.let {
                PeerConnection.IceServer.builder(it).createIceServer()
            },
            turnUrl.takeIf { it.isNotBlank() }?.let {
                PeerConnection.IceServer.builder(it)
                    .setUsername(turnUsername)
                    .setPassword(turnPassword)
                    .createIceServer()
            }
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                iceCandidate?.let {
                    Log.d(TAG, "ICE candidate: $it")
                    val signalData = JSONObject().apply {
                        put("type", "ice")
                        put("candidate", JSONObject().apply {
                            put("sdpMid", it.sdpMid)
                            put("sdpMLineIndex", it.sdpMLineIndex)
                            put("candidate", it.sdp)
                        })
                        put("room", currentRoom)
                    }
                    emitSignal(signalData)
                }
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        })

        if (peer == null) {
            // The factory returns null when the configuration is rejected; report it instead
            // of crashing on a forced unwrap.
            Log.e(TAG, "Failed to create PeerConnection")
            return
        }
        localPeer = peer

        localVideoTrack?.let {
            peer.addTrack(it, listOf("local_stream"))
        }
        val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
        val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
        peer.addTrack(localAudioTrack, listOf("local_stream"))

        onLocalPeerCreated(peer)
    }

    /**
     * Creates an SDP answer, installs it as the local description and passes the SDP text to
     * [onAnswerCreated] once that succeeds.
     */
    private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
        Log.d(TAG, "Creating answer...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        peerConnection.createAnswer(object : SdpObserver {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                sessionDescription?.let { sdp ->
                    peerConnection.setLocalDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            Log.d(TAG, "SetLocalDescription onSetSuccess")
                            onAnswerCreated(sdp.description)
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }
            }

            override fun onSetSuccess() {
                Log.d(TAG, "createAnswer onSetSuccess")
            }

            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "createAnswer onCreateFailure: $error")
            }

            override fun onSetFailure(error: String?) {}
        }, constraints)
    }

    /** Dispatches one message from the "signal" channel by its `type` field. */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        when (val type = data.optString("type")) {
            "offer" -> handleOffer(data)
            "ice" -> handleRemoteIceCandidate(data)
            "time_sync_response" -> handleTimeSyncResponse(data)
            else -> Log.e(TAG, "Unknown signaling type: $type")
        }
    }

    /** Applies a remote offer, answers it and then flushes any queued ICE candidates. */
    private fun handleOffer(data: JSONObject) {
        Log.d(TAG, "Received offer")
        val peer = localPeer
        if (peer == null) {
            Log.e(TAG, "Received offer before the peer connection was created")
            return
        }

        val sdp = SessionDescription(
            SessionDescription.Type.OFFER,
            data.getJSONObject("sdp").getString("sdp")
        )

        peer.setRemoteDescription(object : SdpObserver {
            override fun onSetSuccess() {
                Log.d(TAG, "Set remote description (offer) success")
                createAnswer(peer) { answer ->
                    val signalData = JSONObject().apply {
                        put("type", "answer")
                        put("sdp", JSONObject().put("sdp", answer))
                        put("room", currentRoom)
                    }
                    emitSignal(signalData)

                    pendingIceCandidates.forEach { candidate ->
                        localPeer?.addIceCandidate(candidate)
                    }
                    pendingIceCandidates.clear()
                }
            }

            override fun onSetFailure(error: String?) {
                Log.e(TAG, "Set remote description (offer) error: $error")
            }

            override fun onCreateSuccess(p0: SessionDescription?) {}
            override fun onCreateFailure(p0: String?) {}
        }, sdp)
    }

    /** Adds a remote ICE candidate, or queues it until the remote description is set. */
    private fun handleRemoteIceCandidate(data: JSONObject) {
        Log.d(TAG, "Received ICE candidate")
        val candidateData = data.getJSONObject("candidate")
        val candidate = IceCandidate(
            candidateData.getString("sdpMid"),
            candidateData.getInt("sdpMLineIndex"),
            candidateData.getString("candidate")
        )

        if (localPeer?.remoteDescription != null) {
            localPeer?.addIceCandidate(candidate)
        } else {
            pendingIceCandidates.add(candidate)
        }
    }

    /**
     * Turns a time-sync reply into one RTT and one OWD sample.
     *
     * t1/t4 are read from this device's clock and t2/t3 from the peer's clock, so only
     * differences taken on the same clock are meaningful: the RTT is the local elapsed time
     * minus the peer's processing time, and the OWD is estimated as half of that (which assumes
     * a symmetric path). A raw `t2 - t1` would include the unknown offset between the clocks.
     */
    private fun handleTimeSyncResponse(data: JSONObject) {
        Log.d(TAG, "Received time_sync_response")
        val t1 = data.getLong("t1")
        val t2 = data.getLong("t2")
        val t3 = data.getLong("t3")
        val t4 = System.currentTimeMillis()

        val peerProcessingTime = t3 - t2
        val rtt = ((t4 - t1) - peerProcessingTime).coerceAtLeast(0L)
        val owd = rtt / 2

        Log.d(TAG, "Time Sync: RTT=$rtt ms, OWD=$owd ms")

        launchOnViewScope {
            recordRtt(rtt)
            recordOwd(owd)
        }
    }

    /** Sends a `time_sync_request` stamped with the current local time. */
    private fun requestTimeSync() {
        val t1 = System.currentTimeMillis()
        val syncRequest = JSONObject().apply {
            put("type", "time_sync_request")
            put("room", currentRoom)
            put("t1", t1)
        }
        emitSignal(syncRequest)
        Log.d(TAG, "Sent time sync request at t1: $t1")
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        statsJob = null

        // IO.socket may have thrown in initializeSocketIO, leaving the property unset.
        if (::socket.isInitialized) {
            socket.disconnect()
        }

        // Stop the camera before tearing down the renderer and EGL context it draws into.
        try {
            videoCapturer?.stopCapture()
        } catch (e: InterruptedException) {
            Thread.currentThread().interrupt()
            Log.e(TAG, "Interrupted while stopping camera capture")
        }
        videoCapturer?.dispose()
        videoCapturer = null
        surfaceTextureHelper?.dispose()
        surfaceTextureHelper = null

        localPeer?.dispose()
        localPeer = null
        localView?.release()
        localView = null
        // Clear the reference so a recreated view gets a fresh EGL context rather than the
        // released one.
        localEglBase?.release()
        localEglBase = null
        pendingIceCandidates.clear()
    }
}


/**
 * Receiving side of a one-to-one WebRTC video session.
 *
 * The fragment joins [currentRoom] on the signaling server, sends an SDP offer that asks
 * to receive audio and video, renders the incoming video track into a
 * [SurfaceViewRenderer], and polls the peer connection once per second to chart frame
 * rate, bitrate and packet loss. It also answers `time_sync_request` messages with a
 * `time_sync_response` carrying the local timestamps.
 */
class VideoReceiverFragment : Fragment() {

    // Class member variables
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null

    // Kept so the factory can be disposed together with the view.
    private var peerConnectionFactory: PeerConnectionFactory? = null
    private var remoteView: SurfaceViewRenderer? = null
    private var remoteEglBase: EglBase? = null

    // NOTE(review): this list is touched from the socket callback and from the SdpObserver
    // callback, which presumably run on different threads - confirm and guard if so.
    private val pendingIceCandidates = mutableListOf<IceCandidate>()
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    private val TAG: String = "WebRTC-Receiver"

    private val frameRateState = mutableDoubleStateOf(0.0)
    private val bitrateState = mutableLongStateOf(0L)
    private val stutteringState = mutableStateOf(false)
    private val frameRateLowState = mutableStateOf(false)
    private val packetLossHighState = mutableStateOf(false)
    private val packetLossState = mutableDoubleStateOf(0.0)

    private val frameRateHistory = mutableStateListOf<Float>()
    private val bitrateHistory = mutableStateListOf<Long>()

    private var timeSyncJob: Job? = null

    // History of all one-way latency samples. Nothing in this class appends to it yet,
    // so the latency panel shows "N/A" until a producer is added.
    private val latencyHistory = mutableStateListOf<Long>()

    // Maximum, minimum, average and latest one-way latency.
    private val maxLatencyState = mutableLongStateOf(0L)
    private val minLatencyState = mutableLongStateOf(Long.MAX_VALUE)
    private val averageLatencyState = mutableLongStateOf(0L)
    private val latestLatencyState = mutableLongStateOf(0L)

    // Previous stats sample, used to turn cumulative counters into per-interval rates.
    private var prevFramesDecoded = 0.0
    private var prevBytesReceived = 0.0
    private var prevFramesReceived = 0.0
    private var prevFramesDropped = 0.0
    private var prevTimestamp = 0.0

    // Currently unused in this class; kept for a future time-sync implementation.
    private var timeOffset: Long = 0
    private var t1: Long = 0

    private var statsJob: Job? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Retrieve data from arguments
        currentRoom = arguments?.getString("room") ?: "default-room"
        signalingServerUrl = arguments?.getString("signalingServerUrl") ?: ""
        stunUrl = arguments?.getString("stunUrl") ?: ""
        turnUrl = arguments?.getString("turnUrl") ?: ""
        // NOTE(review): hard-coded fallback TURN credentials - consider moving them out of source.
        turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
        turnPassword = arguments?.getString("turnPassword") ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    /**
     * Shows one toast per requested permission. The "all granted" confirmation is shown
     * only when every requested permission was actually granted.
     */
    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { results ->
            results.forEach { (permission, isGranted) ->
                val verdict = if (isGranted) "权限已授予" else "权限被拒绝"
                Toast.makeText(requireContext(), "$permission $verdict", Toast.LENGTH_SHORT).show()
            }
            if (results.isNotEmpty() && results.values.all { it }) {
                onPermissionsChecked()
            }
        }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            setContent {
                WebRTCComposeLayout()
            }
        }
    }

    /** Returns the shared EGL base, creating it on first use. Released in [onDestroyView]. */
    private fun obtainEglBase(): EglBase =
        remoteEglBase ?: EglBase.create().also { remoteEglBase = it }

    /**
     * Root UI: remote video, the latency panel, frame-rate and bitrate charts and the
     * stuttering indicator. On first composition it also builds the peer connection,
     * connects the signaling socket and requests runtime permissions.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current

        // The latency holders are snapshot states, so reading them here subscribes this
        // composable to changes; observeAsState() is a LiveData extension and does not apply.
        val hasLatencySamples = latencyHistory.isNotEmpty()
        fun latencyLabel(value: Long): String = if (hasLatencySamples) value.toString() else "N/A"

        val latestLatency = latencyLabel(latestLatencyState.longValue)
        val maxLatency = latencyLabel(maxLatencyState.longValue)
        val minLatency = latencyLabel(minLatencyState.longValue)
        val averageLatency = latencyLabel(averageLatencyState.longValue)

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Remote video view. The renderer is initialised in the factory so the EGL
                // context is guaranteed to exist before the LaunchedEffect below runs.
                AndroidView(
                    factory = { viewContext ->
                        SurfaceViewRenderer(viewContext).apply {
                            setZOrderMediaOverlay(false)
                            init(obtainEglBase().eglBaseContext, null)
                            setMirror(false)
                        }.also { remoteView = it }
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth()
                )

                Spacer(modifier = Modifier.height(8.dp))

                // Latest, maximum, minimum and average one-way latency
                Column(modifier = Modifier.padding(horizontal = 16.dp)) {
                    Text(
                        text = "当前单向时延: $latestLatency ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "最大单向时延: $maxLatency ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "最小单向时延: $minLatency ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )
                    Text(
                        text = "平均单向时延: $averageLatency ms",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 8.dp)
                    )
                }

                // Spacer between video and charts
                Spacer(modifier = Modifier.height(8.dp))

                // Frame rate section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 8.dp)
                ) {
                    Text(
                        text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )

                    LineChart(
                        data = frameRateHistory,
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Green,
                        backgroundColor = Color.Black,
                        yAxisLabel = "帧率 (fps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                // Spacer between charts
                Spacer(modifier = Modifier.height(8.dp))

                // Bitrate section
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp)
                ) {
                    Text(
                        text = "码率: ${bitrateState.longValue / 1000} kbps",
                        color = Color.White,
                        style = MaterialTheme.typography.bodyMedium,
                        modifier = Modifier.padding(bottom = 4.dp)
                    )

                    LineChart(
                        data = bitrateHistory.map { it / 1000f }, // Convert to kbps
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Blue,
                        backgroundColor = Color.Black,
                        yAxisLabel = "码率 (kbps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                // Spacer between metrics and stuttering indicator
                Spacer(modifier = Modifier.height(16.dp))

                // Stuttering indicator
                Row(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp),
                    verticalAlignment = Alignment.CenterVertically
                ) {
                    if (stutteringState.value) {
                        Icon(
                            imageVector = Icons.Default.Warning,
                            contentDescription = "卡顿警告",
                            tint = Color.Red,
                            modifier = Modifier.size(24.dp)
                        )
                        Spacer(modifier = Modifier.width(8.dp))
                        Column {
                            Text(
                                text = "视频播放出现卡顿",
                                color = Color.Red,
                                style = MaterialTheme.typography.bodyMedium
                            )
                            // Say which metric is out of range
                            if (frameRateLowState.value) {
                                Text(
                                    text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
                                    color = Color.Red,
                                    style = MaterialTheme.typography.bodySmall
                                )
                            }
                            if (packetLossHighState.value) {
                                Text(
                                    text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
                                    color = Color.Red,
                                    style = MaterialTheme.typography.bodySmall
                                )
                            }
                        }
                    } else {
                        Icon(
                            imageVector = Icons.Default.CheckCircle,
                            contentDescription = "正常",
                            tint = Color.Green,
                            modifier = Modifier.size(24.dp)
                        )
                        Spacer(modifier = Modifier.width(8.dp))
                        Text(
                            text = "视频播放正常",
                            color = Color.Green,
                            style = MaterialTheme.typography.bodyMedium
                        )
                    }
                }
            }

            LaunchedEffect(Unit) {
                val renderer = remoteView
                if (renderer == null) {
                    Log.e(TAG, "Remote renderer was not created; cannot start the WebRTC session")
                    return@LaunchedEffect
                }

                val initOptions = PeerConnectionFactory.InitializationOptions.builder(context)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(initOptions)

                // Encoder and decoder share the renderer's EGL context, so there is exactly
                // one EglBase to release in onDestroyView.
                val eglContext = obtainEglBase().eglBaseContext
                val factory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(DefaultVideoEncoderFactory(eglContext, true, true))
                    .setVideoDecoderFactory(DefaultVideoDecoderFactory(eglContext))
                    .createPeerConnectionFactory()
                peerConnectionFactory = factory

                createPeerConnection(context, factory, renderer) {
                    localPeer = it
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
            }
        }
    }

    /**
     * Line chart with axes, tick labels and optional axis titles. Consecutive points are
     * joined with straight segments.
     *
     * @param data values to plot, one point per element; nothing is drawn when empty.
     * @param minYValue lower bound of the y axis; defaults to the smallest value in [data].
     * @param maxYValue upper bound of the y axis; defaults to the largest value in [data].
     */
    @Composable
    fun LineChart(
        data: List<Float>,
        modifier: Modifier = Modifier,
        lineColor: Color = Color.Green,
        backgroundColor: Color = Color.Black,
        yAxisLabel: String = "",
        xAxisLabel: String = "",
        minYValue: Float? = null,
        maxYValue: Float? = null
    ) {
        Canvas(modifier = modifier.background(backgroundColor)) {
            if (data.isEmpty()) return@Canvas

            val padding = 40.dp.toPx() // Padding for axes and labels
            val plotBottom = size.height - padding
            val plotHeight = size.height - padding * 2

            val maxY = maxYValue ?: data.maxOrNull() ?: 1f
            val minY = minYValue ?: data.minOrNull() ?: 0f
            val yRange = maxY - minY

            val pointCount = data.size
            val lastIndex = pointCount - 1
            val spacing = (size.width - padding * 2) / lastIndex.coerceAtLeast(1)

            // A flat series is drawn on the vertical centre line instead of dividing by zero.
            fun yFor(value: Float): Float =
                if (yRange == 0f) size.height / 2 else plotBottom - ((value - minY) / yRange) * plotHeight

            val points = data.mapIndexed { index, value ->
                Offset(padding + index * spacing, yFor(value))
            }

            // Paints are built once per draw pass. The alignment values are a reconstruction:
            // right-aligned y ticks, centred x ticks and axis titles.
            val yTickPaint = android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
                textAlign = android.graphics.Paint.Align.RIGHT
            }
            val xTickPaint = android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
                textAlign = android.graphics.Paint.Align.CENTER
            }
            val axisTitlePaint = android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
                textAlign = android.graphics.Paint.Align.CENTER
                isAntiAlias = true
            }
            val nativeCanvas = drawContext.canvas.nativeCanvas

            // Draw axes
            drawLine(
                color = Color.White,
                start = Offset(padding, padding),
                end = Offset(padding, plotBottom),
                strokeWidth = 2f
            )
            drawLine(
                color = Color.White,
                start = Offset(padding, plotBottom),
                end = Offset(size.width - padding, plotBottom),
                strokeWidth = 2f
            )

            // Draw y-axis labels; a flat series gets a single label.
            val yLabelCount = 5
            val yTicks = if (yRange == 0f) {
                listOf(minY)
            } else {
                List(yLabelCount) { step -> minY + step * yRange / (yLabelCount - 1) }
            }
            yTicks.forEach { tick ->
                nativeCanvas.drawText(
                    tick.roundToInt().toString(),
                    padding - 8f,
                    yFor(tick) + yTickPaint.textSize / 2,
                    yTickPaint
                )
            }

            // Draw x-axis labels: up to five indices spread over the series, de-duplicated
            // so short series do not stack identical labels on one spot.
            val xLabelCount = 5
            val xTicks = (0 until xLabelCount)
                .map { step -> step * lastIndex / (xLabelCount - 1) }
                .distinct()
            xTicks.forEach { index ->
                nativeCanvas.drawText(
                    index.toString(),
                    padding + index * spacing,
                    plotBottom + xTickPaint.textSize + 4f,
                    xTickPaint
                )
            }

            // Y-axis title, rotated to read bottom-to-top
            if (yAxisLabel.isNotEmpty()) {
                nativeCanvas.save()
                nativeCanvas.rotate(-90f, padding / 2, size.height / 2)
                nativeCanvas.drawText(yAxisLabel, padding / 2, size.height / 2, axisTitlePaint)
                nativeCanvas.restore()
            }

            // X-axis title
            if (xAxisLabel.isNotEmpty()) {
                nativeCanvas.drawText(
                    xAxisLabel,
                    size.width / 2,
                    size.height - padding / 2,
                    axisTitlePaint
                )
            }

            // Straight segments between consecutive points
            for (i in 0 until points.size - 1) {
                drawLine(
                    color = lineColor,
                    start = points[i],
                    end = points[i + 1],
                    strokeWidth = 4f,
                    cap = StrokeCap.Round
                )
            }

            // Point markers
            points.forEach { point ->
                drawCircle(color = lineColor, radius = 4f, center = point)
            }
        }
    }

    /** Emits [payload] on the "signal" channel, or logs and drops it when no socket exists yet. */
    private fun sendSignal(payload: JSONObject) {
        if (::socket.isInitialized) {
            socket.emit("signal", payload)
        } else {
            Log.w(TAG, "Socket not initialized; dropping signal of type ${payload.optString("type")}")
        }
    }

    /**
     * Creates the signaling socket, registers its event handlers and connects. On connect
     * the fragment joins [currentRoom] and immediately sends an offer.
     */
    private fun initializeSocketIO() {
        if (signalingServerUrl.isBlank()) {
            Log.e(TAG, "Error connecting to Socket: signaling server URL is empty")
            return
        }

        val socketUrl = signalingServerUrl
        val useTls = signalingServerUrl.startsWith("https")
        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = useTls
            path = "/"
        }

        try {
            val client = IO.socket(socketUrl, options)
            socket = client

            client.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket connected")
                client.emit("join", currentRoom)
                Log.d(TAG, "Joined room: $currentRoom")
                initiateCall()
            }

            client.on(Socket.EVENT_CONNECT_ERROR) { args ->
                args.firstOrNull()?.let { error ->
                    Log.e(TAG, "Socket connection error: $error")
                }
            }

            client.on(Socket.EVENT_DISCONNECT) { args ->
                args.firstOrNull()?.let { reason ->
                    Log.d(TAG, "Socket disconnected: $reason")
                }
            }

            client.on("signal") { args ->
                // firstOrNull() keeps an empty payload from throwing in the log statement.
                val payload = args.firstOrNull()
                Log.d(TAG, "Received signaling: $payload")
                if (payload is JSONObject) {
                    handleSignalingData(payload)
                }
            }

            client.connect()
            Log.d(TAG, "Connecting to Socket: $socketUrl...")
        } catch (e: Exception) {
            // Deliberately best-effort: a bad URL must not crash the fragment.
            Log.e(TAG, "Error connecting to Socket: ${e.message}", e)
        }
    }

    /** Requests any permission from the list that has not been granted yet. */
    private fun requestPermissionsIfNeeded() {
        val permissions = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        )

        val permissionsToRequest = permissions.filter {
            ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
        }

        if (permissionsToRequest.isNotEmpty()) {
            requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
        } else {
            onPermissionsChecked()
        }
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Builds the [PeerConnection] with the configured STUN/TURN servers, routes incoming
     * video tracks to [remoteView] and starts periodic stats collection.
     *
     * @param onLocalPeerCreated invoked with the new connection; not invoked when the
     *   factory fails to create one.
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        remoteView: SurfaceViewRenderer,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        val iceServers = listOf(
            PeerConnection.IceServer.builder(stunUrl).createIceServer(),
            PeerConnection.IceServer.builder(turnUrl)
                .setUsername(turnUsername)
                .setPassword(turnPassword)
                .createIceServer()
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                iceCandidate?.let { candidate ->
                    Log.d(TAG, "ICE candidate: $candidate")
                    val signalData = JSONObject().apply {
                        put("type", "ice")
                        put("candidate", JSONObject().apply {
                            put("sdpMid", candidate.sdpMid)
                            put("sdpMLineIndex", candidate.sdpMLineIndex)
                            put("candidate", candidate.sdp)
                        })
                        put("room", currentRoom)
                    }
                    sendSignal(signalData)
                }
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
                (receiver?.track() as? VideoTrack)?.addSink(remoteView)
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
                (transceiver?.receiver?.track() as? VideoTrack)?.addSink(remoteView)
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        })

        if (peer == null) {
            Log.e(TAG, "Failed to create PeerConnection")
            return
        }

        localPeer = peer
        onLocalPeerCreated(peer)

        startStatsCollection()
    }

    /** Creates an offer, applies it as the local description and sends it to the room. */
    private fun initiateCall() {
        Log.d(TAG, "Initiating call...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        localPeer?.createOffer(object : SdpObserver {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                sessionDescription?.let { sdp ->
                    localPeer?.setLocalDescription(object : SdpObserver {
                        override fun onSetSuccess() {
                            val signalData = JSONObject().apply {
                                put("type", "offer")
                                put("sdp", JSONObject().put("sdp", sdp.description))
                                put("room", currentRoom)
                            }
                            sendSignal(signalData)
                        }

                        override fun onSetFailure(error: String?) {
                            Log.e(TAG, "Set local description error: $error")
                        }

                        override fun onCreateSuccess(p0: SessionDescription?) {}
                        override fun onCreateFailure(p0: String?) {}
                    }, sdp)
                }
            }

            override fun onSetSuccess() {}
            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "Create offer error: $error")
            }

            override fun onSetFailure(error: String?) {}
        }, constraints)
    }

    /**
     * Dispatches one signaling message by its "type" field. Malformed messages are logged
     * and ignored rather than thrown on the socket callback thread.
     */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        try {
            when (val type = data.optString("type")) {
                "answer" -> handleAnswer(data)
                "ice" -> handleRemoteIceCandidate(data)
                "time_sync_request" -> replyToTimeSync(data)
                else -> Log.e(TAG, "Unknown signaling type: $type")
            }
        } catch (e: org.json.JSONException) {
            Log.e(TAG, "Malformed signaling message: $data", e)
        }
    }

    /** Applies the remote answer, then flushes ICE candidates that arrived before it. */
    private fun handleAnswer(data: JSONObject) {
        Log.d(TAG, "Received answer")
        val sdp = SessionDescription(
            SessionDescription.Type.ANSWER,
            data.getJSONObject("sdp").getString("sdp")
        )
        localPeer?.setRemoteDescription(object : SdpObserver {
            override fun onSetSuccess() {
                pendingIceCandidates.forEach { candidate ->
                    localPeer?.addIceCandidate(candidate)
                }
                pendingIceCandidates.clear()
                Log.d(TAG, "Set remote description (answer) success")
            }

            override fun onSetFailure(error: String?) {
                Log.e(TAG, "Set remote description error: $error")
            }

            override fun onCreateSuccess(p0: SessionDescription?) {}
            override fun onCreateFailure(p0: String?) {}
        }, sdp)
    }

    /** Adds a remote ICE candidate, or queues it until the remote description is set. */
    private fun handleRemoteIceCandidate(data: JSONObject) {
        Log.d(TAG, "Received ICE candidate")
        val candidateData = data.getJSONObject("candidate")
        val candidate = IceCandidate(
            candidateData.getString("sdpMid"),
            candidateData.getInt("sdpMLineIndex"),
            candidateData.getString("candidate")
        )

        if (localPeer?.remoteDescription != null) {
            localPeer?.addIceCandidate(candidate)
        } else {
            pendingIceCandidates.add(candidate)
        }
    }

    /** Echoes t1 back with the local receive (t2) and send (t3) timestamps. */
    private fun replyToTimeSync(data: JSONObject) {
        Log.d(TAG, "Received time_sync_request")
        val t1 = data.getLong("t1")
        val t2 = System.currentTimeMillis()
        val t3 = System.currentTimeMillis()
        val syncResponse = JSONObject().apply {
            put("type", "time_sync_response")
            put("t1", t1)
            put("t2", t2)
            put("t3", t3)
            put("room", currentRoom)
        }
        sendSignal(syncResponse)
        Log.d(TAG, "Replied with time_sync_response: t1=$t1, t2=$t2, t3=$t3")
    }

    /** Polls the peer connection for stats once per second until the view is destroyed. */
    private fun startStatsCollection() {
        Log.d(TAG, "Starting stats collection...")
        statsJob?.cancel()
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(STATS_INTERVAL_MS) // Collect stats every second
                Log.d(TAG, "Collecting stats...")
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "Failed to get stats: localPeer is null.")
                } else {
                    peer.getStats { report ->
                        Log.d(TAG, "Stats report obtained.")
                        parseStatsReport(report)
                    }
                }
            }
        }
    }

    /**
     * Derives frame rate, bitrate and packet loss for the inbound video stream from
     * [report] and publishes them to the UI state. The first sample only seeds the
     * previous-value fields, because rates need two samples.
     */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "Received RTCStatsReport: $report")
        for (stats in report.statsMap.values) {
            if (stats.type != "inbound-rtp") continue
            if ((stats.members["kind"] as? String) != "video") continue

            val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
            val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
            val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
            val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
            val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
            val packetsReceived = (stats.members["packetsReceived"] as? Number)?.toDouble() ?: 0.0

            // Guard the 0 / 0 case, which would otherwise yield NaN.
            val packetsTotal = packetsLost + packetsReceived
            val packetLossFraction = if (packetsTotal > 0) packetsLost / packetsTotal else 0.0
            val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

            Log.d(
                TAG,
                "Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
            )

            if (prevTimestamp != 0.0) {
                val timeElapsed = timestamp - prevTimestamp
                val framesDelta = framesDecoded - prevFramesDecoded
                val bytesDelta = bytesReceived - prevBytesReceived

                val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
                val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
                val packetLoss = packetLossFraction * 100 // Convert to percentage

                Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

                publishVideoStats(frameRate, bitrate, packetLoss)
            }

            // Update previous values
            prevFramesDecoded = framesDecoded
            prevBytesReceived = bytesReceived
            prevFramesReceived = framesReceived
            prevFramesDropped = framesDropped
            prevTimestamp = timestamp
        }
    }

    /** Pushes one computed sample into the Compose state; a no-op once the view is gone. */
    private fun publishVideoStats(frameRate: Double, bitrate: Double, packetLoss: Double) {
        // viewLifecycleOwner throws when there is no view; the LiveData simply holds null.
        val owner = viewLifecycleOwnerLiveData.value ?: return

        val frameRateLow = frameRate < MIN_SMOOTH_FRAME_RATE
        val packetLossHigh = packetLoss > MAX_PACKET_LOSS_PERCENT
        val isStuttering = frameRateLow || packetLossHigh

        owner.lifecycleScope.launch {
            frameRateState.doubleValue = frameRate
            bitrateState.longValue = bitrate.toLong()

            stutteringState.value = isStuttering
            frameRateLowState.value = frameRateLow
            packetLossHighState.value = packetLossHigh
            packetLossState.doubleValue = packetLoss

            // Keep a rolling window of the most recent samples for the charts.
            frameRateHistory.add(frameRate.toFloat())
            if (frameRateHistory.size > HISTORY_SIZE) {
                frameRateHistory.removeAt(0)
            }

            bitrateHistory.add(bitrate.toLong())
            if (bitrateHistory.size > HISTORY_SIZE) {
                bitrateHistory.removeAt(0)
            }

            Log.d(
                TAG,
                "Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
            )
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        statsJob = null
        timeSyncJob?.cancel()
        timeSyncJob = null

        // The socket only exists if initializeSocketIO() got as far as IO.socket().
        if (::socket.isInitialized) {
            socket.off()
            socket.disconnect()
        }

        localPeer?.dispose()
        localPeer = null
        remoteView?.release()
        remoteView = null
        peerConnectionFactory?.dispose()
        peerConnectionFactory = null
        remoteEglBase?.release()
        remoteEglBase = null

        // Reset per-session bookkeeping so a re-created view starts from a clean slate.
        pendingIceCandidates.clear()
        prevFramesDecoded = 0.0
        prevBytesReceived = 0.0
        prevFramesReceived = 0.0
        prevFramesDropped = 0.0
        prevTimestamp = 0.0
    }

    companion object {
        /** Number of samples kept for each chart. */
        private const val HISTORY_SIZE = 60

        /** Delay between two stats polls, in milliseconds. */
        private const val STATS_INTERVAL_MS = 1000L

        /** Frame rates below this value are reported as stuttering. */
        private const val MIN_SMOOTH_FRAME_RATE = 24.0

        /** Packet loss above this percentage is reported as stuttering. */
        private const val MAX_PACKET_LOSS_PERCENT = 5.0

        /** Creates the fragment with its connection settings stored as arguments. */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoReceiverFragment {
            val fragment = VideoReceiverFragment()
            val args = Bundle().apply {
                putString("room", room)
                putString("stunUrl", stunUrl)
                putString("turnUrl", turnUrl)
                putString("turnUsername", turnUsername)
                putString("turnPassword", turnPassword)
                putString("signalingServerUrl", signalingServerUrl)
            }
            fragment.arguments = args
            return fragment
        }
    }
}

Explanation of Changes:

  1. Time Synchronization Handling:

    • Initiator (VideoInitiatorFragment):

      • Added handling for time_sync_response where it receives t1, t2, and t3.
      • Calculated RTT as t4 - t1.
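      • Calculated OWD as t2 - t1. Note that t1 is read from the initiator's clock and t2 from the receiver's clock, so this value is only a true one-way delay when the two device clocks are synchronized; otherwise it also contains the clock offset between them.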
      • Updated both RTT and OWD state variables and maintained their histories for max, min, and average calculations.
      • Added UI elements to display RTT and OWD metrics.
    • Receiver (VideoReceiverFragment):

      • Added handling for time_sync_request by replying with time_sync_response containing t1, t2, and t3.
      • This ensures that the initiator can calculate both RTT and OWD using the same request.
  2. UI Enhancements:

    • Initiator (VideoInitiatorFragment):

      • Added UI components within the WebRTCComposeLayout composable to display the latest, maximum, minimum, and average values for both RTT and OWD.
      • Utilized observeAsState() to reactively update the UI when state variables change.
    • Receiver (VideoReceiverFragment):

      • Optionally, you can also display latency metrics if needed. In this implementation, the receiver primarily handles incoming streams and may not require displaying RTT and OWD unless you want to monitor its own metrics.
  3. State Variables:

    • Introduced separate state variables for RTT and OWD in the initiator to track and display their respective metrics.
    • Maintained history lists (rttHistory and owdHistory) to calculate max, min, and average values.
  4. Logging:

    • Enhanced logging statements to provide detailed insights into the time synchronization process and the calculated RTT and OWD values.
  5. Permissions Handling:

    • Ensured that both fragments request necessary permissions and handle the user’s response appropriately.
  6. Peer Connection Management:

    • Both fragments maintain their peer connections, handle ICE candidates, and manage media streams appropriately.

Testing the Implementation:

  1. Set Up Signaling Server:

    • Ensure that your signaling server correctly handles the time_sync_request and time_sync_response messages.
    • The server should simply relay these messages between the initiator and receiver.
  2. Run the Application:

    • Start the initiator and receiver fragments.
    • Observe the logs to verify that time synchronization requests and responses are being sent and received correctly.
    • Check the UI to ensure that RTT and OWD metrics are being displayed and updated in real-time.
  3. Adjust Synchronization Interval:

    • The startTimeSync function in the initiator is set to send synchronization requests every 5 seconds (5000L). Adjust this interval as needed based on your application's requirements.
  4. Handle Edge Cases:

    • Ensure that the application gracefully handles scenarios where time synchronization responses are delayed or missing.
    • Implement retry mechanisms or timeouts if necessary.


By integrating a unified time synchronization mechanism, you can effectively monitor both RTT and OWD in your WebRTC application. This enhancement provides valuable insights into the network performance and helps in diagnosing potential latency issues. The UI modifications ensure that these metrics are readily available to users, facilitating better understanding and control over the video communication quality.


Unresolved reference: observeAsState




用户在Kotlin和Jetpack Compose中遇到错误 "Unresolved reference: observeAsState",因为代码中自定义状态持有器与标准函数不匹配。


首先,确保Jetpack Compose的依赖被正确添加。接着,确认 'observeAsState' 函数的正确导入路径。最后,验证使用的状态函数是否正确。


在确保latestRttState具有可观察性类型后,建议将mutableLongStateOf替换为mutableStateOf,并导入import androidx.compose.runtime.*


出现 "Unresolved reference: observeAsState" 错误,通常是因为缺少必要的导入或者使用了不适当的状态管理方法。在您提供的代码中,observeAsState 通常用于将 LiveData 或 Flow 转换为 Compose 的状态,而您使用的是 mutableLongStateOf,这可能不是标准的 Compose 函数。


1. 将 mutableLongStateOf 替换为 mutableStateOf

mutableLongStateOf 可能是自定义的函数,但在标准的 Jetpack Compose 中,我们使用 mutableStateOf 来创建可观察的状态。因此,请将所有 mutableLongStateOf 替换为 mutableStateOf


// 替换前 private val latestRttState = mutableLongStateOf(0L) // 替换后 private val latestRttState = mutableStateOf(0L)

2. 移除 observeAsState 并直接使用 by 关键字

observeAsState 通常用于 LiveData 或 Flow,而对于 MutableState,我们可以直接使用 by 关键字进行委托,从而简化代码。


// 替换前 val latestRtt by latestRttState.observeAsState() // 替换后 val latestRtt by latestRttState

3. 确保正确导入 Compose 的状态相关包

确保在文件顶部导入了必要的 Compose 运行时包:

import androidx.compose.runtime.*

4. 完整示例



/**
 * Initiator layout: local video preview followed by the RTT and OWD read-outs.
 *
 * The state holders are read through `by` delegation, so the composable recomposes
 * whenever one of them changes. Each metric shows "N/A" until its history list holds
 * at least one sample; the delegated values are non-null `Long`s, so a `?: "N/A"`
 * fallback would never trigger and the minimum would render its `Long.MAX_VALUE`
 * initial value.
 */
@Composable
fun WebRTCComposeLayout() {
    val context = LocalContext.current
    lateinit var peerConnectionFactory: PeerConnectionFactory
    var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

    // Collect the RTT and OWD states
    val latestRtt by latestRttState
    val maxRtt by maxRttState
    val minRtt by minRttState
    val averageRtt by averageRttState

    val latestOwd by latestOwdState
    val maxOwd by maxOwdState
    val minOwd by minOwdState
    val averageOwd by averageOwdState

    // A metric is only meaningful once at least one sample has been recorded.
    val hasRttSamples = rttHistory.isNotEmpty()
    val hasOwdSamples = owdHistory.isNotEmpty()
    fun rttLabel(value: Long): String = if (hasRttSamples) value.toString() else "N/A"
    fun owdLabel(value: Long): String = if (hasOwdSamples) value.toString() else "N/A"

    Surface(color = Color.Black) {
        Column(modifier = Modifier.fillMaxSize()) {
            // Local video view
            AndroidView(
                factory = {
                    localView = SurfaceViewRenderer(it).apply {
                        setZOrderMediaOverlay(false)
                    }
                    localView!!
                },
                modifier = Modifier
                    .weight(1f)
                    .fillMaxWidth(),
                update = {
                    // Initialise the renderer once, when the EGL base is first created.
                    if (localEglBase == null) {
                        localEglBase = EglBase.create()
                        it.init(localEglBase!!.eglBaseContext, null)
                        it.setMirror(false)
                    }
                }
            )

            Spacer(modifier = Modifier.height(8.dp))

            // RTT and OWD metrics
            Column(modifier = Modifier.padding(16.dp)) {
                Text(
                    text = "往返时延 (RTT): ${rttLabel(latestRtt)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最大 RTT: ${rttLabel(maxRtt)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最小 RTT: ${rttLabel(minRtt)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "平均 RTT: ${rttLabel(averageRtt)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 8.dp)
                )

                Text(
                    text = "单向时延 (OWD): ${owdLabel(latestOwd)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最大 OWD: ${owdLabel(maxOwd)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "最小 OWD: ${owdLabel(minOwd)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 4.dp)
                )
                Text(
                    text = "平均 OWD: ${owdLabel(averageOwd)} ms",
                    color = Color.White,
                    style = MaterialTheme.typography.bodyMedium,
                    modifier = Modifier.padding(bottom = 8.dp)
                )
            }

            // Spacer between video and charts
            Spacer(modifier = Modifier.height(8.dp))
        }

        // The rest of the code stays unchanged...
    }
}

5. 检查依赖项

确保在您的 build.gradle 文件中包含了必要的 Compose 和 Lifecycle 依赖项。例如:

dependencies { implementation "androidx.compose.ui:ui:1.4.3" implementation "androidx.compose.material:material:1.4.3" implementation "androidx.compose.ui:ui-tooling-preview:1.4.3" implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.6.1" implementation "androidx.activity:activity-compose:1.7.2" // 其他依赖项... }

6. 总结

  • 替换状态函数: 将 mutableLongStateOf 替换为 mutableStateOf。
  • 移除 observeAsState: 对于 MutableState 类型,直接使用 by 关键字进行委托,而无需使用 observeAsState。
  • 添加必要导入: 确保导入了 androidx.compose.runtime.* 包。
  • 检查依赖项: 确保项目中包含了必要的 Compose 和 Lifecycle 相关依赖。

通过以上步骤,应该可以解决 "Unresolved reference: observeAsState" 的问题。如果在实施过程中遇到其他问题,请随时提供更多信息以便进一步协助。










正在修改函数代码,确保导入和变量都正确无误。举例说明 VideoInitiatorFragment.kt 和 VideoReceiverFragment.kt 的具体修改。




出现 "Unresolved reference: observeAsState" 错误,通常是因为使用了不正确的状态管理方法或缺少必要的导入。在您的代码中,observeAsState 主要用于将 LiveData 或 Flow 转换为 Compose 的状态,而您使用的是 mutableLongStateOf,这可能不是标准的 Compose 函数。为了解决这个问题,我们需要进行以下修改:

  1. 将 mutableLongStateOf 替换为 mutableStateOf

    • mutableLongStateOf 可能是自定义的函数,但在标准的 Jetpack Compose 中,我们使用 mutableStateOf 来创建可观察的状态。
  2. 移除 observeAsState 并直接使用 by 关键字

    • 对于 MutableState 类型,可以直接使用 by 关键字进行委托,而无需使用 observeAsState
  3. 确保正确导入 Compose 的状态相关包

    • 确保在文件顶部导入了必要的 Compose 运行时包,如 import androidx.compose.runtime.*



package com.example.webrtcapp

import android.Manifest
import android.content.Context
import android.content.pm.PackageManager
import android.os.Bundle
import android.util.Log
import android.widget.Toast
import androidx.activity.result.contract.ActivityResultContracts
import androidx.compose.foundation.layout.*
import androidx.compose.material.MaterialTheme
import androidx.compose.material.Surface
import androidx.compose.material.Text
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.platform.ComposeView
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.unit.dp
import androidx.compose.ui.viewinterop.AndroidView
import androidx.core.content.ContextCompat
import androidx.fragment.app.Fragment
import androidx.lifecycle.lifecycleScope
import io.socket.client.IO
import io.socket.client.Socket
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.json.JSONException
import org.json.JSONObject
import org.webrtc.*

/**
 * Fragment that captures the local camera, shows it in a [SurfaceViewRenderer], answers
 * SDP offers received over a Socket.IO signaling channel and displays round-trip-time (RTT)
 * and one-way-delay (OWD) figures derived from periodic `time_sync_request` /
 * `time_sync_response` messages.
 *
 * Create instances with [newInstance] so that the room name, ICE server settings and
 * signaling URL are delivered through the fragment arguments.
 */
class VideoInitiatorFragment : Fragment() {

    // Signaling / WebRTC handles. `socket` stays uninitialised if IO.socket() fails,
    // so every use goes through an isInitialized check.
    private lateinit var socket: Socket
    private var localPeer: PeerConnection? = null
    private var localView: SurfaceViewRenderer? = null
    private var localEglBase: EglBase? = null
    private var videoCapturer: CameraVideoCapturer? = null

    // ICE candidates that arrived before the remote description was set.
    // NOTE(review): this list is touched from the signaling listener and from SDP observer
    // callbacks; if those run on different threads it needs synchronisation - confirm.
    private val pendingIceCandidates = mutableListOf<IceCandidate>()

    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    // RTT state observed by the Compose UI.
    private val maxRttState = mutableStateOf(0L)
    private val minRttState = mutableStateOf(Long.MAX_VALUE)
    private val averageRttState = mutableStateOf(0L)
    private val latestRttState = mutableStateOf(0L)
    private val rttHistory = mutableStateListOf<Long>()

    // OWD state observed by the Compose UI.
    private val maxOwdState = mutableStateOf(0L)
    private val minOwdState = mutableStateOf(Long.MAX_VALUE)
    private val averageOwdState = mutableStateOf(0L)
    private val latestOwdState = mutableStateOf(0L)
    private val owdHistory = mutableStateListOf<Long>()

    private var statsJob: Job? = null

    /**
     * Shows one toast per requested permission and reports overall success only when
     * every requested permission was granted.
     */
    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { results ->
            results.forEach { (permission, isGranted) ->
                val outcome = if (isGranted) "权限已授予" else "权限被拒绝"
                Toast.makeText(requireContext(), "$permission $outcome", Toast.LENGTH_SHORT).show()
            }
            // Previously the "all granted" toast was shown even after a denial.
            if (results.values.all { it }) {
                onPermissionsChecked()
            }
        }

    /** Reads the connection settings from the fragment arguments, falling back to defaults. */
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        val args = arguments
        currentRoom = args?.getString(ARG_ROOM) ?: "default-room"
        signalingServerUrl = args?.getString(ARG_SIGNALING_SERVER_URL) ?: ""
        stunUrl = args?.getString(ARG_STUN_URL) ?: ""
        turnUrl = args?.getString(ARG_TURN_URL) ?: ""
        // NOTE(review): these fallback TURN credentials are hard-coded in source; consider
        // moving them to configuration. Values kept as-is to preserve behaviour.
        turnUsername = args?.getString(ARG_TURN_USERNAME) ?: "wstszx"
        turnPassword = args?.getString(ARG_TURN_PASSWORD) ?: "930379"

        Log.d(
            TAG,
            "onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    /** Hosts [WebRTCComposeLayout] in a [ComposeView]. */
    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View =
        ComposeView(requireContext()).apply {
            setContent { WebRTCComposeLayout() }
        }

    /**
     * Local preview on top, RTT / OWD read-outs below. A one-shot [LaunchedEffect] builds the
     * peer-connection factory, starts the camera, creates the peer connection, connects the
     * signaling socket, requests permissions and starts the periodic time sync.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current

        // Reading the state holders through `by` subscribes this composable to changes.
        val latestRtt by latestRttState
        val maxRtt by maxRttState
        val minRtt by minRttState
        val averageRtt by averageRttState

        val latestOwd by latestOwdState
        val maxOwd by maxOwdState
        val minOwd by minOwdState
        val averageOwd by averageOwdState

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Local video view. The renderer is initialised in the factory so that every
                // newly created renderer gets an EGL context, including after view re-creation.
                AndroidView(
                    factory = { viewContext ->
                        val eglBase = localEglBase ?: EglBase.create().also { localEglBase = it }
                        SurfaceViewRenderer(viewContext).apply {
                            setZOrderMediaOverlay(false)
                            init(eglBase.eglBaseContext, null)
                            setMirror(false)
                        }.also { localView = it }
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth()
                )

                Spacer(modifier = Modifier.height(8.dp))

                // RTT and OWD metrics
                Column(modifier = Modifier.padding(16.dp)) {
                    val metricLines = listOf(
                        "往返时延 (RTT): $latestRtt ms" to 4.dp,
                        "最大 RTT: $maxRtt ms" to 4.dp,
                        "最小 RTT: $minRtt ms" to 4.dp,
                        "平均 RTT: $averageRtt ms" to 8.dp,
                        "单向时延 (OWD): $latestOwd ms" to 4.dp,
                        "最大 OWD: $maxOwd ms" to 4.dp,
                        "最小 OWD: $minOwd ms" to 4.dp,
                        "平均 OWD: $averageOwd ms" to 8.dp
                    )
                    metricLines.forEach { (label, bottomGap) ->
                        Text(
                            text = label,
                            color = Color.White,
                            // `body2` because this file imports the Material 2 MaterialTheme,
                            // whose Typography has no `bodyMedium`.
                            style = MaterialTheme.typography.body2,
                            modifier = Modifier.padding(bottom = bottomGap)
                        )
                    }
                }

                // Spacer between video and charts
                Spacer(modifier = Modifier.height(8.dp))
            }

            LaunchedEffect(Unit) {
                val eglBase = checkNotNull(localEglBase) {
                    "EglBase must be created by the renderer factory before WebRTC setup"
                }

                PeerConnectionFactory.initialize(
                    PeerConnectionFactory.InitializationOptions.builder(context)
                        .createInitializationOptions()
                )
                val peerConnectionFactory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(DefaultVideoEncoderFactory(eglBase.eglBaseContext, true, true))
                    .setVideoDecoderFactory(DefaultVideoDecoderFactory(eglBase.eglBaseContext))
                    .createPeerConnectionFactory()

                // NOTE(review): capture is started before requestPermissionsIfNeeded() runs,
                // so on first launch the camera may be opened without CAMERA permission.
                var localVideoTrack: VideoTrack? = null
                initLocalVideo(context, localView, peerConnectionFactory, eglBase) {
                    localVideoTrack = it
                }

                createPeerConnection(context, peerConnectionFactory, localVideoTrack) {
                    localPeer = it
                    startStatsCollection() // begin polling connection statistics
                }

                initializeSocketIO()
                requestPermissionsIfNeeded()
                startTimeSync()
            }
        }
    }

    /** Sends a time-sync request every [intervalMs] milliseconds while the view is alive. */
    private fun startTimeSync(intervalMs: Long = 5000L) {
        viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(intervalMs)
                requestTimeSync()
            }
        }
    }

    /** Polls the peer connection statistics once per second while the view is alive. */
    private fun startStatsCollection() {
        statsJob?.cancel() // never keep two pollers running
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000)
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "获取统计信息失败: localPeer 为 null。")
                } else {
                    peer.getStats { report -> parseStatsReport(report) }
                }
            }
        }
    }

    /**
     * Looks for a positive `currentRoundTripTime` (seconds) on `transport` stats entries and
     * records it, converted to milliseconds, as an RTT sample.
     *
     * NOTE(review): in the W3C stats model `currentRoundTripTime` belongs to `candidate-pair`
     * entries, not `transport`, so this branch may never produce a sample - confirm. Switching
     * the type would also mix media-path RTT into the same history as the time-sync RTT.
     */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
        for (stats in report.statsMap.values) {
            Log.d(TAG, "Stats 类型: ${stats.type}")
            if (stats.type != "transport") continue

            Log.d(TAG, "找到 Transport Stats: $stats")
            val rttMs = (stats.members["currentRoundTripTime"] as? Number)
                ?.toDouble()
                ?.times(1000)
                ?.toLong()
                ?.takeIf { it > 0 }
                ?: continue

            viewLifecycleOwner.lifecycleScope.launch {
                recordSample(rttMs, rttHistory, latestRttState, maxRttState, minRttState, averageRttState)
                Log.d(
                    TAG,
                    "RTT - 最新: $rttMs ms, 最大: ${maxRttState.value} ms, 最小: ${minRttState.value} ms, 平均: ${averageRttState.value} ms"
                )
            }
        }
    }

    /**
     * Appends [sample] to [history] (bounded to [MAX_HISTORY_SIZE] entries, oldest dropped
     * first) and refreshes the latest / max / min / average state holders from that window.
     */
    private fun recordSample(
        sample: Long,
        history: MutableList<Long>,
        latest: MutableState<Long>,
        max: MutableState<Long>,
        min: MutableState<Long>,
        average: MutableState<Long>
    ) {
        latest.value = sample
        history.add(sample)
        if (history.size > MAX_HISTORY_SIZE) {
            history.removeAt(0)
        }
        max.value = history.maxOrNull() ?: 0L
        min.value = history.minOrNull() ?: 0L
        average.value = if (history.isNotEmpty()) history.average().toLong() else 0L
    }

    /** Creates the Socket.IO connection, registers the listeners and joins [currentRoom]. */
    private fun initializeSocketIO() {
        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = signalingServerUrl.startsWith("https")
            path = "/"
        }

        try {
            socket = IO.socket(signalingServerUrl, options)

            socket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket 已连接")
                socket.emit("join", currentRoom)
                Log.d(TAG, "加入房间: $currentRoom")
            }
            socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                args.firstOrNull()?.let { error -> Log.e(TAG, "Socket 连接错误: $error") }
            }
            socket.on(Socket.EVENT_DISCONNECT) { args ->
                args.firstOrNull()?.let { reason -> Log.d(TAG, "Socket 已断开: $reason") }
            }
            socket.on("signal") { args ->
                // Take the payload safely: indexing args[0] before the emptiness check could throw.
                val payload = args.firstOrNull()
                Log.d(TAG, "接收到信令: $payload")
                (payload as? JSONObject)?.let { handleSignalingData(it) }
            }

            socket.connect()
            Log.d(TAG, "正在连接到 Socket: $signalingServerUrl...")
        } catch (e: Exception) {
            Log.e(TAG, "连接到 Socket 时出错: ${e.message}", e)
        }
    }

    /** Emits [payload] on the `signal` channel, or logs and drops it if no socket exists. */
    private fun emitSignal(payload: JSONObject) {
        if (!::socket.isInitialized) {
            Log.w(TAG, "Socket not initialised; dropping signal of type '${payload.optString("type")}'")
            return
        }
        socket.emit("signal", payload)
    }

    /** Requests whichever of the needed permissions are not yet granted. */
    private fun requestPermissionsIfNeeded() {
        val missing = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        ).filter { permission ->
            ContextCompat.checkSelfPermission(requireContext(), permission) != PackageManager.PERMISSION_GRANTED
        }

        if (missing.isEmpty()) {
            onPermissionsChecked()
        } else {
            requestPermissionsLauncher.launch(missing.toTypedArray())
        }
    }

    /** Tells the user that every required permission is available. */
    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Starts camera capture at 1920x1080 @ 60 fps, attaches the resulting track to
     * [localView] (when present) and hands the track to [onLocalVideoTrack].
     *
     * The capturer is remembered in [videoCapturer] so that [onDestroyView] can stop it.
     * The audio track is created in [createPeerConnection]; the unused audio source and
     * local media stream that used to be built here were removed.
     */
    private fun initLocalVideo(
        context: Context,
        localView: SurfaceViewRenderer?,
        peerConnectionFactory: PeerConnectionFactory,
        eglBase: EglBase,
        onLocalVideoTrack: (VideoTrack) -> Unit
    ) {
        val capturer = createCameraCapturer(context).also { videoCapturer = it }
        val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
        val videoSource = peerConnectionFactory.createVideoSource(capturer.isScreencast)
        capturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)
        capturer.startCapture(1920, 1080, 60)

        val videoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
        localView?.let { videoTrack.addSink(it) }

        onLocalVideoTrack(videoTrack)
    }

    /**
     * Returns a capturer for the first usable camera, trying back-facing cameras first, then
     * front-facing ones, then the first enumerated device.
     *
     * @throws IllegalStateException if no camera exists or none can be opened.
     */
    private fun createCameraCapturer(context: Context): CameraVideoCapturer {
        val enumerator = Camera2Enumerator(context)
        val deviceNames = enumerator.deviceNames

        val candidates = deviceNames.filter { enumerator.isBackFacing(it) } +
            deviceNames.filter { enumerator.isFrontFacing(it) } +
            listOfNotNull(deviceNames.firstOrNull())

        // firstNotNullOfOrNull stops at the first camera that yields a capturer.
        return candidates.firstNotNullOfOrNull { enumerator.createCapturer(it, null) }
            ?: throw IllegalStateException("无法创建摄像头捕捉器")
    }

    /**
     * Creates the peer connection with the configured STUN/TURN servers, adds the local
     * video track (if any) and a new audio track, then invokes [onLocalPeerCreated].
     *
     * Blank server URLs are skipped. If the connection cannot be created, an error is logged
     * and [onLocalPeerCreated] is not called.
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        localVideoTrack: VideoTrack?,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        val iceServers = listOfNotNull(
            stunUrl.takeIf { it.isNotBlank() }?.let { url ->
                PeerConnection.IceServer.builder(url).createIceServer()
            },
            turnUrl.takeIf { it.isNotBlank() }?.let { url ->
                PeerConnection.IceServer.builder(url)
                    .setUsername(turnUsername)
                    .setPassword(turnPassword)
                    .createIceServer()
            }
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val observer = object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                val candidate = iceCandidate ?: return
                Log.d(TAG, "ICE candidate: $candidate")
                emitSignal(
                    JSONObject().apply {
                        put("type", "ice")
                        put(
                            "candidate",
                            JSONObject().apply {
                                put("sdpMid", candidate.sdpMid)
                                put("sdpMLineIndex", candidate.sdpMLineIndex)
                                put("candidate", candidate.sdp)
                            }
                        )
                        put("room", currentRoom)
                    }
                )
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
        if (peer == null) {
            Log.e(TAG, "createPeerConnection returned null; check the ICE server configuration")
            return
        }
        localPeer = peer

        val streamIds = listOf("local_stream")
        localVideoTrack?.let { peer.addTrack(it, streamIds) }

        val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
        val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
        peer.addTrack(localAudioTrack, streamIds)

        onLocalPeerCreated(peer)
    }

    /**
     * Creates an SDP answer on [peerConnection], installs it as the local description and,
     * once that succeeds, passes the SDP text to [onAnswerCreated].
     */
    private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
        Log.d(TAG, "Creating answer...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        val answerObserver = object : SdpObserverAdapter() {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                val answer = sessionDescription ?: return
                val localDescriptionObserver = object : SdpObserverAdapter() {
                    override fun onSetSuccess() {
                        Log.d(TAG, "SetLocalDescription onSetSuccess")
                        onAnswerCreated(answer.description)
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "SetLocalDescription onSetFailure: $error")
                    }
                }
                peerConnection.setLocalDescription(localDescriptionObserver, answer)
            }

            override fun onSetSuccess() {
                Log.d(TAG, "createAnswer onSetSuccess")
            }

            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "createAnswer onCreateFailure: $error")
            }
        }
        peerConnection.createAnswer(answerObserver, constraints)
    }

    /**
     * Dispatches one signaling message by its `type` field (`offer`, `ice`,
     * `time_sync_response`). Malformed messages are logged instead of throwing out of the
     * socket listener.
     */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        try {
            when (val type = data.optString("type")) {
                "offer" -> handleOffer(data)
                "ice" -> handleRemoteIceCandidate(data)
                "time_sync_response" -> handleTimeSyncResponse(data)
                else -> Log.e(TAG, "未知的信令类型: $type")
            }
        } catch (e: JSONException) {
            Log.e(TAG, "Malformed signaling message: $data", e)
        }
    }

    /** Applies a remote offer, answers it and then flushes any queued ICE candidates. */
    private fun handleOffer(data: JSONObject) {
        Log.d(TAG, "Received offer")
        val peer = localPeer
        if (peer == null) {
            Log.e(TAG, "Received offer before the peer connection was created")
            return
        }

        val offer = SessionDescription(
            SessionDescription.Type.OFFER,
            data.getJSONObject("sdp").getString("sdp")
        )
        val remoteDescriptionObserver = object : SdpObserverAdapter() {
            override fun onSetSuccess() {
                Log.d(TAG, "Set remote description (offer) success")
                createAnswer(peer) { answer ->
                    emitSignal(
                        JSONObject().apply {
                            put("type", "answer")
                            put("sdp", JSONObject().put("sdp", answer))
                            put("room", currentRoom)
                        }
                    )
                    pendingIceCandidates.forEach { queued -> peer.addIceCandidate(queued) }
                    pendingIceCandidates.clear()
                }
            }

            override fun onSetFailure(error: String?) {
                Log.e(TAG, "Set remote description (offer) error: $error")
            }
        }
        peer.setRemoteDescription(remoteDescriptionObserver, offer)
    }

    /** Adds a remote ICE candidate, or queues it until the remote description is set. */
    private fun handleRemoteIceCandidate(data: JSONObject) {
        Log.d(TAG, "Received ICE candidate")
        val fields = data.getJSONObject("candidate")
        val candidate = IceCandidate(
            fields.getString("sdpMid"),
            fields.getInt("sdpMLineIndex"),
            fields.getString("candidate")
        )

        val peer = localPeer
        if (peer != null && peer.remoteDescription != null) {
            peer.addIceCandidate(candidate)
        } else {
            pendingIceCandidates.add(candidate)
        }
    }

    /**
     * Turns a `time_sync_response` (t1 = our send time, t2 / t3 = timestamps supplied by the
     * responder) into one RTT sample and one OWD sample.
     */
    private fun handleTimeSyncResponse(data: JSONObject) {
        Log.d(TAG, "Received time_sync_response")
        val t1 = data.getLong("t1")
        val t2 = data.getLong("t2")
        val t3 = data.getLong("t3")
        val t4 = System.currentTimeMillis()

        // Subtract the interval between the responder's two timestamps so the time the remote
        // side held the request is not counted as network delay (t3 was previously ignored).
        val rtt = (t4 - t1) - (t3 - t2)
        // One-way delay from this device to the responder.
        // NOTE(review): t2 - t1 compares two different clocks, so it includes any clock offset.
        val owd = t2 - t1
        Log.d(TAG, "时间同步: RTT=$rtt ms, OWD=$owd ms")

        viewLifecycleOwner.lifecycleScope.launch {
            recordSample(rtt, rttHistory, latestRttState, maxRttState, minRttState, averageRttState)
            recordSample(owd, owdHistory, latestOwdState, maxOwdState, minOwdState, averageOwdState)

            Log.d(
                TAG,
                "RTT - 最新: $rtt ms, 最大: ${maxRttState.value} ms, 最小: ${minRttState.value} ms, 平均: ${averageRttState.value} ms"
            )
            Log.d(
                TAG,
                "OWD - 最新: $owd ms, 最大: ${maxOwdState.value} ms, 最小: ${minOwdState.value} ms, 平均: ${averageOwdState.value} ms"
            )
        }
    }

    /** Sends a `time_sync_request` carrying the current wall-clock time as `t1`. */
    private fun requestTimeSync() {
        val t1 = System.currentTimeMillis()
        emitSignal(
            JSONObject().apply {
                put("type", "time_sync_request")
                put("room", currentRoom)
                put("t1", t1)
            }
        )
        Log.d(TAG, "发送时间同步请求,t1: $t1")
    }

    /**
     * Stops polling, closes the socket, stops the camera and releases the WebRTC objects.
     * References are cleared so that a re-created view starts from a clean state.
     */
    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        statsJob = null

        if (::socket.isInitialized) {
            socket.off() // drop listeners so late events cannot reach the destroyed view
            socket.disconnect()
        }

        videoCapturer?.let { capturer ->
            try {
                capturer.stopCapture()
            } catch (e: InterruptedException) {
                Thread.currentThread().interrupt()
                Log.w(TAG, "Interrupted while stopping camera capture", e)
            }
            capturer.dispose()
        }
        videoCapturer = null

        localPeer?.dispose()
        localPeer = null
        localView?.release()
        localView = null
        localEglBase?.release()
        localEglBase = null
        pendingIceCandidates.clear()
    }

    companion object {
        private const val TAG = "WebRTC-Initiator"

        /** Number of samples kept in each RTT / OWD history window. */
        private const val MAX_HISTORY_SIZE = 60

        private const val ARG_ROOM = "room"
        private const val ARG_STUN_URL = "stunUrl"
        private const val ARG_TURN_URL = "turnUrl"
        private const val ARG_TURN_USERNAME = "turnUsername"
        private const val ARG_TURN_PASSWORD = "turnPassword"
        private const val ARG_SIGNALING_SERVER_URL = "signalingServerUrl"

        /** Creates a fragment whose arguments carry the room, ICE servers and signaling URL. */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoInitiatorFragment =
            VideoInitiatorFragment().apply {
                arguments = Bundle().apply {
                    putString(ARG_ROOM, room)
                    putString(ARG_STUN_URL, stunUrl)
                    putString(ARG_TURN_URL, turnUrl)
                    putString(ARG_TURN_USERNAME, turnUsername)
                    putString(ARG_TURN_PASSWORD, turnPassword)
                    putString(ARG_SIGNALING_SERVER_URL, signalingServerUrl)
                }
            }
    }
}

/** [SdpObserver] with no-op callbacks so call sites override only the events they handle. */
private open class SdpObserverAdapter : SdpObserver {
    override fun onCreateSuccess(sessionDescription: SessionDescription?) = Unit
    override fun onSetSuccess() = Unit
    override fun onCreateFailure(error: String?) = Unit
    override fun onSetFailure(error: String?) = Unit
}


package com.example.webrtcapp

import android.Manifest
import android.content.Context
import android.content.pm.PackageManager
import android.os.Bundle
import android.util.Log
import android.widget.Toast
import androidx.activity.result.contract.ActivityResultContracts
import androidx.compose.foundation.Canvas
import androidx.compose.foundation.background
import androidx.compose.foundation.layout.*
import androidx.compose.material.*
import androidx.compose.material.icons.Icons
import androidx.compose.material.icons.filled.CheckCircle
import androidx.compose.material.icons.filled.Warning
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.geometry.Offset
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.graphics.StrokeCap
import androidx.compose.ui.graphics.nativeCanvas
import androidx.compose.ui.platform.ComposeView
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.platform.ViewCompositionStrategy
import androidx.compose.ui.unit.Dp
import androidx.compose.ui.unit.dp
import androidx.compose.ui.viewinterop.AndroidView
import androidx.core.content.ContextCompat
import androidx.fragment.app.Fragment
import androidx.lifecycle.lifecycleScope
import io.socket.client.IO
import io.socket.client.Socket
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.json.JSONException
import org.json.JSONObject
import org.webrtc.*
import kotlin.math.roundToInt

/**
 * Receiving side of a one-to-one WebRTC video session.
 *
 * The fragment joins [currentRoom] on a Socket.IO signaling server, sends an SDP offer that asks
 * to receive audio and video, renders the remote video track in a [SurfaceViewRenderer] and polls
 * the peer connection once per second to chart frame rate and bitrate and to flag stuttering.
 * It also answers `time_sync_request` messages with the local receive/send timestamps.
 */
class VideoReceiverFragment : Fragment() {

    // Connection objects. They are created once the Compose UI is first shown and are torn
    // down (and reset to null) in onDestroyView().
    private lateinit var socket: Socket
    private var peerConnectionFactory: PeerConnectionFactory? = null
    private var localPeer: PeerConnection? = null
    private var remoteView: SurfaceViewRenderer? = null
    private var remoteEglBase: EglBase? = null

    // ICE candidates that arrive before the remote description has been applied.
    private val pendingIceCandidates = mutableListOf<IceCandidate>()

    // Values read from the fragment arguments in onCreate().
    private var currentRoom: String? = null
    private lateinit var signalingServerUrl: String
    private lateinit var stunUrl: String
    private lateinit var turnUrl: String
    private lateinit var turnUsername: String
    private lateinit var turnPassword: String

    // UI state derived from the stats reports.
    private val frameRateState = mutableStateOf(0.0)
    private val bitrateState = mutableStateOf(0L)
    private val stutteringState = mutableStateOf(false)
    private val frameRateLowState = mutableStateOf(false)
    private val packetLossHighState = mutableStateOf(false)
    private val packetLossState = mutableStateOf(0.0)
    private val frameRateHistory = mutableStateListOf<Float>()
    private val bitrateHistory = mutableStateListOf<Long>()

    // One-way latency values shown in the UI. Nothing in this class writes to them yet.
    private val latencyHistory = mutableStateListOf<Long>()
    private val maxLatencyState = mutableStateOf(0L)
    private val minLatencyState = mutableStateOf(Long.MAX_VALUE)
    private val averageLatencyState = mutableStateOf(0L)
    private val latestLatencyState = mutableStateOf(0L)

    // Counters from the previous stats report, used to turn cumulative values into rates.
    private var prevFramesDecoded = 0.0
    private var prevBytesReceived = 0.0
    private var prevFramesReceived = 0.0
    private var prevFramesDropped = 0.0
    private var prevTimestamp = 0.0

    // Not referenced by any code in this class; kept so the declared state is unchanged.
    private var timeOffset: Long = 0
    private var t1: Long = 0

    private var statsJob: Job? = null
    private var timeSyncJob: Job? = null

    /** Shows one toast per requested permission, then the summary toast if all were granted. */
    private val requestPermissionsLauncher =
        registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { results ->
            results.forEach { (permission, isGranted) ->
                val message = if (isGranted) "$permission 权限已授予" else "$permission 权限被拒绝"
                Toast.makeText(requireContext(), message, Toast.LENGTH_SHORT).show()
            }
            // The summary toast claims that everything was granted, so only show it when true.
            if (results.values.all { it }) {
                onPermissionsChecked()
            }
        }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        val args = arguments
        currentRoom = args?.getString(ARG_ROOM) ?: "default-room"
        signalingServerUrl = args?.getString(ARG_SIGNALING_SERVER_URL) ?: ""
        stunUrl = args?.getString(ARG_STUN_URL) ?: ""
        turnUrl = args?.getString(ARG_TURN_URL) ?: ""
        turnUsername = args?.getString(ARG_TURN_USERNAME) ?: DEFAULT_TURN_USERNAME
        turnPassword = args?.getString(ARG_TURN_PASSWORD) ?: DEFAULT_TURN_PASSWORD

        Log.d(
            TAG,
            "onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
        )
    }

    override fun onCreateView(
        inflater: android.view.LayoutInflater,
        container: android.view.ViewGroup?,
        savedInstanceState: Bundle?
    ): android.view.View {
        return ComposeView(requireContext()).apply {
            // Dispose the composition together with the fragment's view lifecycle.
            setViewCompositionStrategy(ViewCompositionStrategy.DisposeOnViewTreeLifecycleDestroyed)
            setContent { WebRTCComposeLayout() }
        }
    }

    /**
     * Root UI: remote video on top, followed by the latency read-outs, the frame-rate and
     * bitrate charts and the stuttering indicator. The first composition also creates the
     * peer connection, connects the signaling socket and requests runtime permissions.
     */
    @Composable
    fun WebRTCComposeLayout() {
        val context = LocalContext.current

        val latestLatency by latestLatencyState
        val maxLatency by maxLatencyState
        val minLatency by minLatencyState
        val averageLatency by averageLatencyState

        Surface(color = Color.Black) {
            Column(modifier = Modifier.fillMaxSize()) {
                // Remote video. The renderer is initialised as soon as it is created so that
                // every renderer instance is bound to the fragment's current EGL context.
                AndroidView(
                    factory = { viewContext ->
                        SurfaceViewRenderer(viewContext).apply {
                            setZOrderMediaOverlay(false)
                            init(ensureEglBase().eglBaseContext, null)
                            setMirror(false)
                        }.also { remoteView = it }
                    },
                    modifier = Modifier
                        .weight(1f)
                        .fillMaxWidth()
                )

                Spacer(modifier = Modifier.height(8.dp))

                // One-way latency read-outs.
                Column(modifier = Modifier.padding(16.dp)) {
                    MetricText("当前单向时延: ${latestLatency} ms")
                    MetricText("最大单向时延: ${maxLatency} ms")
                    MetricText("最小单向时延: ${minLatency} ms")
                    MetricText("平均单向时延: ${averageLatency} ms", bottomPadding = 8.dp)
                }

                Spacer(modifier = Modifier.height(8.dp))

                // Frame-rate read-out and chart.
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 8.dp)
                ) {
                    MetricText("帧率: ${frameRateState.value.roundToInt()} fps")
                    Log.d(
                        TAG,
                        "UI - 帧率: ${frameRateState.value} fps, 码率: ${bitrateState.value / 1000} kbps"
                    )
                    LineChart(
                        data = frameRateHistory,
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Green,
                        backgroundColor = Color.Black,
                        yAxisLabel = "帧率 (fps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                Spacer(modifier = Modifier.height(8.dp))

                // Bitrate read-out and chart.
                Column(
                    modifier = Modifier
                        .fillMaxWidth()
                        .padding(horizontal = 16.dp)
                ) {
                    MetricText("码率: ${bitrateState.value / 1000} kbps")
                    LineChart(
                        data = bitrateHistory.map { it / 1000f }, // bps -> kbps
                        modifier = Modifier
                            .height(200.dp)
                            .fillMaxWidth()
                            .padding(vertical = 8.dp),
                        lineColor = Color.Blue,
                        backgroundColor = Color.Black,
                        yAxisLabel = "码率 (kbps)",
                        xAxisLabel = "时间 (秒)"
                    )
                }

                Spacer(modifier = Modifier.height(16.dp))

                StutterIndicator()
            }

            LaunchedEffect(Unit) {
                val initOptions = PeerConnectionFactory.InitializationOptions
                    .builder(context.applicationContext)
                    .createInitializationOptions()
                PeerConnectionFactory.initialize(initOptions)

                // Encoder, decoder and renderer share one EGL context; the fragment owns it and
                // releases it in onDestroyView(), so no additional EglBase is created here.
                val eglContext = ensureEglBase().eglBaseContext
                val factory = PeerConnectionFactory.builder()
                    .setVideoEncoderFactory(DefaultVideoEncoderFactory(eglContext, true, true))
                    .setVideoDecoderFactory(DefaultVideoDecoderFactory(eglContext))
                    .createPeerConnectionFactory()
                peerConnectionFactory = factory

                val renderer = remoteView
                if (renderer == null) {
                    Log.e(TAG, "Remote renderer is not available; peer connection not created")
                    return@LaunchedEffect
                }

                createPeerConnection(context, factory, renderer) { localPeer = it }
                initializeSocketIO()
                requestPermissionsIfNeeded()
            }
        }
    }

    /** White body text with a bottom padding, used for all metric read-outs. */
    @Composable
    private fun MetricText(text: String, bottomPadding: Dp = 4.dp) {
        Text(
            text = text,
            color = Color.White,
            // Material 2 (androidx.compose.material) names its styles body1/body2/caption.
            style = MaterialTheme.typography.body2,
            modifier = Modifier.padding(bottom = bottomPadding)
        )
    }

    /** Row that shows either the stuttering warning with its causes or the "all good" state. */
    @Composable
    private fun StutterIndicator() {
        Row(
            modifier = Modifier
                .fillMaxWidth()
                .padding(horizontal = 16.dp),
            verticalAlignment = Alignment.CenterVertically
        ) {
            if (stutteringState.value) {
                Icon(
                    imageVector = Icons.Default.Warning,
                    contentDescription = "卡顿警告",
                    tint = Color.Red,
                    modifier = Modifier.size(24.dp)
                )
                Spacer(modifier = Modifier.width(8.dp))
                Column {
                    Text(
                        text = "视频播放出现卡顿",
                        color = Color.Red,
                        style = MaterialTheme.typography.body2
                    )
                    if (frameRateLowState.value) {
                        Text(
                            text = "帧率过低: ${frameRateState.value.roundToInt()} fps",
                            color = Color.Red,
                            style = MaterialTheme.typography.caption
                        )
                    }
                    if (packetLossHighState.value) {
                        Text(
                            text = "包丢失率过高: ${packetLossState.value.roundToInt()}%",
                            color = Color.Red,
                            style = MaterialTheme.typography.caption
                        )
                    }
                }
            } else {
                Icon(
                    imageVector = Icons.Default.CheckCircle,
                    contentDescription = "正常",
                    tint = Color.Green,
                    modifier = Modifier.size(24.dp)
                )
                Spacer(modifier = Modifier.width(8.dp))
                Text(
                    text = "视频播放正常",
                    color = Color.Green,
                    style = MaterialTheme.typography.body2
                )
            }
        }
    }

    /**
     * Line chart with axes, tick labels and optional axis titles. Consecutive samples are joined
     * by straight segments and every sample is marked with a dot.
     *
     * @param data samples in display order; nothing is drawn when empty
     * @param minYValue lower bound of the Y axis; defaults to the smallest sample
     * @param maxYValue upper bound of the Y axis; defaults to the largest sample
     */
    @Composable
    fun LineChart(
        data: List<Float>,
        modifier: Modifier = Modifier,
        lineColor: Color = Color.Green,
        backgroundColor: Color = Color.Black,
        yAxisLabel: String = "",
        xAxisLabel: String = "",
        minYValue: Float? = null,
        maxYValue: Float? = null
    ) {
        // Paints are created once per chart instead of once per label on every draw pass.
        val tickPaint = remember {
            android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
            }
        }
        val axisTitlePaint = remember {
            android.graphics.Paint().apply {
                color = android.graphics.Color.WHITE
                textSize = 24f
                textAlign = android.graphics.Paint.Align.CENTER
                isAntiAlias = true
            }
        }

        Canvas(modifier = modifier.background(backgroundColor)) {
            if (data.isEmpty()) return@Canvas

            val padding = 40.dp.toPx() // room for the axes and their labels
            val plotBottom = size.height - padding
            val plotHeight = size.height - padding * 2
            val plotWidth = size.width - padding * 2
            val centerY = size.height / 2

            val maxY = maxYValue ?: data.maxOrNull() ?: 1f
            val minY = minYValue ?: data.minOrNull() ?: 0f
            val yRange = maxY - minY
            val isFlat = yRange == 0f

            // A flat series has no range to scale against, so it sits on the vertical centre.
            fun yFor(value: Float): Float =
                if (isFlat) centerY else plotBottom - ((value - minY) / yRange) * plotHeight

            val pointCount = data.size
            val spacing = plotWidth / (pointCount - 1).coerceAtLeast(1)
            val points = data.mapIndexed { index, value ->
                Offset(padding + index * spacing, yFor(value))
            }

            // Axes.
            drawLine(
                color = Color.White,
                start = Offset(padding, padding),
                end = Offset(padding, plotBottom),
                strokeWidth = 2f
            )
            drawLine(
                color = Color.White,
                start = Offset(padding, plotBottom),
                end = Offset(size.width - padding, plotBottom),
                strokeWidth = 2f
            )

            val canvas = drawContext.canvas.nativeCanvas
            val tickCount = 5

            // Y tick labels, right-aligned so the text ends just left of the axis.
            tickPaint.textAlign = android.graphics.Paint.Align.RIGHT
            val yTickValues =
                if (isFlat) listOf(minY)
                else List(tickCount) { i -> minY + i * yRange / (tickCount - 1) }
            yTickValues.forEach { tickValue ->
                canvas.drawText(
                    tickValue.roundToInt().toString(),
                    padding - 8f,
                    yFor(tickValue) + tickPaint.textSize / 2,
                    tickPaint
                )
            }

            // X tick labels, centred under their sample. Indices are spread over the whole
            // series and de-duplicated, so short series do not stack identical labels.
            tickPaint.textAlign = android.graphics.Paint.Align.CENTER
            (0 until tickCount)
                .map { i -> i * (pointCount - 1) / (tickCount - 1) }
                .distinct()
                .forEach { index ->
                    canvas.drawText(
                        index.toString(),
                        padding + index * spacing,
                        plotBottom + tickPaint.textSize + 4f,
                        tickPaint
                    )
                }

            // Axis titles; the Y title is rotated to read bottom-to-top.
            if (yAxisLabel.isNotEmpty()) {
                canvas.save()
                canvas.rotate(-90f, padding / 2, centerY)
                canvas.drawText(yAxisLabel, padding / 2, centerY, axisTitlePaint)
                canvas.restore()
            }
            if (xAxisLabel.isNotEmpty()) {
                canvas.drawText(
                    xAxisLabel,
                    size.width / 2,
                    size.height - padding / 2,
                    axisTitlePaint
                )
            }

            // Segments between consecutive samples, then the sample dots.
            points.zipWithNext { from, to ->
                drawLine(
                    color = lineColor,
                    start = from,
                    end = to,
                    strokeWidth = 4f,
                    cap = StrokeCap.Round
                )
            }
            points.forEach { point ->
                drawCircle(color = lineColor, radius = 4f, center = point)
            }
        }
    }

    /** Returns the fragment's EGL base, creating it on first use. */
    private fun ensureEglBase(): EglBase =
        remoteEglBase ?: EglBase.create().also { remoteEglBase = it }

    /** Connects to the signaling server, joins the room and starts the call once connected. */
    private fun initializeSocketIO() {
        val options = IO.Options().apply {
            transports = arrayOf("websocket")
            secure = signalingServerUrl.startsWith("https")
            path = "/"
        }

        try {
            val newSocket = IO.socket(signalingServerUrl, options)
            socket = newSocket

            newSocket.on(Socket.EVENT_CONNECT) {
                Log.d(TAG, "Socket 已连接")
                newSocket.emit("join", currentRoom)
                Log.d(TAG, "加入房间: $currentRoom")
                initiateCall()
            }
            newSocket.on(Socket.EVENT_CONNECT_ERROR) { args ->
                args.firstOrNull()?.let { error -> Log.e(TAG, "Socket 连接错误: $error") }
            }
            newSocket.on(Socket.EVENT_DISCONNECT) { args ->
                args.firstOrNull()?.let { reason -> Log.d(TAG, "Socket 已断开: $reason") }
            }
            newSocket.on("signal") { args ->
                // Validate before touching args[0]; an empty event must not crash the listener.
                val payload = args.firstOrNull() as? JSONObject
                if (payload != null) {
                    Log.d(TAG, "接收到信令: $payload")
                    handleSignalingData(payload)
                } else {
                    Log.w(TAG, "Ignoring signal event without a JSON payload")
                }
            }

            newSocket.connect()
            Log.d(TAG, "正在连接到 Socket: $signalingServerUrl...")
        } catch (e: Exception) {
            // Best effort by design: a bad URL leaves the fragment without signaling.
            Log.e(TAG, "连接到 Socket 时出错: ${e.message}", e)
        }
    }

    /** Sends [payload] on the "signal" channel, or logs if the socket was never created. */
    private fun emitSignal(payload: JSONObject) {
        if (::socket.isInitialized) {
            socket.emit("signal", payload)
        } else {
            Log.w(TAG, "Signal dropped because the socket is not initialised: ${payload.optString("type")}")
        }
    }

    /** Requests whichever of the needed permissions are not granted yet. */
    private fun requestPermissionsIfNeeded() {
        val missing = arrayOf(
            Manifest.permission.CAMERA,
            Manifest.permission.RECORD_AUDIO,
            Manifest.permission.INTERNET,
            Manifest.permission.ACCESS_NETWORK_STATE
        ).filter { permission ->
            ContextCompat.checkSelfPermission(requireContext(), permission) !=
                PackageManager.PERMISSION_GRANTED
        }

        if (missing.isEmpty()) {
            onPermissionsChecked()
        } else {
            requestPermissionsLauncher.launch(missing.toTypedArray())
        }
    }

    private fun onPermissionsChecked() {
        Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
    }

    /**
     * Creates the peer connection, forwards local ICE candidates to the signaling server,
     * attaches incoming video tracks to [remoteView] and starts the periodic stats polling.
     *
     * @param onLocalPeerCreated invoked with the new connection; not called if creation fails
     */
    private fun createPeerConnection(
        context: Context,
        peerConnectionFactory: PeerConnectionFactory,
        remoteView: SurfaceViewRenderer,
        onLocalPeerCreated: (PeerConnection) -> Unit
    ) {
        // Blank URLs (the onCreate fallback) are skipped rather than handed to the builder.
        val iceServers = listOfNotNull(
            stunUrl.takeIf { it.isNotBlank() }?.let { url ->
                PeerConnection.IceServer.builder(url).createIceServer()
            },
            turnUrl.takeIf { it.isNotBlank() }?.let { url ->
                PeerConnection.IceServer.builder(url)
                    .setUsername(turnUsername)
                    .setPassword(turnPassword)
                    .createIceServer()
            }
        )

        val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
            bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
            rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
            tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
            continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
            sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
        }

        val observer = object : PeerConnection.Observer {
            override fun onIceCandidate(iceCandidate: IceCandidate?) {
                val candidate = iceCandidate ?: return
                Log.d(TAG, "ICE candidate: $candidate")
                emitSignal(
                    JSONObject().apply {
                        put("type", "ice")
                        put("candidate", JSONObject().apply {
                            put("sdpMid", candidate.sdpMid)
                            put("sdpMLineIndex", candidate.sdpMLineIndex)
                            put("candidate", candidate.sdp)
                        })
                        put("room", currentRoom)
                    }
                )
            }

            override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
                Log.d(TAG, "ICE candidates removed")
            }

            override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
                Log.d(TAG, "Signaling state changed to: $newState")
            }

            override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
                Log.d(TAG, "ICE connection state changed to: $newState")
            }

            override fun onIceConnectionReceivingChange(receiving: Boolean) {
                Log.d(TAG, "ICE connection receiving change: $receiving")
            }

            override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
                Log.d(TAG, "ICE gathering state changed to: $newState")
            }

            override fun onAddStream(stream: MediaStream?) {
                Log.d(TAG, "Stream added")
            }

            override fun onRemoveStream(stream: MediaStream?) {
                Log.d(TAG, "Stream removed")
            }

            override fun onDataChannel(dataChannel: DataChannel?) {
                Log.d(TAG, "Data channel created")
            }

            override fun onRenegotiationNeeded() {
                Log.d(TAG, "Renegotiation needed")
            }

            override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
                Log.d(TAG, "Track added")
            }

            override fun onTrack(transceiver: RtpTransceiver?) {
                Log.d(TAG, "onTrack called")
                // Without a sink the remote video is received but never drawn.
                (transceiver?.receiver?.track() as? VideoTrack)?.addSink(remoteView)
            }

            override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
                Log.d(TAG, "Connection state changed to: $newState")
            }
        }

        val peer = peerConnectionFactory.createPeerConnection(rtcConfig, observer)
        localPeer = peer
        if (peer == null) {
            Log.e(TAG, "createPeerConnection returned null; check the ICE server configuration")
            return
        }

        onLocalPeerCreated(peer)
        startStatsCollection()
    }

    /** Creates a receive-only offer, applies it locally and sends it to the room. */
    private fun initiateCall() {
        Log.d(TAG, "Initiating call...")
        val constraints = MediaConstraints().apply {
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
            mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
        }

        localPeer?.createOffer(object : NoOpSdpObserver() {
            override fun onCreateSuccess(sessionDescription: SessionDescription?) {
                val offer = sessionDescription ?: return
                localPeer?.setLocalDescription(object : NoOpSdpObserver() {
                    override fun onSetSuccess() {
                        emitSignal(
                            JSONObject().apply {
                                put("type", "offer")
                                put("sdp", JSONObject().put("sdp", offer.description))
                                put("room", currentRoom)
                            }
                        )
                    }

                    override fun onSetFailure(error: String?) {
                        Log.e(TAG, "Set local description error: $error")
                    }
                }, offer)
            }

            override fun onCreateFailure(error: String?) {
                Log.e(TAG, "Create offer error: $error")
            }
        }, constraints)
    }

    /** Dispatches one signaling message by its "type" field; malformed messages are logged. */
    private fun handleSignalingData(data: JSONObject) {
        Log.d(TAG, "Handling signaling data: $data")
        try {
            when (val type = data.optString("type")) {
                "answer" -> handleAnswer(data)
                "ice" -> handleRemoteIceCandidate(data)
                "time_sync_request" -> handleTimeSyncRequest(data)
                else -> Log.e(TAG, "未知的信令类型: $type")
            }
        } catch (e: JSONException) {
            // The payload comes from the network; a missing field must not kill the socket thread.
            Log.e(TAG, "Malformed signaling message: $data", e)
        }
    }

    /** Applies the remote answer and then flushes the ICE candidates queued before it. */
    private fun handleAnswer(data: JSONObject) {
        Log.d(TAG, "Received answer")
        val answer = SessionDescription(
            SessionDescription.Type.ANSWER,
            data.getJSONObject("sdp").getString("sdp")
        )
        localPeer?.setRemoteDescription(object : NoOpSdpObserver() {
            override fun onSetSuccess() {
                pendingIceCandidates.forEach { candidate -> localPeer?.addIceCandidate(candidate) }
                pendingIceCandidates.clear()
                Log.d(TAG, "Set remote description (answer) success")
            }

            override fun onSetFailure(error: String?) {
                Log.e(TAG, "Set remote description error: $error")
            }
        }, answer)
    }

    /** Adds a remote ICE candidate, or queues it until the remote description is set. */
    private fun handleRemoteIceCandidate(data: JSONObject) {
        Log.d(TAG, "Received ICE candidate")
        val candidateJson = data.getJSONObject("candidate")
        val candidate = IceCandidate(
            candidateJson.getString("sdpMid"),
            candidateJson.getInt("sdpMLineIndex"),
            candidateJson.getString("candidate")
        )
        if (localPeer?.remoteDescription != null) {
            localPeer?.addIceCandidate(candidate)
        } else {
            pendingIceCandidates.add(candidate)
        }
    }

    /** Echoes t1 back together with the local receive (t2) and send (t3) timestamps. */
    private fun handleTimeSyncRequest(data: JSONObject) {
        Log.d(TAG, "Received time_sync_request")
        val requestT1 = data.getLong("t1")
        val receivedAt = System.currentTimeMillis()
        val sentAt = System.currentTimeMillis()
        emitSignal(
            JSONObject().apply {
                put("type", "time_sync_response")
                put("t1", requestT1)
                put("t2", receivedAt)
                put("t3", sentAt)
                put("room", currentRoom)
            }
        )
        Log.d(TAG, "回复 time_sync_response: t1=$requestT1, t2=$receivedAt, t3=$sentAt")
    }

    /** Polls the peer connection for stats once per second while the view is alive. */
    private fun startStatsCollection() {
        statsJob?.cancel()
        statsJob = viewLifecycleOwner.lifecycleScope.launch {
            while (isActive) {
                delay(1000)
                val peer = localPeer
                if (peer == null) {
                    Log.e(TAG, "获取统计信息失败: localPeer 为 null。")
                } else {
                    peer.getStats { report -> parseStatsReport(report) }
                }
            }
        }
    }

    /**
     * Extracts the inbound video counters from [report] and, from the second report on,
     * converts them to frame rate and bitrate over the interval since the previous report.
     */
    private fun parseStatsReport(report: RTCStatsReport) {
        Log.d(TAG, "收到 RTCStatsReport: $report")

        for (stats in report.statsMap.values) {
            if (stats.type != "inbound-rtp" || stats.members["kind"] as? String != "video") continue

            fun counter(name: String, default: Double = 0.0): Double =
                (stats.members[name] as? Number)?.toDouble() ?: default

            val framesDecoded = counter("framesDecoded")
            val framesReceived = counter("framesReceived")
            val framesDropped = counter("framesDropped")
            val bytesReceived = counter("bytesReceived")
            val packetsLost = counter("packetsLost")
            val packetsReceived = counter("packetsReceived", default = 1.0)

            // Guard the 0 / 0 case, which would otherwise produce NaN.
            val totalPackets = packetsLost + packetsReceived
            val packetLossFraction = if (totalPackets > 0) packetsLost / totalPackets else 0.0
            val timestamp = stats.timestampUs / 1_000_000.0 // microseconds -> seconds

            Log.d(
                TAG,
                "Stats - 解码帧数: $framesDecoded, 接收帧数: $framesReceived, 丢弃帧数: $framesDropped, 接收字节数: $bytesReceived, 丢包率: $packetLossFraction, 时间戳: $timestamp"
            )

            if (prevTimestamp != 0.0) {
                val timeElapsed = timestamp - prevTimestamp
                val frameRate =
                    if (timeElapsed > 0) (framesDecoded - prevFramesDecoded) / timeElapsed else 0.0
                val bitrate =
                    if (timeElapsed > 0) ((bytesReceived - prevBytesReceived) * 8) / timeElapsed else 0.0
                val packetLoss = packetLossFraction * 100 // fraction -> percent

                Log.d(TAG, "计算出的帧率: $frameRate fps, 码率: $bitrate bps, 丢包率: $packetLoss%")
                publishVideoStats(frameRate, bitrate, packetLoss)
            }

            prevFramesDecoded = framesDecoded
            prevBytesReceived = bytesReceived
            prevFramesReceived = framesReceived
            prevFramesDropped = framesDropped
            prevTimestamp = timestamp
        }
    }

    /** Pushes one stats sample into the UI state; a no-op once the view has been destroyed. */
    private fun publishVideoStats(frameRate: Double, bitrate: Double, packetLoss: Double) {
        // The stats callback may fire after onDestroyView(), where the viewLifecycleOwner
        // getter would throw; the LiveData simply holds null in that case.
        val uiScope = viewLifecycleOwnerLiveData.value?.lifecycleScope ?: return

        val isFrameRateLow = frameRate < MIN_SMOOTH_FRAME_RATE
        val isPacketLossHigh = packetLoss > MAX_PACKET_LOSS_PERCENT
        val isStuttering = isFrameRateLow || isPacketLossHigh

        uiScope.launch {
            frameRateState.value = frameRate
            bitrateState.value = bitrate.toLong()
            stutteringState.value = isStuttering
            frameRateLowState.value = isFrameRateLow
            packetLossHighState.value = isPacketLossHigh
            packetLossState.value = packetLoss

            frameRateHistory.add(frameRate.toFloat())
            if (frameRateHistory.size > HISTORY_SIZE) {
                frameRateHistory.removeAt(0)
            }
            bitrateHistory.add(bitrate.toLong())
            if (bitrateHistory.size > HISTORY_SIZE) {
                bitrateHistory.removeAt(0)
            }

            Log.d(
                TAG,
                "更新后的帧率: ${frameRateState.value} fps, 码率: ${bitrateState.value / 1000} kbps, 卡顿: $isStuttering"
            )
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        statsJob?.cancel()
        timeSyncJob?.cancel()

        // The socket only exists if IO.socket() succeeded.
        if (::socket.isInitialized) {
            socket.off()
            socket.disconnect()
        }

        localPeer?.dispose()
        localPeer = null
        peerConnectionFactory?.dispose()
        peerConnectionFactory = null
        remoteView?.release()
        remoteView = null
        remoteEglBase?.release()
        remoteEglBase = null

        // Reset per-session state so a recreated view starts from a clean slate.
        pendingIceCandidates.clear()
        prevTimestamp = 0.0
    }

    /** [SdpObserver] with empty callbacks so call sites override only what they need. */
    private open class NoOpSdpObserver : SdpObserver {
        override fun onCreateSuccess(sessionDescription: SessionDescription?) {}
        override fun onSetSuccess() {}
        override fun onCreateFailure(error: String?) {}
        override fun onSetFailure(error: String?) {}
    }

    companion object {
        private const val TAG = "WebRTC-Receiver"

        private const val ARG_ROOM = "room"
        private const val ARG_STUN_URL = "stunUrl"
        private const val ARG_TURN_URL = "turnUrl"
        private const val ARG_TURN_USERNAME = "turnUsername"
        private const val ARG_TURN_PASSWORD = "turnPassword"
        private const val ARG_SIGNALING_SERVER_URL = "signalingServerUrl"

        // NOTE(review): credentials are compiled into the app as fallbacks. They are kept so
        // behaviour is unchanged, but they should be removed or rotated.
        private const val DEFAULT_TURN_USERNAME = "wstszx"
        private const val DEFAULT_TURN_PASSWORD = "930379"

        // Stuttering thresholds and the number of samples kept for the charts.
        private const val MIN_SMOOTH_FRAME_RATE = 24
        private const val MAX_PACKET_LOSS_PERCENT = 5.0
        private const val HISTORY_SIZE = 60

        /** Creates the fragment with all connection parameters stored in its arguments. */
        fun newInstance(
            room: String,
            stunUrl: String,
            turnUrl: String,
            turnUsername: String,
            turnPassword: String,
            signalingServerUrl: String
        ): VideoReceiverFragment {
            val fragment = VideoReceiverFragment()
            fragment.arguments = Bundle().apply {
                putString(ARG_ROOM, room)
                putString(ARG_STUN_URL, stunUrl)
                putString(ARG_TURN_URL, turnUrl)
                putString(ARG_TURN_USERNAME, turnUsername)
                putString(ARG_TURN_PASSWORD, turnPassword)
                putString(ARG_SIGNALING_SERVER_URL, signalingServerUrl)
            }
            return fragment
        }
    }
}


  1. 将 mutableLongStateOf 替换为 mutableStateOf

    • 在 VideoInitiatorFragment 和 VideoReceiverFragment 中,将所有 mutableLongStateOf 及类似的基本类型专用状态函数替换为通用的 mutableStateOf。mutableLongStateOf 并非自定义函数,而是 Compose 运行时 1.5 起才提供的 API;在下文列出的 1.4.3 版本依赖下无法解析,因此需要替换。


    // 替换前
    private val latestRttState = mutableLongStateOf(0L)

    // 替换后
    private val latestRttState = mutableStateOf(0L)
  2. 移除 observeAsState 并直接使用 by 关键字

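    • 对于 MutableState 类型的变量,直接使用 by 关键字进行委托,而无需使用 observeAsState。observeAsState 是 LiveData 的扩展函数(位于 androidx.compose.runtime:runtime-livedata 库中),不适用于 MutableState,这正是出现 "Unresolved reference: observeAsState" 的原因。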


    // 替换前
    val latestRtt by latestRttState.observeAsState()

    // 替换后
    val latestRtt by latestRttState
  3. 确保正确导入 Compose 的状态相关包

    • 在文件顶部添加必要的 Compose 运行时导入:
    import androidx.compose.runtime.*
  4. 更新 Compose 组件中的状态变量使用

    • 在 Compose 函数中,直接使用 by 关键字访问状态变量,而不再使用 observeAsState


    @Composable
    fun WebRTCComposeLayout() {
        // ...
        val latestRtt by latestRttState
        // ...
    }
  5. 检查依赖项

    • 确保您的 build.gradle 文件中包含了必要的 Compose 和 Lifecycle 相关依赖项。例如:
    dependencies {
        implementation "androidx.compose.ui:ui:1.4.3"
        implementation "androidx.compose.material:material:1.4.3"
        implementation "androidx.compose.ui:ui-tooling-preview:1.4.3"
        implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.6.1"
        implementation "androidx.activity:activity-compose:1.7.2"
        // 其他依赖项...
    }


通过上述修改,您可以解决 "Unresolved reference: observeAsState" 的问题,并确保您的 VideoInitiatorFragment 和 VideoReceiverFragment 正确使用 Jetpack Compose 的状态管理机制。这些修改不仅解决了编译错误,还简化了状态管理,使代码更加简洁。如果在实施过程中遇到其他问题,请随时提供更多信息以便进一步协助。

