问题

class VideoInitiatorFragment : Fragment() {

companion object {
fun newInstance(
room: String,
stunUrl: String,
turnUrl: String,
turnUsername: String,
turnPassword: String,
signalingServerUrl: String
): VideoInitiatorFragment {
val fragment = VideoInitiatorFragment()
val args = Bundle().apply {
putString("room", room)
putString("stunUrl", stunUrl)
putString("turnUrl", turnUrl)
putString("turnUsername", turnUsername)
putString("turnPassword", turnPassword)
putString("signalingServerUrl", signalingServerUrl)
}
fragment.arguments = args
return fragment
}
}

// Class member variables remain unchanged
private lateinit var socket: Socket
private var localPeer: PeerConnection? = null
private var localView: SurfaceViewRenderer? = null
private var localEglBase: EglBase? = null
private val pendingIceCandidates = mutableListOf<IceCandidate>()
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

private val TAG: String = "WebRTC-Initiator"

// State variables for RTT and OWD
private val rttState = mutableLongStateOf(0L)
private val rttHistory = mutableStateListOf<Long>()
private val owdState = mutableLongStateOf(0L)
private val owdHistory = mutableStateListOf<Long>()
private var clockOffset: Long = 0L

private var statsJob: Job? = null

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

// Retrieve data from arguments
currentRoom = arguments?.getString("room") ?: "default-room"
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"

Log.d(
TAG,
"onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
)
}

private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
}
}
onPermissionsChecked()
}

override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {
WebRTCComposeLayout()
}
}
}

@Composable
fun WebRTCComposeLayout() {
val context = LocalContext.current
lateinit var peerConnectionFactory: PeerConnectionFactory
var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {

// Local video view
AndroidView(
factory = {
localView = SurfaceViewRenderer(it).apply {
setZOrderMediaOverlay(false)
}
localView!!
},
modifier = Modifier
.weight(1f)
.fillMaxWidth(),
update = {
if (localEglBase == null) {
localEglBase = EglBase.create()
it.init(localEglBase!!.eglBaseContext, null)
it.setMirror(false)
}
}
)

Spacer(modifier = Modifier.height(8.dp))

// 并排显示 OWD 和 RTT
Row(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 16.dp),
horizontalArrangement = Arrangement.SpaceEvenly
) {
// 单向时延 (OWD) 列
Column(modifier = Modifier.weight(1f)) {
Text(
text = "单向时延 (OWD)",
color = Color.Cyan,
style = MaterialTheme.typography.titleMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最新: ${owdState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 2.dp)
)
Text(
text = "最大: ${owdHistory.maxOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 2.dp)
)
Text(
text = "最小: ${owdHistory.minOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 2.dp)
)
val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L
Text(
text = "平均: $averageOwd ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium
)
}

Spacer(modifier = Modifier.width(16.dp))

// 往返时延 (RTT) 列
Column(modifier = Modifier.weight(1f)) {
Text(
text = "往返时延 (RTT)",
color = Color.Cyan,
style = MaterialTheme.typography.titleMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "最新: ${rttState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 2.dp)
)
Text(
text = "最大: ${rttHistory.maxOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 2.dp)
)
Text(
text = "最小: ${rttHistory.minOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 2.dp)
)
val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L
Text(
text = "平均: $averageRtt ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium
)
}
}

}

LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)

val encoderFactory = DefaultVideoEncoderFactory(
localEglBase!!.eglBaseContext, true, true
)

val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

peerConnectionFactory = PeerConnectionFactory.builder()
.setVideoEncoderFactory(encoderFactory)
.setVideoDecoderFactory(decoderFactory)
.createPeerConnectionFactory()

initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
localVideoTrack = it
}

createPeerConnection(
context,
peerConnectionFactory,
localVideoTrack
) {
localPeer = it
startStatsCollection() // Start collecting statistics
}

initializeSocketIO()

requestPermissionsIfNeeded()
startTimeSync()
}
}
}

private fun startTimeSync(intervalMs: Long = 5000L) {
viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(intervalMs) // Sync every 5 seconds
requestTimeSync()
}
}
}

private fun startStatsCollection() {
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // Collect stats every second
localPeer?.getStats { report ->
parseStatsReport(report)
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
}
}
}

private fun parseStatsReport(report: RTCStatsReport) {
Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
for (stats in report.statsMap.values) {
Log.d(TAG, "Stats type: ${stats.type}")
if (stats.type == "transport") {
Log.d(TAG, "Transport Stats found: $stats")
val currentRtt = (stats.members["currentRoundTripTime"] as? Number)?.toDouble()?.times(1000)?.toLong()
if (currentRtt != null && currentRtt > 0) {
viewLifecycleOwner.lifecycleScope.launch {
// Update RTT state
rttState.longValue = currentRtt

// Update RTT history
rttHistory.add(currentRtt)
if (rttHistory.size > 60) {
rttHistory.removeAt(0)
}

// Calculate max, min, and average RTT
val maxRtt = rttHistory.maxOrNull() ?: 0L
val minRtt = rttHistory.minOrNull() ?: 0L
val averageRtt = if (rttHistory.isNotEmpty()) {
rttHistory.average().toLong()
} else {
0L
}

Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
}
}
}
}
}

private fun initializeSocketIO() {
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl

val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/socket.io/"
}

try {
socket = IO.socket(socketUrl, options)

socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket connected")
socket.emit("join", currentRoom)
Log.d(TAG, "Joined room: $currentRoom")
}

socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket connection error: $error")
}
}

socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket disconnected: $reason")
}
}

socket.on("signal") { args ->
Log.d(TAG, "Received signaling: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject
handleSignalingData(data)
}
}

socket.connect()
Log.d(TAG, "Connecting to Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "Error connecting to Socket: ${e.message}")
}
}

private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(
Manifest.permission.CAMERA,
Manifest.permission.RECORD_AUDIO,
Manifest.permission.INTERNET,
Manifest.permission.ACCESS_NETWORK_STATE
)

val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
}

if (permissionsToRequest.isNotEmpty()) {
requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
} else {
onPermissionsChecked()
}
}

private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
}

private fun initLocalVideo(
context: Context,
localView: SurfaceViewRenderer?,
peerConnectionFactory: PeerConnectionFactory,
eglBase: EglBase,
onLocalVideoTrack: (VideoTrack) -> Unit
) {
val videoCapturer = createCameraCapturer(context)
val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

videoCapturer.startCapture(1920, 1080, 60)

val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
localVideoTrack.addSink(localView)

val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)

// Add audio and video tracks to local stream
val mediaStream = peerConnectionFactory.createLocalMediaStream("local_stream")
mediaStream.addTrack(localAudioTrack)
mediaStream.addTrack(localVideoTrack)

onLocalVideoTrack(localVideoTrack)
}

private fun createCameraCapturer(context: Context): CameraVideoCapturer {
val camera2Enumerator = Camera2Enumerator(context)
val deviceNames = camera2Enumerator.deviceNames

// Prefer back-facing camera
for (deviceName in deviceNames) {
if (camera2Enumerator.isBackFacing(deviceName)) {
val capturer = camera2Enumerator.createCapturer(deviceName, null)
if (capturer != null) {
return capturer
}
}
}

// Fallback to front-facing camera
for (deviceName in deviceNames) {
if (camera2Enumerator.isFrontFacing(deviceName)) {
val capturer = camera2Enumerator.createCapturer(deviceName, null)
if (capturer != null) {
return capturer
}
}
}

// Fallback to first available camera
return camera2Enumerator.createCapturer(deviceNames[0], null)
?: throw IllegalStateException("Unable to create camera capturer")
}

private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
localVideoTrack: VideoTrack?,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(
PeerConnection.IceServer.builder(stunUrl).createIceServer(),
PeerConnection.IceServer.builder(turnUrl)
.setUsername(turnUsername)
.setPassword(turnPassword)
.createIceServer()
)

val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
}

localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
})
put("room", currentRoom)
}
socket.emit("signal", signalData)
}
}

override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")
}

override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")
}

override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")
}

override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")
}

override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")
}

override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")
}

override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")
}

override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")
}

override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")
}

override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")
}

override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")
}

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")
}
})

localVideoTrack?.let {
localPeer?.addTrack(it, listOf("local_stream"))
}
val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
localPeer?.addTrack(localAudioTrack, listOf("local_stream"))

onLocalPeerCreated(localPeer!!)
}

private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
Log.d(TAG, "Creating answer...")
val constraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
}

peerConnection.createAnswer(object : SdpObserver {
override fun onCreateSuccess(sessionDescription: SessionDescription?) {
sessionDescription?.let { sdp ->
peerConnection.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "SetLocalDescription onSetSuccess")
onAnswerCreated(sdp.description)
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "SetLocalDescription onSetFailure: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
}

override fun onSetSuccess() {
Log.d(TAG, "createAnswer onSetSuccess")
}

override fun onCreateFailure(error: String?) {
Log.e(TAG, "createAnswer onCreateFailure: $error")
}

override fun onSetFailure(error: String?) {}
}, constraints)
}

private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {
"offer" -> {
Log.d(TAG, "Received offer")
val sdp = SessionDescription(
SessionDescription.Type.OFFER,
data.getJSONObject("sdp").getString("sdp")
)
localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "Set remote description (offer) success")
createAnswer(localPeer!!) { answer ->
val signalData = JSONObject().apply {
put("type", "answer")
put("sdp", JSONObject().put("sdp", answer))
put("room", currentRoom)
}

socket.emit("signal", signalData)

pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}
pendingIceCandidates.clear()
}
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description (offer) error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}

"answer" -> {
Log.d(TAG, "Received answer")
val sdp = SessionDescription(
SessionDescription.Type.ANSWER,
data.getJSONObject("sdp").getString("sdp")
)
localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}
pendingIceCandidates.clear()
Log.d(TAG, "Set remote description (answer) success")
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}

"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(
candidateData.getString("sdpMid"),
candidateData.getInt("sdpMLineIndex"),
candidateData.getString("candidate")
)

if (localPeer?.remoteDescription != null) {
localPeer?.addIceCandidate(candidate)
} else {
pendingIceCandidates.add(candidate)
}
}

"time_sync_response" -> {
Log.d(TAG, "Received time_sync_response")
val t1 = data.getLong("t1")
val t2 = data.getLong("t2")
val t3 = data.getLong("t3")
val t4 = System.currentTimeMillis()

// 计算 RTT 和时钟偏移量
val rtt = t4 - t1 - (t3 - t2)
clockOffset = ((t2 - t1) + (t3 - t4)) / 2

Log.d(TAG, "时间同步: RTT=$rtt ms, 时钟偏移量= $clockOffset ms")

// 计算单向时延 OWD
val owd = (t2 - t1) - clockOffset
if (owd >= 0) {
viewLifecycleOwner.lifecycleScope.launch {
// 更新 RTT 状态
rttState.longValue = rtt

// 更新 RTT 历史记录
rttHistory.add(rtt)
if (rttHistory.size > 60) {
rttHistory.removeAt(0)
}

// 计算 RTT 的最大、最小和平均值
val maxRtt = rttHistory.maxOrNull() ?: 0L
val minRtt = rttHistory.minOrNull() ?: 0L
val averageRtt = if (rttHistory.isNotEmpty()) {
rttHistory.average().toLong()
} else {
0L
}

// 更新 OWD 状态
owdState.longValue = owd

// 更新 OWD 历史记录
owdHistory.add(owd)
if (owdHistory.size > 60) {
owdHistory.removeAt(0)
}

// 计算 OWD 的最大、最小和平均值
val maxOwd = owdHistory.maxOrNull() ?: 0L
val minOwd = owdHistory.minOrNull() ?: 0L
val averageOwd = if (owdHistory.isNotEmpty()) {
owdHistory.average().toLong()
} else {
0L
}

Log.d(TAG, "RTT - 最新: $rtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms")
Log.d(TAG, "单向时延 (OWD) - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms")
}
} else {
Log.e(TAG, "计算出的OWD为负数,可能时钟同步有误。")
}
}

else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
}
}
}

private fun requestTimeSync() {
val t1 = System.currentTimeMillis()
val syncRequest = JSONObject().apply {
put("type", "time_sync_request")
put("room", currentRoom)
put("t1", t1)
}
socket.emit("signal", syncRequest)
Log.d(TAG, "Sent time sync request at t1: $t1")
}

override fun onDestroyView() {
super.onDestroyView()
statsJob?.cancel()
socket.disconnect()
localPeer?.dispose()
localView?.release()
localEglBase?.release()
}
content_copy
Use code with caution.

}

class VideoReceiverFragment : Fragment() {

companion object {
fun newInstance(
room: String,
stunUrl: String,
turnUrl: String,
turnUsername: String,
turnPassword: String,
signalingServerUrl: String
): VideoReceiverFragment {
val fragment = VideoReceiverFragment()
val args = Bundle().apply {
putString("room", room)
putString("stunUrl", stunUrl)
putString("turnUrl", turnUrl)
putString("turnUsername", turnUsername)
putString("turnPassword", turnPassword)
putString("signalingServerUrl", signalingServerUrl)
}
fragment.arguments = args
return fragment
}
}

private lateinit var socket: Socket
private var localPeer: PeerConnection? = null
private var remoteView: SurfaceViewRenderer? = null
private var remoteEglBase: EglBase? = null
private val pendingIceCandidates = mutableListOf<IceCandidate>()
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
private val TAG: String = "WebRTC-Receiver"

private val frameRateState = mutableDoubleStateOf(0.0)
private val bitrateState = mutableLongStateOf(0L)
private val stutteringState = mutableStateOf(false)

private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableDoubleStateOf(0.0)

private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
private var timeSyncJob: Job? = null

private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0

private var statsJob: Job? = null

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

// Retrieve data from arguments
currentRoom = arguments?.getString("room") ?: "default-room"
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"

Log.d(
TAG,
"onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
)
}

private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
}
}
onPermissionsChecked()
}

override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {
WebRTCComposeLayout()
}
}
}

@Composable
fun WebRTCComposeLayout() {
val context = LocalContext.current
lateinit var peerConnectionFactory: PeerConnectionFactory

Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {

// Remote video view
AndroidView(
factory = {
remoteView = SurfaceViewRenderer(it).apply {
setZOrderMediaOverlay(false)
}
remoteView!!
},
modifier = Modifier
.weight(1f)
.fillMaxWidth(),
update = {
if (remoteEglBase?.eglBaseContext == null) {
remoteEglBase = EglBase.create()
it.init(remoteEglBase!!.eglBaseContext, null)
it.setMirror(false)
}
}
)

Spacer(modifier = Modifier.height(8.dp))

Column(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 8.dp)
) {
// Frame Rate Text in Chinese
Text(
text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Log.d(
TAG,
"UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"
)

// Line Chart for Frame Rate
LineChart(
data = frameRateHistory,
modifier = Modifier
.height(200.dp)
.fillMaxWidth()
.padding(vertical = 8.dp),
lineColor = Color.Green,
backgroundColor = Color.Black,
yAxisLabel = "帧率 (fps)",
xAxisLabel = "时间 (秒)"
)
}

// Spacer between charts
Spacer(modifier = Modifier.height(8.dp))

// Bitrate Section
Column(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 16.dp)
) {
// Bitrate Text in Chinese
Text(
text = "码率: ${bitrateState.longValue / 1000} kbps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)

// Line Chart for Bitrate
LineChart(
data = bitrateHistory.map { it / 1000f },
modifier = Modifier
.height(200.dp)
.fillMaxWidth()
.padding(vertical = 8.dp),
lineColor = Color.Blue,
backgroundColor = Color.Black,
yAxisLabel = "码率 (kbps)",
xAxisLabel = "时间 (秒)"
)
}

// Spacer between metrics and stuttering indicator
Spacer(modifier = Modifier.height(16.dp))

// Stuttering Indicator
Row(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
if (stutteringState.value) {
Icon(
imageVector = Icons.Default.Warning,
contentDescription = "卡顿警告",
tint = Color.Red,
modifier = Modifier.size(24.dp)
)
Spacer(modifier = Modifier.width(8.dp))
Column {
Text(
text = "视频播放出现卡顿",
color = Color.Red,
style = MaterialTheme.typography.bodyMedium
)
// Additional information about which metrics are abnormal
if (frameRateLowState.value) {
Text(
text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
)
}
if (packetLossHighState.value) {
Text(
text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
)
}
}
} else {
Icon(
imageVector = Icons.Default.CheckCircle,
contentDescription = "正常",
tint = Color.Green,
modifier = Modifier.size(24.dp)
)
Spacer(modifier = Modifier.width(8.dp))
Text(
text = "视频播放正常",
color = Color.Green,
style = MaterialTheme.typography.bodyMedium
)
}
}
}

LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)

val encoderFactory = DefaultVideoEncoderFactory(
EglBase.create().eglBaseContext, true, true
)

val decoderFactory = DefaultVideoDecoderFactory(remoteEglBase!!.eglBaseContext)

peerConnectionFactory = PeerConnectionFactory.builder()
.setVideoEncoderFactory(encoderFactory)
.setVideoDecoderFactory(decoderFactory)
.createPeerConnectionFactory()

createPeerConnection(
context,
peerConnectionFactory,
remoteView!!
) {
localPeer = it
}

initializeSocketIO()

requestPermissionsIfNeeded()
}
}
}

/**
* Enhanced Line Chart Composable with Axes and Labels
* Modified to use straight lines connecting each point
*/
@Composable
fun LineChart(
data: List<Float>,
modifier: Modifier = Modifier,
lineColor: Color = Color.Green,
backgroundColor: Color = Color.Black,
yAxisLabel: String = "",
xAxisLabel: String = "",
minYValue: Float? = null,
maxYValue: Float? = null
) {
Canvas(modifier = modifier.background(backgroundColor)) {
val padding = 40.dp.toPx() // Padding for axes and labels

if (data.isEmpty()) return@Canvas

val maxY = maxYValue ?: data.maxOrNull() ?: 1f
val minY = minYValue ?: data.minOrNull() ?: 0f
val yRange = maxY - minY
val pointCount = data.size
val spacing = (size.width - padding * 2) / (pointCount - 1).coerceAtLeast(1)

val points = data.mapIndexed { index, value ->
val x = padding + index * spacing
val y = if (yRange == 0f) size.height / 2 else (size.height - padding) - ((value - minY) / yRange) * (size.height - padding * 2)
Offset(x, y)
}

// Draw axes
drawLine(
color = Color.White,
start = Offset(padding, padding),
end = Offset(padding, size.height - padding),
strokeWidth = 2f
)
drawLine(
color = Color.White,
start = Offset(padding, size.height - padding),
end = Offset(size.width - padding, size.height - padding),
strokeWidth = 2f
)

// Draw y-axis labels
val yLabelCount = 5
val yStep = yRange / (yLabelCount - 1)
for (i in 0 until yLabelCount) {
val yValue = minY + i * yStep
val yPos = (size.height - padding) - ((yValue - minY) / yRange) * (size.height - padding * 2)

drawContext.canvas.nativeCanvas.apply {
val label = yValue.roundToInt().toString()
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.RIGHT
}
drawText(
label,
padding - 8f,
yPos + textPaint.textSize / 2,
textPaint
)
}
}

// Draw x-axis labels
val xLabelCount = 5
val xStep = (pointCount - 1).coerceAtLeast(1) / (xLabelCount - 1).coerceAtLeast(1)
for (i in 0 until xLabelCount) {
val index = i * xStep
val xPos = padding + index * spacing

drawContext.canvas.nativeCanvas.apply {
val label = index.toString()
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
}
drawText(
label,
xPos,
size.height - padding + textPaint.textSize + 4f,
textPaint
)
}
}

// Optionally, draw axis labels
// Y-Axis Label
if (yAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
isAntiAlias = true
}
// Rotate for vertical text
save()
rotate(-90f, padding / 2, size.height / 2)
drawText(
yAxisLabel,
padding / 2,
size.height / 2,
textPaint
)
restore()
}
}

// X-Axis Label
if (xAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
isAntiAlias = true
}
drawText(
xAxisLabel,
size.width / 2,
size.height - padding / 2,
textPaint
)
}
}

// Draw the straight lines connecting points
if (points.size >= 2) {
for (i in 0 until points.size - 1) {
drawLine(
color = lineColor,
start = points[i],
end = points[i + 1],
strokeWidth = 4f,
cap = StrokeCap.Round
)
}
}

// Optionally, draw points
points.forEach { point ->
drawCircle(
color = lineColor,
radius = 4f,
center = point
)
}
}
}

private fun initializeSocketIO() {
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl

val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/socket.io/"
}

try {
socket = IO.socket(socketUrl, options)

socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket connected")
socket.emit("join", currentRoom)
Log.d(TAG, "Joined room: $currentRoom")
initiateCall()
}

socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket connection error: $error")
}
}

socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket disconnected: $reason")
}
}

socket.on("signal") { args ->
Log.d(TAG, "Received signaling: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject
handleSignalingData(data)
}
}

socket.connect()
Log.d(TAG, "Connecting to Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "Error connecting to Socket: ${e.message}")
}
}

private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(
Manifest.permission.CAMERA,
Manifest.permission.RECORD_AUDIO,
Manifest.permission.INTERNET,
Manifest.permission.ACCESS_NETWORK_STATE
)

val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
}

if (permissionsToRequest.isNotEmpty()) {
requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
} else {
onPermissionsChecked()
}
}

private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
}

private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
remoteView: SurfaceViewRenderer,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(
PeerConnection.IceServer.builder(stunUrl).createIceServer(),
PeerConnection.IceServer.builder(turnUrl)
.setUsername(turnUsername)
.setPassword(turnPassword)
.createIceServer()
)

val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
}

localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
})
put("room", currentRoom)
}
socket.emit("signal", signalData)
}
}

override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")
}

override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")
}

override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")
}

override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")
}

override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")
}

override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")
}

override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")
}

override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")
}

override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")
}

override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")
receiver?.track()?.let { track ->
if (track is VideoTrack) {
track.addSink(remoteView)
}
}
}

override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")
transceiver?.receiver?.track()?.let { track ->
if (track is VideoTrack) {
track.addSink(remoteView)
}
}
}

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")
}
})

onLocalPeerCreated(localPeer!!)

startStatsCollection()
}

private fun initiateCall() {
Log.d(TAG, "Initiating call...")
val constraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
}

localPeer?.createOffer(object : SdpObserver {
override fun onCreateSuccess(sessionDescription: SessionDescription?) {
sessionDescription?.let { sdp ->
localPeer?.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
val signalData = JSONObject().apply {
put("type", "offer")
put("sdp", JSONObject().put("sdp", sdp.description))
put("room", currentRoom)
}
socket.emit("signal", signalData)
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set local description error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
}

override fun onSetSuccess() {}
override fun onCreateFailure(error: String?) {
Log.e(TAG, "Create offer error: $error")
}

override fun onSetFailure(error: String?) {}
}, constraints)
}

private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {

"answer" -> {
Log.d(TAG, "Received answer")
val sdp = SessionDescription(
SessionDescription.Type.ANSWER,
data.getJSONObject("sdp").getString("sdp")
)

localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}

pendingIceCandidates.clear()

Log.d(TAG, "Set remote description (answer) success")
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}

"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(
candidateData.getString("sdpMid"),
candidateData.getInt("sdpMLineIndex"),
candidateData.getString("candidate")
)

if (localPeer?.remoteDescription != null) {
localPeer?.addIceCandidate(candidate)
} else {
pendingIceCandidates.add(candidate)
}
}

"time_sync_request" -> {
val t1 = data.getLong("t1")
val t2 = System.currentTimeMillis()
val t3 = System.currentTimeMillis()
val syncResponse = JSONObject().apply {
put("type", "time_sync_response")
put("t1", t1)
put("t2", t2)
put("t3", t3)
put("room", currentRoom)
}
socket.emit("signal", syncResponse)
Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3")
}
else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
}
}
}

private fun startStatsCollection() {
Log.d(TAG, "Starting stats collection...")
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // Collect stats every second
Log.d(TAG, "Collecting stats...")
localPeer?.getStats { report ->
Log.d(TAG, "Stats report obtained.")
parseStatsReport(report)
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
}
}
}

private fun parseStatsReport(report: RTCStatsReport) {
Log.d(TAG, "Received RTCStatsReport: $report")
for (stats in report.statsMap.values) {
if (stats.type == "inbound-rtp") {
val kind = stats.members["kind"] as? String
if (kind == "video") {
val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
val packetsReceived = (stats.members["packetsReceived"] as? Number)?.toDouble() ?: 1.0 // Avoid division by zero
val packetLossFraction = packetsLost / (packetsLost + packetsReceived)
val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

Log.d(
TAG,
"Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
)

if (prevTimestamp != 0.0) {
val timeElapsed = timestamp - prevTimestamp
val framesDelta = framesDecoded - prevFramesDecoded
val bytesDelta = bytesReceived - prevBytesReceived
val framesReceivedDelta = framesReceived - prevFramesReceived
val framesDroppedDelta = framesDropped - prevFramesDropped

val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
val packetLoss = packetLossFraction * 100 // Convert to percentage

Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

// Determine stuttering based on thresholds
val isStuttering = frameRate < 24 || packetLoss > 5.0 // Thresholds can be adjusted

// Update states
viewLifecycleOwner.lifecycleScope.launch {
frameRateState.doubleValue = frameRate
bitrateState.longValue = bitrate.toLong()
// Update stuttering state
stutteringState.value = isStuttering
// Update specific stuttering causes
frameRateLowState.value = frameRate < 24
packetLossHighState.value = packetLoss > 5.0
packetLossState.doubleValue = packetLoss

// Update history
frameRateHistory.add(frameRate.toFloat())
if (frameRateHistory.size > 60) {
frameRateHistory.removeAt(0)
}

bitrateHistory.add(bitrate.toLong())
if (bitrateHistory.size > 60) {
bitrateHistory.removeAt(0)
}

Log.d(
TAG,
"Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
)
}
}

// Update previous values
prevFramesDecoded = framesDecoded
prevBytesReceived = bytesReceived
prevFramesReceived = framesReceived
prevFramesDropped = framesDropped
prevTimestamp = timestamp
}
}
}
}

override fun onDestroyView() {
super.onDestroyView()
statsJob?.cancel()
socket.disconnect()
localPeer?.dispose()
remoteView?.release()
remoteEglBase?.release()
timeSyncJob?.cancel()
}
content_copy
Use code with caution.

}

目前这部分代码计算的单向时延和往返时延有问题吗?

思考

评估代码的准确性

查看代码计算单向时延(OWD)和往返时延(RTT)的方法,特别是启动和接收片段的处理逻辑。

分析时间同步

我正在计算RTT和OWD。在视频接收器片段中,发送时间同步请求,接收方设定时间戳并回复。RTT通过计算时差得到,时钟偏移量通过时间戳差的平均值计算。

考核OWD论文

计算传输时间

我正在计算从发送方到接收方的往返时间(RTT)。通过发送方和接收方的时钟差异以及传输时间和接收时间,最终得到 RTT = t4 - t1 - (t3 - t2)。

考量时间同步

时间同步可能不准确,代码计算OWD为(t2 - t1) - clockOffset,但实际无法用单向消息精确测量OWD,需依赖RTT。时间同步可以估计OWD,但需要知道发送方和接收方的时钟是否已经同步。ერთ rekey_request, rekey_response和others вызовы на управляющей части.

调整时间设置

现在我们在看VideoReceiverFragment,t2和t3指的是请求收到和处理后的发送时间。这说明响应发送是即时的,处理时间不包含在内。

纠正SDK错误

调整时间同步

在解析和处理时间同步请求时,设置接收时间t2,然后处理,最后设置发送时间t3。这确保t3 - t2包括处理延迟,从而获得准确的时钟偏移和往返时间。

检测问题并调整时间同步

回答

您的代码在计算单向时延(OWD)和往返时延(RTT)时存在一些潜在的问题,主要集中在时间同步(Time Synchronization)的实现上。以下是详细分析和建议:

1. 时间同步协议的问题

当前实现的流程:

  1. 发起方(VideoInitiatorFragment)

    • 定期发送 time_sync_request,包含发送时间 t1
    • 接收到 time_sync_response 后,记录接收时间 t4,并使用 t2t3 来计算 RTT 和时钟偏移量(clockOffset)。
  2. 接收方(VideoReceiverFragment)

    • 接收到 time_sync_request 后,立即记录接收时间 t2 和发送时间 t3,并发送回 time_sync_response

存在的问题:

  • t2t3 的设定方式

    • VideoReceiverFragment 中,t2t3 是在接收到请求后立即设定的,没有考虑任何处理时间。这意味着 t3 - t2 ≈ 0,导致 RTT 计算为 t4 - t1,忽略了接收方处理请求并发送响应所需的时间。这会使 RTT 的计算不准确,从而影响 clockOffset 和 OWD 的计算。
  • 单向时延(OWD)的计算

    • 当前代码中,OWD 被计算为 (t2 - t1) - clockOffset。然而,由于 clockOffset 的计算基于不准确的 RTT,这导致 OWD 的估计也不准确。
  • 时间同步的准确性

    • 使用简单的时间同步方法(类似于 NTP)需要多次往返时间测量以减少网络抖动和提高准确性。当前实现仅进行一次同步,可能无法提供足够的精度。

2. 改进建议

修正时间同步的 t2t3 设定:

确保 t3 记录的是发送响应的准确时间,应该在发送响应之前设定 t3,而不是与 t2 相同。例如:

private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.currentTimeMillis() // 模拟一些处理延迟(如果有的话) // 例如,等待某个条件或执行一些操作 val t3 = System.currentTimeMillis() // 确保 t3 > t2 val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3") } // 其他信令类型处理 } }

增加多次同步测量:

为了提高时间同步的准确性,可以进行多次时间同步测量,并取平均值或其他统计方法来减少网络抖动的影响。

private fun startTimeSync(intervalMs: Long = 5000L, attempts: Int = 5) { viewLifecycleOwner.lifecycleScope.launch { while (isActive) { delay(intervalMs) repeat(attempts) { requestTimeSync() delay(100) // 每次请求之间的间隔 } } } }

改进 RTT 和 OWD 的计算:

确保 RTT 和 OWD 的计算基于准确的 clockOffset,并考虑多次测量后的平均值。例如:

private val clockOffsets = mutableListOf<Long>() private val rttValues = mutableListOf<Long>() private fun handleSignalingData(data: JSONObject) { // 现有的代码... when (data.getString("type")) { "time_sync_response" -> { val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.currentTimeMillis() val rtt = t4 - t1 - (t3 - t2) val offset = ((t2 - t1) + (t3 - t4)) / 2 // 保存 RTT 和 clockOffset rttValues.add(rtt) clockOffsets.add(offset) // 计算平均 RTT 和 clockOffset val avgRtt = rttValues.average().toLong() val avgOffset = clockOffsets.average().toLong() // 计算 OWD val owd = (t2 - t1) - avgOffset if (owd >= 0) { viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 和 OWD 状态 rttState.longValue = avgRtt owdState.longValue = owd // 更新历史记录和统计信息 // (现有的代码) } } else { Log.e(TAG, "计算出的 OWD 为负数,可能时钟同步有误。") } } // 其他信令类型处理 } }

考虑使用现有的时间同步库或协议:

如果需要更高精度的时间同步,可以考虑使用专门的时间同步库或协议,如 NTP 或 PTP。这些协议经过优化,可以提供更准确的时钟同步。

3. 其他潜在问题

  • 权限检查

    • 确保所有必要的权限在应用启动时已正确申请,并处理用户可能拒绝权限的情况。
  • 异常处理

    • 增强对网络异常、Socket 连接错误等情况的处理,以提高应用的鲁棒性。
  • 性能优化

    • 确保统计数据的收集和 UI 更新不会对主线程造成过大压力,避免导致 UI 卡顿。

总结

您的代码在基本的 RTT 和 OWD 计算逻辑上是正确的,但由于时间同步过程中 t3 被设定为与 t2 相同,导致 RTT 和 clockOffset 的计算不准确,进而影响 OWD 的估计。通过确保 t3 记录的是准确的响应发送时间,并考虑多次测量来提高同步精度,可以显著改善时延计算的准确性。

如果您有进一步的问题或需要更详细的帮助,请随时提问!

问题

t3的时间能否按照正确的方式来赋值,目前没有真实的处理请求响应耗时

思考

时间同步的问题

我在分析代码时发现,接收者片段中t3设定得很紧,影响了RTT和clockOffset计算。问t3可以正确设置吗?可以模拟一些处理时延来使t3-t2不为零。

确保时钟同步

在实际应用中,处理时间会影响RTT和clockOffset计算的准确性。为了确保t3准确,在接收器片段中安排处理延迟或测量实际处理时间是重要的。

时间同步请求处理中合理利用系统时间以提高精度。

安排时间发送

为了处理数据时序问题,我在接收端模拟处理延迟,通过协程异步发送响应。我还计划在"处理请求"时明确设置发送时间点。

设定正确条件

在代码处理中,确保 time_sync_request 条件成立时,t2 被设定为当前时间,t3 在响应发送前设置。

回答

是的,您可以通过正确地设置 t3 来改进当前的单向时延(OWD)和往返时延(RTT)计算。目前,t3 被设置为与 t2 相同,这会导致 t3 - t2 ≈ 0,从而使 RTT 和 clockOffset 的计算不准确。以下是详细的解决方案和代码示例,帮助您正确地设置 t3 并改进时延计算。

1. 正确设置 t2t3

VideoReceiverFragment 中,t2 应该是在接收到 time_sync_request 时记录的时间,而 t3 应该是在发送 time_sync_response 之前记录的时间。这意味着 t3 代表了处理请求所花费的时间。

修改后的 handleSignalingData 方法

以下是如何在 VideoReceiverFragment 中正确设置 t2t3 的示例:

private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.currentTimeMillis() // 模拟处理时间(如果有实际的处理任务,可以在这里执行) // 如果没有,可以选择不添加延迟,t3 将紧随 t2 // 例如,添加一个短暂的延迟来模拟处理时间 // viewLifecycleOwner.lifecycleScope.launch { // delay(10) // 模拟10毫秒的处理时间 // val t3 = System.currentTimeMillis() // sendTimeSyncResponse(t1, t2, t3) // } // 如果没有实际的处理任务,直接设置 t3 val t3 = System.currentTimeMillis() sendTimeSyncResponse(t1, t2, t3) } // 其他信令类型处理 else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } } private fun sendTimeSyncResponse(t1: Long, t2: Long, t3: Long) { val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3") }

解释

  1. 记录 t1t2

    • t1 是 initiator 发送请求时的时间戳。
    • t2 是 receiver 接收到请求时的时间戳。
  2. 处理请求并记录 t3

    • 在处理完请求后(无论是否有实际的处理任务),记录发送响应的时间戳 t3
    • 如果有实际的处理任务,可以在记录 t3 之前添加适当的延迟。
  3. 发送响应

    • t1t2t3 作为响应的一部分发送回 initiator。

可选:模拟处理延迟

如果您希望模拟实际的处理延迟,可以取消注释相关代码:

viewLifecycleOwner.lifecycleScope.launch { delay(10) // 模拟10毫秒的处理时间 val t3 = System.currentTimeMillis() sendTimeSyncResponse(t1, t2, t3) }

这样,t3 - t2 将包含模拟的处理时间,使 RTT 的计算更为准确。

2. 在 Initiator 端正确处理响应

确保在 VideoInitiatorFragment 中正确处理 time_sync_response,并使用 t1t2t3 来计算 RTT 和 clockOffset。以下是改进后的代码示例:

private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "offer" -> { /* 现有代码 */ } "answer" -> { /* 现有代码 */ } "ice" -> { /* 现有代码 */ } "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.currentTimeMillis() // 计算 RTT 和时钟偏移量 val rtt = t4 - t1 - (t3 - t2) val clockOffset = ((t2 - t1) + (t3 - t4)) / 2 Log.d(TAG, "时间同步: RTT=$rtt ms, 时钟偏移量= $clockOffset ms") // 计算单向时延 OWD val owd = (t2 - t1) - clockOffset if (owd >= 0) { viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 状态 rttState.longValue = rtt // 更新 RTT 历史记录 rttHistory.add(rtt) if (rttHistory.size > 60) { rttHistory.removeAt(0) } // 计算 RTT 的最大、最小和平均值 val maxRtt = rttHistory.maxOrNull() ?: 0L val minRtt = rttHistory.minOrNull() ?: 0L val averageRtt = if (rttHistory.isNotEmpty()) { rttHistory.average().toLong() } else { 0L } // 更新 OWD 状态 owdState.longValue = owd // 更新 OWD 历史记录 owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // 计算 OWD 的最大、最小和平均值 val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) { owdHistory.average().toLong() } else { 0L } Log.d(TAG, "RTT - 最新: $rtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms") Log.d(TAG, "单向时延 (OWD) - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms") } } else { Log.e(TAG, "计算出的 OWD 为负数,可能时钟同步有误。") } } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } }

解释

  1. 接收响应并提取时间戳

    • time_sync_response 中提取 t1t2t3
  2. 计算 RTT 和 clockOffset

    • RTTRTT = t4 - t1 - (t3 - t2)
      • t4 是接收响应时的当前时间。
      • t3 - t2 是 receiver 处理请求并发送响应所花费的时间。
    • Clock OffsetclockOffset = ((t2 - t1) + (t3 - t4)) / 2
      • 这是 initiator 和 receiver 之间的时钟偏移量。
  3. 计算 OWD

    • OWD = (t2 - t1) - clockOffset
    • 确保 OWD 为非负值。
  4. 更新状态和历史记录

    • 更新 RTT 和 OWD 的当前值、历史记录以及统计信息(最大值、最小值、平均值)。

3. 增加多次同步测量以提高准确性

为了提高时间同步的准确性,建议进行多次同步测量并取平均值。这可以减少网络抖动和偶发延迟对结果的影响。

修改 startTimeSync 方法

private fun startTimeSync(intervalMs: Long = 5000L, attemptsPerSync: Int = 5) { viewLifecycleOwner.lifecycleScope.launch { while (isActive) { delay(intervalMs) repeat(attemptsPerSync) { requestTimeSync() delay(100) // 每次请求之间的间隔 } } } }

修改 handleSignalingData 方法

handleSignalingData 中,收集多个 RTT 和 clockOffset 值,并计算平均值:

private val rttValues = mutableListOf<Long>() private val clockOffsets = mutableListOf<Long>() private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.currentTimeMillis() val t3 = System.currentTimeMillis() sendTimeSyncResponse(t1, t2, t3) } "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.currentTimeMillis() val rtt = t4 - t1 - (t3 - t2) val clockOffset = ((t2 - t1) + (t3 - t4)) / 2 // 保存 RTT 和 clockOffset rttValues.add(rtt) clockOffsets.add(clockOffset) // 只保留最近的 N 个值 if (rttValues.size > 20) rttValues.removeAt(0) if (clockOffsets.size > 20) clockOffsets.removeAt(0) // 计算平均 RTT 和 clockOffset val avgRtt = rttValues.average().toLong() val avgClockOffset = clockOffsets.average().toLong() // 计算 OWD val owd = (t2 - t1) - avgClockOffset if (owd >= 0) { viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 状态 rttState.longValue = avgRtt // 更新 RTT 历史记录 rttHistory.add(avgRtt) if (rttHistory.size > 60) { rttHistory.removeAt(0) } // 计算 RTT 的最大、最小和平均值 val maxRtt = rttHistory.maxOrNull() ?: 0L val minRtt = rttHistory.minOrNull() ?: 0L val averageRtt = if (rttHistory.isNotEmpty()) { rttHistory.average().toLong() } else { 0L } // 更新 OWD 状态 owdState.longValue = owd // 更新 OWD 历史记录 owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // 计算 OWD 的最大、最小和平均值 val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) { owdHistory.average().toLong() } else { 0L } Log.d(TAG, "RTT - 最新: $avgRtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms") Log.d(TAG, "单向时延 (OWD) - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms") } } else { Log.e(TAG, "计算出的 OWD 为负数,可能时钟同步有误。") } } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } }

解释

  1. 收集多次 RTT 和 clockOffset

    • 使用 rttValuesclockOffsets 列表来存储多次同步的结果。
    • 保留最近的 N 个值(例如 20 个),以避免列表过长。
  2. 计算平均 RTT 和 clockOffset

    • 通过计算列表的平均值,获得更稳定和准确的 RTT 和 clockOffset
  3. 计算 OWD

    • 使用平均的 clockOffset 来计算 OWD,确保更准确的单向时延估计。

4. 其他改进建议

使用更高精度的时间戳

System.currentTimeMillis() 具有较低的精度,可能导致 RTT 和 OWD 的计算不够精确。建议使用 System.nanoTime()SystemClock.elapsedRealtime() 来获取更高精度的时间戳。

示例修改

// 使用 System.nanoTime() 获取高精度时间戳 private fun handleSignalingData(data: JSONObject) { when (data.getString("type")) { "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.nanoTime() / 1_000_000 // 转换为毫秒 val t3 = System.nanoTime() / 1_000_000 sendTimeSyncResponse(t1, t2, t3) } "time_sync_response" -> { val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.nanoTime() / 1_000_000 val rtt = t4 - t1 - (t3 - t2) val clockOffset = ((t2 - t1) + (t3 - t4)) / 2 // 其余逻辑不变 } // 其他信令类型处理 } }

处理时钟漂移和同步误差

尽管上述改进可以提高时延计算的准确性,但时钟漂移和网络抖动仍然可能影响结果。为此,建议:

  1. 增加同步频率:定期进行时间同步,以应对时钟漂移。
  2. 使用滑动窗口:使用滑动窗口技术,仅基于最近的 N 次同步结果进行计算。
  3. 滤波和平滑:应用滤波算法(如卡尔曼滤波)来平滑 RTT 和 OWD 的变化。

5. 完整的代码示例

以下是经过上述改进后的 VideoReceiverFragmentVideoInitiatorFragment 的关键部分代码示例。

VideoReceiverFragment

private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.nanoTime() / 1_000_000 // 毫秒 // 如果有实际的处理任务,可以在这里执行 // 模拟处理时间(可选) // delay(10) // 使用协程延迟模拟处理时间 val t3 = System.nanoTime() / 1_000_000 sendTimeSyncResponse(t1, t2, t3) } "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.nanoTime() / 1_000_000 // 毫秒 val rtt = t4 - t1 - (t3 - t2) val clockOffset = ((t2 - t1) + (t3 - t4)) / 2 // 保存 RTT 和 clockOffset rttValues.add(rtt) clockOffsets.add(clockOffset) // 保持最近的 20 个值 if (rttValues.size > 20) rttValues.removeAt(0) if (clockOffsets.size > 20) clockOffsets.removeAt(0) // 计算平均 RTT 和 clockOffset val avgRtt = rttValues.average().toLong() val avgClockOffset = clockOffsets.average().toLong() // 计算 OWD val owd = (t2 - t1) - avgClockOffset if (owd >= 0) { viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 状态 rttState.longValue = avgRtt // 更新 RTT 历史记录 rttHistory.add(avgRtt) if (rttHistory.size > 60) { rttHistory.removeAt(0) } // 计算 RTT 的最大、最小和平均值 val maxRtt = rttHistory.maxOrNull() ?: 0L val minRtt = rttHistory.minOrNull() ?: 0L val averageRtt = if (rttHistory.isNotEmpty()) { rttHistory.average().toLong() } else { 0L } // 更新 OWD 状态 owdState.longValue = owd // 更新 OWD 历史记录 owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // 计算 OWD 的最大、最小和平均值 val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) { owdHistory.average().toLong() } else { 0L } Log.d(TAG, "RTT - 最新: $avgRtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms") Log.d(TAG, "单向时延 (OWD) - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms") } } else { Log.e(TAG, "计算出的 OWD 为负数,可能时钟同步有误。") } } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } } private fun sendTimeSyncResponse(t1: Long, t2: Long, t3: Long) { val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3") }

VideoInitiatorFragment

private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "offer" -> { /* 现有代码 */ } "answer" -> { /* 现有代码 */ } "ice" -> { /* 现有代码 */ } "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.nanoTime() / 1_000_000 // 毫秒 val rtt = t4 - t1 - (t3 - t2) val clockOffset = ((t2 - t1) + (t3 - t4)) / 2 Log.d(TAG, "时间同步: RTT=$rtt ms, 时钟偏移量= $clockOffset ms") // 计算单向时延 OWD val owd = (t2 - t1) - clockOffset if (owd >= 0) { viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 状态 rttState.longValue = rtt // 更新 RTT 历史记录 rttHistory.add(rtt) if (rttHistory.size > 60) { rttHistory.removeAt(0) } // 计算 RTT 的最大、最小和平均值 val maxRtt = rttHistory.maxOrNull() ?: 0L val minRtt = rttHistory.minOrNull() ?: 0L val averageRtt = if (rttHistory.isNotEmpty()) { rttHistory.average().toLong() } else { 0L } // 更新 OWD 状态 owdState.longValue = owd // 更新 OWD 历史记录 owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // 计算 OWD 的最大、最小和平均值 val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) { owdHistory.average().toLong() } else { 0L } Log.d(TAG, "RTT - 最新: $rtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms") Log.d(TAG, "单向时延 (OWD) - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms") } } else { Log.e(TAG, "计算出的 OWD 为负数,可能时钟同步有误。") } } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } }

关键点总结

  1. 正确记录时间戳

    • Receiver
      • t2 在接收到请求时记录。
      • t3 在发送响应前记录。
    • Initiator
      • t4 在接收到响应时记录。
  2. 计算 RTT 和 clockOffset

    • 使用公式:
      • RTT = t4 - t1 - (t3 - t2)
      • clockOffset = ((t2 - t1) + (t3 - t4)) / 2
  3. 计算 OWD

    • OWD = (t2 - t1) - clockOffset
  4. 多次同步测量

    • 进行多次时间同步请求,收集 RTT 和 clockOffset 值,取平均值以提高准确性。
  5. 使用高精度时间戳

    • 优先使用 System.nanoTime()SystemClock.elapsedRealtime() 代替 System.currentTimeMillis() 以获得更高的时间精度。
  6. 处理负值 OWD

    • 当计算出的 OWD 为负数时,记录错误日志,并考虑重新进行时间同步以纠正时钟偏移。

6. 完整示例代码

VideoReceiverFragment 的关键部分

private val rttValues = mutableListOf<Long>() private val clockOffsets = mutableListOf<Long>() private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.nanoTime() / 1_000_000 // 毫秒 // 如果有实际的处理任务,可以在这里执行 // 模拟处理时间(可选) // viewLifecycleOwner.lifecycleScope.launch { // delay(10) // 模拟10毫秒的处理时间 // val t3 = System.nanoTime() / 1_000_000 // sendTimeSyncResponse(t1, t2, t3) // } // 如果没有实际的处理任务,直接设置 t3 val t3 = System.nanoTime() / 1_000_000 sendTimeSyncResponse(t1, t2, t3) } "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.nanoTime() / 1_000_000 // 毫秒 val rtt = t4 - t1 - (t3 - t2) val clockOffset = ((t2 - t1) + (t3 - t4)) / 2 // 保存 RTT 和 clockOffset rttValues.add(rtt) clockOffsets.add(clockOffset) // 保持最近的 20 个值 if (rttValues.size > 20) rttValues.removeAt(0) if (clockOffsets.size > 20) clockOffsets.removeAt(0) // 计算平均 RTT 和 clockOffset val avgRtt = rttValues.average().toLong() val avgClockOffset = clockOffsets.average().toLong() // 计算 OWD val owd = (t2 - t1) - avgClockOffset if (owd >= 0) { viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 状态 rttState.longValue = avgRtt // 更新 RTT 历史记录 rttHistory.add(avgRtt) if (rttHistory.size > 60) { rttHistory.removeAt(0) } // 计算 RTT 的最大、最小和平均值 val maxRtt = rttHistory.maxOrNull() ?: 0L val minRtt = rttHistory.minOrNull() ?: 0L val averageRtt = if (rttHistory.isNotEmpty()) { rttHistory.average().toLong() } else { 0L } // 更新 OWD 状态 owdState.longValue = owd // 更新 OWD 历史记录 owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // 计算 OWD 的最大、最小和平均值 val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) { owdHistory.average().toLong() } else { 0L } Log.d(TAG, "RTT - 最新: $avgRtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms") Log.d(TAG, "单向时延 (OWD) - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms") } } else { Log.e(TAG, "计算出的 OWD 为负数,可能时钟同步有误。") } } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } } private fun sendTimeSyncResponse(t1: Long, t2: Long, t3: Long) { val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3") }

VideoInitiatorFragment 的关键部分

private val rttValues = mutableListOf<Long>() private val clockOffsets = mutableListOf<Long>() private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "offer" -> { /* 现有代码 */ } "answer" -> { /* 现有代码 */ } "ice" -> { /* 现有代码 */ } "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.nanoTime() / 1_000_000 // 毫秒 val rtt = t4 - t1 - (t3 - t2) val clockOffset = ((t2 - t1) + (t3 - t4)) / 2 Log.d(TAG, "时间同步: RTT=$rtt ms, 时钟偏移量= $clockOffset ms") // 保存 RTT 和 clockOffset rttValues.add(rtt) clockOffsets.add(clockOffset) // 保持最近的 20 个值 if (rttValues.size > 20) rttValues.removeAt(0) if (clockOffsets.size > 20) clockOffsets.removeAt(0) // 计算平均 RTT 和 clockOffset val avgRtt = rttValues.average().toLong() val avgClockOffset = clockOffsets.average().toLong() // 计算 OWD val owd = (t2 - t1) - avgClockOffset if (owd >= 0) { viewLifecycleOwner.lifecycleScope.launch { // 更新 RTT 状态 rttState.longValue = avgRtt // 更新 RTT 历史记录 rttHistory.add(avgRtt) if (rttHistory.size > 60) { rttHistory.removeAt(0) } // 计算 RTT 的最大、最小和平均值 val maxRtt = rttHistory.maxOrNull() ?: 0L val minRtt = rttHistory.minOrNull() ?: 0L val averageRtt = if (rttHistory.isNotEmpty()) { rttHistory.average().toLong() } else { 0L } // 更新 OWD 状态 owdState.longValue = owd // 更新 OWD 历史记录 owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // 计算 OWD 的最大、最小和平均值 val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) { owdHistory.average().toLong() } else { 0L } Log.d(TAG, "RTT - 最新: $avgRtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms") Log.d(TAG, "单向时延 (OWD) - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms") } } else { Log.e(TAG, "计算出的 OWD 为负数,可能时钟同步有误。") } } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } }

总结

通过以上修改,您可以确保 t3 在发送响应时被正确设置,从而提高 RTT 和 OWD 计算的准确性。此外,通过进行多次同步测量并取平均值,可以进一步减少网络抖动对结果的影响。以下是关键步骤的总结:

  1. 正确记录时间戳

    • t2 在接收到请求时记录。
    • t3 在发送响应前记录。
  2. 计算 RTT 和 clockOffset

    • 使用准确的时间戳计算 RTT 和 clockOffset
  3. 计算 OWD

    • 使用 clockOffset 计算单向时延,确保其为非负值。
  4. 多次同步测量

    • 进行多次时间同步请求,收集 RTT 和 clockOffset,并计算平均值以提高准确性。
  5. 使用高精度时间戳

    • 优先使用 System.nanoTime()SystemClock.elapsedRealtime() 来获取更高精度的时间戳。

通过这些改进,您的应用程序将能够更准确地测量和显示单向时延和往返时延,从而提供更可靠的网络性能指标。

如果您有任何进一步的问题或需要更多帮助,请随时提问!

分享这个问答