class VideoInitiatorFragment : Fragment() {
companion object {
fun newInstance(
room: String,
stunUrl: String,
turnUrl: String,
turnUsername: String,
turnPassword: String,
signalingServerUrl: String
): VideoInitiatorFragment {
val fragment = VideoInitiatorFragment()
val args = Bundle().apply {
putString("room", room)
putString("stunUrl", stunUrl)
putString("turnUrl", turnUrl)
putString("turnUsername", turnUsername)
putString("turnPassword", turnPassword)
putString("signalingServerUrl", signalingServerUrl)
}
fragment.arguments = args
return fragment
}
}
// 类成员变量保持不变
private lateinit var socket: Socket
private var localPeer: PeerConnection? = null
private var localView: SurfaceViewRenderer? = null
private var localEglBase: EglBase? = null
private val pendingIceCandidates = mutableListOf<IceCandidate>()
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
private val TAG: String = "WebRTC-Initiator"
// RTT State Variables
private val maxRttState = mutableStateOf(0L)
private val minRttState = mutableStateOf(Long.MAX_VALUE)
private val averageRttState = mutableStateOf(0L)
private val latestRttState = mutableStateOf(0L)
// History list to compute average RTT
private val rttHistory = mutableStateListOf<Long>()
// Job for stats collection
private var statsJob: Job? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 从参数中获取数据
currentRoom = arguments?.getString("room") ?: "default-room"
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"
Log.d(
TAG,
"onCreate: 角色 = 服务器, 房间 = $currentRoom, 信令服务器 = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
)
}
private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
}
}
onPermissionsChecked()
}
override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {
WebRTCComposeLayout()
}
}
}
@Composable
fun WebRTCComposeLayout() {
val context = LocalContext.current
lateinit var peerConnectionFactory: PeerConnectionFactory
var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }
Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {
// 本地视频视图,填充整个屏幕
AndroidView(
factory = {
localView = SurfaceViewRenderer(it).apply {
setZOrderMediaOverlay(false)
}
localView!!
},
modifier = Modifier
.weight(1f)
.fillMaxWidth(),
update = {
if (localEglBase == null) {
localEglBase = EglBase.create()
it.init(localEglBase!!.eglBaseContext, null)
it.setMirror(false)
}
}
)
Spacer(modifier = Modifier.height(8.dp))
// RTT 统计数据显示
Column(modifier = Modifier.padding(16.dp)) {
Text(
text = "最新往返时延: ${latestRttState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最大往返时延: ${maxRttState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最小往返时延: ${minRttState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "平均往返时延: ${averageRttState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 8.dp)
)
}
// 其他 UI 组件...
}
LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)
val encoderFactory = DefaultVideoEncoderFactory(
localEglBase!!.eglBaseContext, true, true
)
val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)
peerConnectionFactory = PeerConnectionFactory.builder()
.setVideoEncoderFactory(encoderFactory)
.setVideoDecoderFactory(decoderFactory)
.createPeerConnectionFactory()
initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
localVideoTrack = it
}
createPeerConnection(
context,
peerConnectionFactory,
localVideoTrack
) {
localPeer = it
startStatsCollection() // 启动统计收集
}
initializeSocketIO()
requestPermissionsIfNeeded()
}
}
}
private fun startStatsCollection() {
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // 每秒收集一次统计数据
localPeer?.getStats { report ->
parseStatsReport(report)
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
}
}
}
private fun parseStatsReport(report: RTCStatsReport) {
Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
for (stats in report.statsMap.values) {
Log.d(TAG, "Stats type: ${stats.type}")
if (stats.type == "transport") {
Log.d(TAG, "Transport Stats found: $stats")
val currentRtt = (stats.members["currentRoundTripTime"] as? Number)?.toDouble()?.times(1000)?.toLong()
if (currentRtt != null && currentRtt > 0) {
viewLifecycleOwner.lifecycleScope.launch {
// 更新最新 RTT
latestRttState.value = currentRtt
// 更新历史记录
rttHistory.add(currentRtt)
if (rttHistory.size > 60) {
rttHistory.removeAt(0)
}
// 计算最大、最小和平均 RTT
val maxRtt = rttHistory.maxOrNull() ?: 0L
val minRtt = rttHistory.minOrNull() ?: 0L
val averageRtt = if (rttHistory.isNotEmpty()) {
rttHistory.average().toLong()
} else {
0L
}
maxRttState.value = maxRtt
minRttState.value = minRtt
averageRttState.value = averageRtt
Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
}
}
}
}
}
private fun startSendingTimestamps() {
viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(5000) // 每5秒发送一次时间戳
val currentTime = System.currentTimeMillis()
val timestampData = JSONObject().apply {
put("type", "timestamp")
put("timestamp", currentTime)
put("room", currentRoom)
}
socket.emit("signal", timestampData)
Log.d(TAG, "发送时间戳: $currentTime")
}
}
}
private fun initializeSocketIO() {
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl
val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/socket.io/"
}
try {
socket = IO.socket(socketUrl, options)
socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket 已连接")
socket.emit("join", currentRoom)
Log.d(TAG, "已加入房间: $currentRoom")
startSendingTimestamps()
}
socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket 连接错误: $error")
}
}
socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket 已断开: $reason")
}
}
socket.on("signal") { args ->
Log.d(TAG, "收到信令: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject
handleSignalingData(data)
}
}
socket.connect()
Log.d(TAG, "正在连接到 Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "连接 Socket 时出错: ${e.message}")
}
}
private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(
Manifest.permission.CAMERA,
Manifest.permission.RECORD_AUDIO,
Manifest.permission.INTERNET,
Manifest.permission.ACCESS_NETWORK_STATE
)
val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
}
if (permissionsToRequest.isNotEmpty()) {
requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
} else {
onPermissionsChecked()
}
}
private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
}
private fun initLocalVideo(
context: Context,
localView: SurfaceViewRenderer?,
peerConnectionFactory: PeerConnectionFactory,
eglBase: EglBase,
onLocalVideoTrack: (VideoTrack) -> Unit
) {
val videoCapturer = createCameraCapturer(context)
val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)
videoCapturer.startCapture(1920, 1080, 60)
val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
localVideoTrack.addSink(localView)
val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
// 添加音视频轨道到本地流
val mediaStream = peerConnectionFactory.createLocalMediaStream("local_stream")
mediaStream.addTrack(localAudioTrack)
mediaStream.addTrack(localVideoTrack)
onLocalVideoTrack(localVideoTrack)
}
private fun createCameraCapturer(context: Context): CameraVideoCapturer {
val camera2Enumerator = Camera2Enumerator(context)
val deviceNames = camera2Enumerator.deviceNames
// 优先选择后置摄像头
for (deviceName in deviceNames) {
if (camera2Enumerator.isBackFacing(deviceName)) {
val capturer = camera2Enumerator.createCapturer(deviceName, null)
if (capturer != null) {
return capturer
}
}
}
// 如果没有后置摄像头,则尝试前置摄像头
for (deviceName in deviceNames) {
if (camera2Enumerator.isFrontFacing(deviceName)) {
val capturer = camera2Enumerator.createCapturer(deviceName, null)
if (capturer != null) {
return capturer
}
}
}
// 如果没有前置摄像头,则使用第一个摄像头
return camera2Enumerator.createCapturer(deviceNames[0], null)
?: throw IllegalStateException("无法创建摄像头捕获器")
}
private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
localVideoTrack: VideoTrack?,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(
PeerConnection.IceServer.builder(stunUrl).createIceServer(),
PeerConnection.IceServer.builder(turnUrl)
.setUsername(turnUsername)
.setPassword(turnPassword)
.createIceServer()
)
val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
}
localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
})
put("room", currentRoom)
}
socket.emit("signal", signalData)
}
}
override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")
}
override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")
}
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")
}
override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")
}
override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")
}
override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")
}
override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")
}
override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")
}
override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")
}
override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")
}
override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")
}
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")
}
})
localVideoTrack?.let {
localPeer?.addTrack(it, listOf("local_stream"))
}
val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
localPeer?.addTrack(localAudioTrack, listOf("local_stream"))
onLocalPeerCreated(localPeer!!)
}
private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
Log.d(TAG, "Creating answer...")
val constraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
}
peerConnection.createAnswer(object : SdpObserver {
override fun onCreateSuccess(sessionDescription: SessionDescription?) {
sessionDescription?.let { sdp ->
peerConnection.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "SetLocalDescription onSetSuccess")
onAnswerCreated(sdp.description)
}
override fun onSetFailure(error: String?) {
Log.e(TAG, "SetLocalDescription onSetFailure: $error")
}
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
}
override fun onSetSuccess() {
Log.d(TAG, "createAnswer onSetSuccess")
}
override fun onCreateFailure(error: String?) {
Log.e(TAG, "createAnswer onCreateFailure: $error")
}
override fun onSetFailure(error: String?) {}
}, constraints)
}
private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {
"offer" -> {
Log.d(TAG, "Received offer")
val sdp = SessionDescription(
SessionDescription.Type.OFFER,
data.getJSONObject("sdp").getString("sdp")
)
localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "Set remote description (offer) success")
createAnswer(localPeer!!) { answer ->
val signalData = JSONObject().apply {
put("type", "answer")
put("sdp", JSONObject().put("sdp", answer))
put("room", currentRoom)
}
socket.emit("signal", signalData)
pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}
pendingIceCandidates.clear()
}
}
override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description (offer) error: $error")
}
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(
candidateData.getString("sdpMid"),
candidateData.getInt("sdpMLineIndex"),
candidateData.getString("candidate")
)
if (localPeer?.remoteDescription != null) {
localPeer?.addIceCandidate(candidate)
} else {
pendingIceCandidates.add(candidate)
}
}
"time_sync_request" -> {
Log.d(TAG, "收到 time_sync_request")
if (data.has("t1")) {
val t1 = data.getLong("t1")
val t2 = System.currentTimeMillis()
val t3 = System.currentTimeMillis()
val syncResponse = JSONObject().apply {
put("type", "time_sync_response")
put("t1", t1)
put("t2", t2)
put("t3", t3)
put("room", currentRoom)
}
socket.emit("signal", syncResponse)
Log.d(TAG, "回复 time_sync_response: t1=$t1, t2=$t2, t3=$t3")
} else {
Log.e(TAG, "time_sync_request 缺少 t1")
}
}
else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
}
}
}
private fun requestTimeSync() {
val t1 = System.currentTimeMillis()
val syncRequest = JSONObject().apply {
put("type", "time_sync_request")
put("room", currentRoom)
put("t1", t1)
}
socket.emit("signal", syncRequest)
Log.d(TAG, "发送时间同步请求 at t1: $t1")
}
override fun onDestroyView() {
super.onDestroyView()
statsJob?.cancel()
socket.disconnect()
localPeer?.dispose()
localView?.release()
localEglBase?.release()
}
}
class VideoReceiverFragment : Fragment() {
companion object {
fun newInstance(
room: String,
stunUrl: String,
turnUrl: String,
turnUsername: String,
turnPassword: String,
signalingServerUrl: String
): VideoReceiverFragment {
val fragment = VideoReceiverFragment()
val args = Bundle().apply {
putString("room", room)
putString("stunUrl", stunUrl)
putString("turnUrl", turnUrl)
putString("turnUsername", turnUsername)
putString("turnPassword", turnPassword)
putString("signalingServerUrl", signalingServerUrl)
}
fragment.arguments = args
return fragment
}
}
// Class member variables
private lateinit var socket: Socket
private var localPeer: PeerConnection? = null
private var remoteView: SurfaceViewRenderer? = null
private var remoteEglBase: EglBase? = null
private val pendingIceCandidates = mutableListOf<IceCandidate>()
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
private val TAG: String = "WebRTC-Receiver"
private val frameRateState = mutableStateOf(0.0)
private val bitrateState = mutableStateOf(0L)
private val stutteringState = mutableStateOf(false)
private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableStateOf(0.0)
private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
private var timeSyncJob: Job? = null
// 添加历史记录列表用于存储所有单向时延值
private val latencyHistory = mutableStateListOf<Long>()
// 添加状态变量用于存储最大、最小和平均单向时延
private val maxLatencyState = mutableStateOf(0L)
private val minLatencyState = mutableStateOf(Long.MAX_VALUE)
private val averageLatencyState = mutableStateOf(0L)
private val latestLatencyState = mutableStateOf(0L)
// History variables
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
private var timeOffset: Long = 0
private var t1: Long = 0
private var statsJob: Job? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// Retrieve data from arguments
currentRoom = arguments?.getString("room") ?: "default-room"
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"
Log.d(
TAG,
"onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
)
}
private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
}
}
onPermissionsChecked()
}
override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {
WebRTCComposeLayout()
}
}
}
@Composable
fun WebRTCComposeLayout() {
val context = LocalContext.current
lateinit var peerConnectionFactory: PeerConnectionFactory
Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {
// Remote video view
AndroidView(
factory = {
remoteView = SurfaceViewRenderer(it).apply {
setZOrderMediaOverlay(false)
}
remoteView!!
},
modifier = Modifier
.weight(1f)
.fillMaxWidth(),
update = {
if (remoteEglBase?.eglBaseContext == null) {
remoteEglBase = EglBase.create()
it.init(remoteEglBase!!.eglBaseContext, null)
it.setMirror(false)
}
}
)
Spacer(modifier = Modifier.height(8.dp))
// 展示最大、最小和平均单向时延
Column(modifier = Modifier.padding(horizontal = 16.dp)) {
Text(
text = "当前单向时延: ${latestLatencyState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最大单向时延: ${maxLatencyState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最小单向时延: ${minLatencyState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "平均单向时延: ${averageLatencyState.value} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 8.dp)
)
}
// Spacer between video and charts
Spacer(modifier = Modifier.height(8.dp))
// Frame Rate Section
Column(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 8.dp)
) {
// Frame Rate Text in Chinese
Text(
text = "帧率: ${frameRateState.value.roundToInt()} fps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Log.d(
TAG,
"UI - Frame Rate: ${frameRateState.value} fps, Bitrate: ${bitrateState.value / 1000} kbps"
)
// Line Chart for Frame Rate
LineChart(
data = frameRateHistory,
modifier = Modifier
.height(200.dp)
.fillMaxWidth()
.padding(vertical = 8.dp),
lineColor = Color.Green,
backgroundColor = Color.Black,
yAxisLabel = "帧率 (fps)",
xAxisLabel = "时间 (秒)"
)
}
// Spacer between charts
Spacer(modifier = Modifier.height(8.dp))
// Bitrate Section
Column(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 16.dp)
) {
// Bitrate Text in Chinese
Text(
text = "码率: ${bitrateState.value / 1000} kbps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
// Line Chart for Bitrate
LineChart(
data = bitrateHistory.map { it / 1000f }, // Convert to kbps
modifier = Modifier
.height(200.dp)
.fillMaxWidth()
.padding(vertical = 8.dp),
lineColor = Color.Blue,
backgroundColor = Color.Black,
yAxisLabel = "码率 (kbps)",
xAxisLabel = "时间 (秒)"
)
}
// Spacer between metrics and stuttering indicator
Spacer(modifier = Modifier.height(16.dp))
// Stuttering Indicator
Row(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
if (stutteringState.value) {
Icon(
imageVector = Icons.Default.Warning,
contentDescription = "卡顿警告",
tint = Color.Red,
modifier = Modifier.size(24.dp)
)
Spacer(modifier = Modifier.width(8.dp))
Column {
Text(
text = "视频播放出现卡顿",
color = Color.Red,
style = MaterialTheme.typography.bodyMedium
)
// Additional information about which metrics are abnormal
if (frameRateLowState.value) {
Text(
text = "帧率过低: ${frameRateState.value.roundToInt()} fps",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
)
}
if (packetLossHighState.value) {
Text(
text = "包丢失率过高: ${packetLossState.value.roundToInt()}%",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
)
}
}
} else {
Icon(
imageVector = Icons.Default.CheckCircle,
contentDescription = "正常",
tint = Color.Green,
modifier = Modifier.size(24.dp)
)
Spacer(modifier = Modifier.width(8.dp))
Text(
text = "视频播放正常",
color = Color.Green,
style = MaterialTheme.typography.bodyMedium
)
}
}
}
LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)
val encoderFactory = DefaultVideoEncoderFactory(
EglBase.create().eglBaseContext, true, true
)
val decoderFactory = DefaultVideoDecoderFactory(remoteEglBase!!.eglBaseContext)
peerConnectionFactory = PeerConnectionFactory.builder()
.setVideoEncoderFactory(encoderFactory)
.setVideoDecoderFactory(decoderFactory)
.createPeerConnectionFactory()
createPeerConnection(
context,
peerConnectionFactory,
remoteView!!
) {
localPeer = it
}
initializeSocketIO()
requestPermissionsIfNeeded()
}
}
}
private fun startPeriodicTimeSync(intervalMs: Long = 5000L) {
timeSyncJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
requestTimeSync()
delay(intervalMs)
}
}
}
/**
* Enhanced Line Chart Composable with Axes and Labels
* Modified to use straight lines connecting each point
*/
@Composable
fun LineChart(
data: List<Float>,
modifier: Modifier = Modifier,
lineColor: Color = Color.Green,
backgroundColor: Color = Color.Black,
yAxisLabel: String = "",
xAxisLabel: String = "",
minYValue: Float? = null,
maxYValue: Float? = null
) {
Canvas(modifier = modifier.background(backgroundColor)) {
val padding = 40.dp.toPx() // Padding for axes and labels
if (data.isEmpty()) return@Canvas
val maxY = maxYValue ?: data.maxOrNull() ?: 1f
val minY = minYValue ?: data.minOrNull() ?: 0f
val yRange = maxY - minY
val pointCount = data.size
val spacing = (size.width - padding * 2) / (pointCount - 1).coerceAtLeast(1)
val points = data.mapIndexed { index, value ->
val x = padding + index * spacing
val y = if (yRange == 0f) size.height / 2 else (size.height - padding) - ((value - minY) / yRange) * (size.height - padding * 2)
Offset(x, y)
}
// Draw axes
drawLine(
color = Color.White,
start = Offset(padding, padding),
end = Offset(padding, size.height - padding),
strokeWidth = 2f
)
drawLine(
color = Color.White,
start = Offset(padding, size.height - padding),
end = Offset(size.width - padding, size.height - padding),
strokeWidth = 2f
)
// Draw y-axis labels
val yLabelCount = 5
val yStep = yRange / (yLabelCount - 1)
for (i in 0 until yLabelCount) {
val yValue = minY + i * yStep
val yPos = (size.height - padding) - ((yValue - minY) / yRange) * (size.height - padding * 2)
drawContext.canvas.nativeCanvas.apply {
val label = yValue.roundToInt().toString()
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.RIGHT
}
drawText(
label,
padding - 8f,
yPos + textPaint.textSize / 2,
textPaint
)
}
}
// Draw x-axis labels
val xLabelCount = 5
val xStep = (pointCount - 1).coerceAtLeast(1) / (xLabelCount - 1).coerceAtLeast(1)
for (i in 0 until xLabelCount) {
val index = i * xStep
val xPos = padding + index * spacing
drawContext.canvas.nativeCanvas.apply {
val label = index.toString()
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
}
drawText(
label,
xPos,
size.height - padding + textPaint.textSize + 4f,
textPaint
)
}
}
// Optionally, draw axis labels
// Y-Axis Label
if (yAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
isAntiAlias = true
}
// Rotate for vertical text
save()
rotate(-90f, padding / 2, size.height / 2)
drawText(
yAxisLabel,
padding / 2,
size.height / 2,
textPaint
)
restore()
}
}
// X-Axis Label
if (xAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
isAntiAlias = true
}
drawText(
xAxisLabel,
size.width / 2,
size.height - padding / 2,
textPaint
)
}
}
// Draw the straight lines connecting points
if (points.size >= 2) {
for (i in 0 until points.size - 1) {
drawLine(
color = lineColor,
start = points[i],
end = points[i + 1],
strokeWidth = 4f,
cap = StrokeCap.Round
)
}
}
// Optionally, draw points
points.forEach { point ->
drawCircle(
color = lineColor,
radius = 4f,
center = point
)
}
}
}
private fun initializeSocketIO() {
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl
val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/socket.io/"
}
try {
socket = IO.socket(socketUrl, options)
socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket connected")
socket.emit("join", currentRoom)
Log.d(TAG, "Joined room: $currentRoom")
initiateCall()
startPeriodicTimeSync()
}
socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket connection error: $error")
}
}
socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket disconnected: $reason")
}
}
socket.on("signal") { args ->
Log.d(TAG, "Received signaling: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject
handleSignalingData(data)
}
}
socket.connect()
Log.d(TAG, "Connecting to Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "Error connecting to Socket: ${e.message}")
}
}
private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(
Manifest.permission.CAMERA,
Manifest.permission.RECORD_AUDIO,
Manifest.permission.INTERNET,
Manifest.permission.ACCESS_NETWORK_STATE
)
val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
}
if (permissionsToRequest.isNotEmpty()) {
requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
} else {
onPermissionsChecked()
}
}
private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
}
private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
remoteView: SurfaceViewRenderer,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(
PeerConnection.IceServer.builder(stunUrl).createIceServer(),
PeerConnection.IceServer.builder(turnUrl)
.setUsername(turnUsername)
.setPassword(turnPassword)
.createIceServer()
)
val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
}
localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
})
put("room", currentRoom)
}
socket.emit("signal", signalData)
}
}
override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")
}
override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")
}
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")
}
override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")
}
override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")
}
override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")
}
override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")
}
override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")
}
override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")
}
override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")
receiver?.track()?.let { track ->
if (track is VideoTrack) {
track.addSink(remoteView)
}
}
}
override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")
transceiver?.receiver?.track()?.let { track ->
if (track is VideoTrack) {
track.addSink(remoteView)
}
}
}
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")
}
})
onLocalPeerCreated(localPeer!!)
// Start collecting statistics
startStatsCollection()
}
private fun initiateCall() {
Log.d(TAG, "Initiating call...")
val constraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
}
localPeer?.createOffer(object : SdpObserver {
override fun onCreateSuccess(sessionDescription: SessionDescription?) {
sessionDescription?.let { sdp ->
localPeer?.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
val signalData = JSONObject().apply {
put("type", "offer")
put("sdp", JSONObject().put("sdp", sdp.description))
put("room", currentRoom)
}
socket.emit("signal", signalData)
}
override fun onSetFailure(error: String?) {
Log.e(TAG, "Set local description error: $error")
}
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
}
override fun onSetSuccess() {}
override fun onCreateFailure(error: String?) {
Log.e(TAG, "Create offer error: $error")
}
override fun onSetFailure(error: String?) {}
}, constraints)
}
private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {
"answer" -> {
Log.d(TAG, "Received answer")
val sdp = SessionDescription(
SessionDescription.Type.ANSWER,
data.getJSONObject("sdp").getString("sdp")
)
localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}
pendingIceCandidates.clear()
Log.d(TAG, "Set remote description (answer) success")
}
override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description error: $error")
}
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(
candidateData.getString("sdpMid"),
candidateData.getInt("sdpMLineIndex"),
candidateData.getString("candidate")
)
if (localPeer?.remoteDescription != null) {
localPeer?.addIceCandidate(candidate)
} else {
pendingIceCandidates.add(candidate)
}
}
"time_sync_response" -> {
val t1 = data.getLong("t1")
val t2 = data.getLong("t2")
val t3 = data.getLong("t3")
val t4 = System.currentTimeMillis()
val RTT = t4 - t1
val oneWayDelay = RTT / 2
timeOffset = ((t2 - t1) + (t3 - t4)) / 2
Log.d(TAG, "时间同步: RTT=$RTT ms, 单向时延=$oneWayDelay ms, 时间偏移量=$timeOffset ms")
// 更新 latencyState
viewLifecycleOwner.lifecycleScope.launch {
// 添加到历史记录
latencyHistory.add(oneWayDelay)
if (latencyHistory.size > 60) { // 保持最近60个时延值
latencyHistory.removeAt(0)
}
// 计算最大、最小和平均单向时延
val maxLatency = latencyHistory.maxOrNull() ?: 0L
val minLatency = latencyHistory.minOrNull() ?: 0L
val averageLatency = if (latencyHistory.isNotEmpty()) {
latencyHistory.average().toLong()
} else {
0L
}
maxLatencyState.value = maxLatency
minLatencyState.value = minLatency
averageLatencyState.value = averageLatency
latestLatencyState.value = oneWayDelay
}
}
else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
}
}
}
private fun requestTimeSync() {
t1 = System.currentTimeMillis()
val syncRequest = JSONObject().apply {
put("type", "time_sync_request")
put("room", currentRoom)
put("t1", t1)
}
socket.emit("signal", syncRequest)
Log.d(TAG, "发送时间同步请求 at t1: $t1")
}
private fun startStatsCollection() {
Log.d(TAG, "Starting stats collection...")
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // Collect stats every second
Log.d(TAG, "Collecting stats...")
localPeer?.getStats { report ->
Log.d(TAG, "Stats report obtained.")
parseStatsReport(report)
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
}
}
}
private fun parseStatsReport(report: RTCStatsReport) {
Log.d(TAG, "Received RTCStatsReport: $report")
for (stats in report.statsMap.values) {
if (stats.type == "inbound-rtp") {
val kind = stats.members["kind"] as? String
if (kind == "video") {
val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
val packetsReceived = (stats.members["packetsReceived"] as? Number)?.toDouble() ?: 1.0 // Avoid division by zero
val packetLossFraction = packetsLost / (packetsLost + packetsReceived)
val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds
Log.d(
TAG,
"Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
)
if (prevTimestamp != 0.0) {
val timeElapsed = timestamp - prevTimestamp
val framesDelta = framesDecoded - prevFramesDecoded
val bytesDelta = bytesReceived - prevBytesReceived
val framesReceivedDelta = framesReceived - prevFramesReceived
val framesDroppedDelta = framesDropped - prevFramesDropped
val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
val packetLoss = packetLossFraction * 100 // Convert to percentage
Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")
// Determine stuttering based on thresholds
val isStuttering = frameRate < 24 || packetLoss > 5.0 // Thresholds can be adjusted
// Update states
viewLifecycleOwner.lifecycleScope.launch {
frameRateState.value = frameRate
bitrateState.value = bitrate.toLong()
// Update stuttering state
stutteringState.value = isStuttering
// Update specific stuttering causes
frameRateLowState.value = frameRate < 24
packetLossHighState.value = packetLoss > 5.0
packetLossState.value = packetLoss
// Update history
frameRateHistory.add(frameRate.toFloat())
if (frameRateHistory.size > 60) {
frameRateHistory.removeAt(0)
}
bitrateHistory.add(bitrate.toLong())
if (bitrateHistory.size > 60) {
bitrateHistory.removeAt(0)
}
Log.d(
TAG,
"Updated Frame Rate: ${frameRateState.value} fps, Bitrate: ${bitrateState.value / 1000} kbps, Stuttering: $isStuttering"
)
}
}
// Update previous values
prevFramesDecoded = framesDecoded
prevBytesReceived = bytesReceived
prevFramesReceived = framesReceived
prevFramesDropped = framesDropped
prevTimestamp = timestamp
}
}
}
}
override fun onDestroyView() {
super.onDestroyView()
statsJob?.cancel()
socket.disconnect()
localPeer?.dispose()
remoteView?.release()
remoteEglBase?.release()
timeSyncJob?.cancel()
}
}
分析代码,目前发起端统计往返时延的代码有问题,往返时延的值始终为0,是否可以修改为在应用层实现统计往返时延并在页面上展示,返回修改后的代码