Pregunta

class VideoInitiatorFragment : Fragment() {

companion object {
fun newInstance(
room: String,
stunUrl: String,
turnUrl: String,
turnUsername: String,
turnPassword: String,
signalingServerUrl: String
): VideoInitiatorFragment {
val fragment = VideoInitiatorFragment()
val args = Bundle().apply {
putString("room", room)
putString("stunUrl", stunUrl)
putString("turnUrl", turnUrl)
putString("turnUsername", turnUsername)
putString("turnPassword", turnPassword)
putString("signalingServerUrl", signalingServerUrl)
}
fragment.arguments = args
return fragment
}
}

// Class member variables remain unchanged
private lateinit var socket: Socket
private var localPeer: PeerConnection? = null
private var localView: SurfaceViewRenderer? = null
private var localEglBase: EglBase? = null
private val pendingIceCandidates = mutableListOf<IceCandidate>()
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String

private val TAG: String = "WebRTC-Initiator"

// State variables for RTT and OWD
private val rttState = mutableLongStateOf(0L)
private val rttHistory = mutableStateListOf<Long>()

private var statsJob: Job? = null

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

// Retrieve data from arguments
currentRoom = arguments?.getString("room") ?: "default-room"
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"

Log.d(
TAG,
"onCreate: Role = Initiator, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
)
}

private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
}
}
onPermissionsChecked()
}

override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {
WebRTCComposeLayout()
}
}
}

@Composable
fun WebRTCComposeLayout() {
val context = LocalContext.current
lateinit var peerConnectionFactory: PeerConnectionFactory
var localVideoTrack: VideoTrack? by remember { mutableStateOf(null) }

Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {

// Local video view
AndroidView(
factory = {
localView = SurfaceViewRenderer(it).apply {
setZOrderMediaOverlay(false)
}
localView!!
},
modifier = Modifier
.weight(1f)
.fillMaxWidth(),
update = {
if (localEglBase == null) {
localEglBase = EglBase.create()
it.init(localEglBase!!.eglBaseContext, null)
it.setMirror(false)
}
}
)

Spacer(modifier = Modifier.height(8.dp))

Column(modifier = Modifier.padding(horizontal = 16.dp)) {
Text(
text = "往返时延 (RTT) 最新: ${rttState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "RTT 最大: ${rttHistory.maxOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "RTT 最小: ${rttHistory.minOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
val averageRtt = if (rttHistory.isNotEmpty()) rttHistory.average().toLong() else 0L
Text(
text = "RTT 平均: $averageRtt ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 8.dp)
)
}

}

LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)

val encoderFactory = DefaultVideoEncoderFactory(
localEglBase!!.eglBaseContext, true, true
)

val decoderFactory = DefaultVideoDecoderFactory(localEglBase!!.eglBaseContext)

peerConnectionFactory = PeerConnectionFactory.builder()
.setVideoEncoderFactory(encoderFactory)
.setVideoDecoderFactory(decoderFactory)
.createPeerConnectionFactory()

initLocalVideo(context, localView, peerConnectionFactory, localEglBase!!) {
localVideoTrack = it
}

createPeerConnection(
context,
peerConnectionFactory,
localVideoTrack
) {
localPeer = it
startStatsCollection() // Start collecting statistics
}

initializeSocketIO()

requestPermissionsIfNeeded()
startTimeSync()
}
}
}

private fun startTimeSync(intervalMs: Long = 5000L) {
viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(intervalMs) // Sync every 5 seconds
requestTimeSync()
}
}
}

private fun startStatsCollection() {
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // Collect stats every second
localPeer?.getStats { report ->
parseStatsReport(report)
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
}
}
}

private fun parseStatsReport(report: RTCStatsReport) {
Log.d(TAG, "RTCStatsReport: ${report.statsMap}")
for (stats in report.statsMap.values) {
Log.d(TAG, "Stats type: ${stats.type}")
if (stats.type == "transport") {
Log.d(TAG, "Transport Stats found: $stats")
val currentRtt = (stats.members["currentRoundTripTime"] as? Number)?.toDouble()?.times(1000)?.toLong()
if (currentRtt != null && currentRtt > 0) {
viewLifecycleOwner.lifecycleScope.launch {
// Update RTT state
rttState.longValue = currentRtt

// Update RTT history
rttHistory.add(currentRtt)
if (rttHistory.size > 60) {
rttHistory.removeAt(0)
}

// Calculate max, min, and average RTT
val maxRtt = rttHistory.maxOrNull() ?: 0L
val minRtt = rttHistory.minOrNull() ?: 0L
val averageRtt = if (rttHistory.isNotEmpty()) {
rttHistory.average().toLong()
} else {
0L
}

Log.d(TAG, "RTT - Latest: $currentRtt ms, Max: $maxRtt ms, Min: $minRtt ms, Average: $averageRtt ms")
}
}
}
}
}

private fun initializeSocketIO() {
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl

val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/socket.io/"
}

try {
socket = IO.socket(socketUrl, options)

socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket connected")
socket.emit("join", currentRoom)
Log.d(TAG, "Joined room: $currentRoom")
}

socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket connection error: $error")
}
}

socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket disconnected: $reason")
}
}

socket.on("signal") { args ->
Log.d(TAG, "Received signaling: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject
handleSignalingData(data)
}
}

socket.connect()
Log.d(TAG, "Connecting to Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "Error connecting to Socket: ${e.message}")
}
}

private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(
Manifest.permission.CAMERA,
Manifest.permission.RECORD_AUDIO,
Manifest.permission.INTERNET,
Manifest.permission.ACCESS_NETWORK_STATE
)

val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
}

if (permissionsToRequest.isNotEmpty()) {
requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
} else {
onPermissionsChecked()
}
}

private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要权限已被授予", Toast.LENGTH_SHORT).show()
}

private fun initLocalVideo(
context: Context,
localView: SurfaceViewRenderer?,
peerConnectionFactory: PeerConnectionFactory,
eglBase: EglBase,
onLocalVideoTrack: (VideoTrack) -> Unit
) {
val videoCapturer = createCameraCapturer(context)
val surfaceTextureHelper = SurfaceTextureHelper.create("CaptureThread", eglBase.eglBaseContext)
val videoSource = peerConnectionFactory.createVideoSource(videoCapturer.isScreencast)
videoCapturer.initialize(surfaceTextureHelper, context, videoSource.capturerObserver)

videoCapturer.startCapture(1920, 1080, 60)

val localVideoTrack = peerConnectionFactory.createVideoTrack("video_track", videoSource)
localVideoTrack.addSink(localView)

val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)

// Add audio and video tracks to local stream
val mediaStream = peerConnectionFactory.createLocalMediaStream("local_stream")
mediaStream.addTrack(localAudioTrack)
mediaStream.addTrack(localVideoTrack)

onLocalVideoTrack(localVideoTrack)
}

private fun createCameraCapturer(context: Context): CameraVideoCapturer {
val camera2Enumerator = Camera2Enumerator(context)
val deviceNames = camera2Enumerator.deviceNames

// Prefer back-facing camera
for (deviceName in deviceNames) {
if (camera2Enumerator.isBackFacing(deviceName)) {
val capturer = camera2Enumerator.createCapturer(deviceName, null)
if (capturer != null) {
return capturer
}
}
}

// Fallback to front-facing camera
for (deviceName in deviceNames) {
if (camera2Enumerator.isFrontFacing(deviceName)) {
val capturer = camera2Enumerator.createCapturer(deviceName, null)
if (capturer != null) {
return capturer
}
}
}

// Fallback to first available camera
return camera2Enumerator.createCapturer(deviceNames[0], null)
?: throw IllegalStateException("Unable to create camera capturer")
}

private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
localVideoTrack: VideoTrack?,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(
PeerConnection.IceServer.builder(stunUrl).createIceServer(),
PeerConnection.IceServer.builder(turnUrl)
.setUsername(turnUsername)
.setPassword(turnPassword)
.createIceServer()
)

val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
}

localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
})
put("room", currentRoom)
}
socket.emit("signal", signalData)
}
}

override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")
}

override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")
}

override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")
}

override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")
}

override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")
}

override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")
}

override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")
}

override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")
}

override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")
}

override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")
}

override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")
}

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")
}
})

localVideoTrack?.let {
localPeer?.addTrack(it, listOf("local_stream"))
}
val audioSource = peerConnectionFactory.createAudioSource(MediaConstraints())
val localAudioTrack = peerConnectionFactory.createAudioTrack("audio_track", audioSource)
localPeer?.addTrack(localAudioTrack, listOf("local_stream"))

onLocalPeerCreated(localPeer!!)
}

private fun createAnswer(peerConnection: PeerConnection, onAnswerCreated: (String) -> Unit) {
Log.d(TAG, "Creating answer...")
val constraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
}

peerConnection.createAnswer(object : SdpObserver {
override fun onCreateSuccess(sessionDescription: SessionDescription?) {
sessionDescription?.let { sdp ->
peerConnection.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "SetLocalDescription onSetSuccess")
onAnswerCreated(sdp.description)
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "SetLocalDescription onSetFailure: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
}

override fun onSetSuccess() {
Log.d(TAG, "createAnswer onSetSuccess")
}

override fun onCreateFailure(error: String?) {
Log.e(TAG, "createAnswer onCreateFailure: $error")
}

override fun onSetFailure(error: String?) {}
}, constraints)
}

private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {
"offer" -> {
Log.d(TAG, "Received offer")
val sdp = SessionDescription(
SessionDescription.Type.OFFER,
data.getJSONObject("sdp").getString("sdp")
)
localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
Log.d(TAG, "Set remote description (offer) success")
createAnswer(localPeer!!) { answer ->
val signalData = JSONObject().apply {
put("type", "answer")
put("sdp", JSONObject().put("sdp", answer))
put("room", currentRoom)
}

socket.emit("signal", signalData)

pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}
pendingIceCandidates.clear()
}
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description (offer) error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}

"answer" -> {
Log.d(TAG, "Received answer")
val sdp = SessionDescription(
SessionDescription.Type.ANSWER,
data.getJSONObject("sdp").getString("sdp")
)
localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}
pendingIceCandidates.clear()
Log.d(TAG, "Set remote description (answer) success")
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}

"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(
candidateData.getString("sdpMid"),
candidateData.getInt("sdpMLineIndex"),
candidateData.getString("candidate")
)

if (localPeer?.remoteDescription != null) {
localPeer?.addIceCandidate(candidate)
} else {
pendingIceCandidates.add(candidate)
}
}

"time_sync_response" -> {
Log.d(TAG, "接收到 time_sync_response")
val t1 = data.getLong("t1")
val t2 = data.getLong("t2")
val t3 = data.getLong("t3")
val t4 = System.currentTimeMillis()

// 计算 RTT
val rtt = t4 - t1 - (t3 - t2)

Log.d(TAG, "时间同步: RTT=$rtt ms")

viewLifecycleOwner.lifecycleScope.launch {
// 更新 RTT 状态
rttState.longValue = rtt

// 更新 RTT 历史记录
rttHistory.add(rtt)
if (rttHistory.size > 60) {
rttHistory.removeAt(0)
}

// 计算 RTT 的最大、最小和平均值
val maxRtt = rttHistory.maxOrNull() ?: 0L
val minRtt = rttHistory.minOrNull() ?: 0L
val averageRtt = if (rttHistory.isNotEmpty()) {
rttHistory.average().toLong()
} else {
0L
}

Log.d(TAG, "RTT - 最新: $rtt ms, 最大: $maxRtt ms, 最小: $minRtt ms, 平均: $averageRtt ms")
}
}

else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
}
}
}

private fun requestTimeSync() {
val t1 = System.currentTimeMillis()
val syncRequest = JSONObject().apply {
put("type", "time_sync_request")
put("room", currentRoom)
put("t1", t1)
}
socket.emit("signal", syncRequest)
Log.d(TAG, "Sent time sync request at t1: $t1")
}

override fun onDestroyView() {
super.onDestroyView()
statsJob?.cancel()
socket.disconnect()
localPeer?.dispose()
localView?.release()
localEglBase?.release()
}
}


class VideoReceiverFragment : Fragment() {

companion object {
fun newInstance(
room: String,
stunUrl: String,
turnUrl: String,
turnUsername: String,
turnPassword: String,
signalingServerUrl: String
): VideoReceiverFragment {
val fragment = VideoReceiverFragment()
val args = Bundle().apply {
putString("room", room)
putString("stunUrl", stunUrl)
putString("turnUrl", turnUrl)
putString("turnUsername", turnUsername)
putString("turnPassword", turnPassword)
putString("signalingServerUrl", signalingServerUrl)
}
fragment.arguments = args
return fragment
}
}

// Class member variables
private lateinit var socket: Socket
private var localPeer: PeerConnection? = null
private var remoteView: SurfaceViewRenderer? = null
private var remoteEglBase: EglBase? = null
private val pendingIceCandidates = mutableListOf<IceCandidate>()
private var currentRoom: String? = null
private lateinit var signalingServerUrl: String
private lateinit var stunUrl: String
private lateinit var turnUrl: String
private lateinit var turnUsername: String
private lateinit var turnPassword: String
private val TAG: String = "WebRTC-Receiver"

// State variables for RTT and OWD (if needed)
private val owdState = mutableLongStateOf(0L)
private val owdHistory = mutableStateListOf<Long>()

private val frameRateState = mutableDoubleStateOf(0.0)
private val bitrateState = mutableLongStateOf(0L)
private val stutteringState = mutableStateOf(false)

private val frameRateLowState = mutableStateOf(false)
private val packetLossHighState = mutableStateOf(false)
private val packetLossState = mutableDoubleStateOf(0.0)

private val frameRateHistory = mutableStateListOf<Float>()
private val bitrateHistory = mutableStateListOf<Long>()
private var timeSyncJob: Job? = null

// History variables
private var prevFramesDecoded = 0.0
private var prevBytesReceived = 0.0
private var prevFramesReceived = 0.0
private var prevFramesDropped = 0.0
private var prevTimestamp = 0.0
private var timeOffset: Long = 0
private var t1: Long = 0

private var statsJob: Job? = null

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

// Retrieve data from arguments
currentRoom = arguments?.getString("room") ?: "default-room"
signalingServerUrl = arguments?.getString("signalingServerUrl") ?: "https://wstszx.us.kg"
stunUrl = arguments?.getString("stunUrl") ?: "stun:stun.wstszx.us.kg:3478"
turnUrl = arguments?.getString("turnUrl") ?: "turn:turn.wstszx.us.kg:5349"
turnUsername = arguments?.getString("turnUsername") ?: "wstszx"
turnPassword = arguments?.getString("turnPassword") ?: "930379"

Log.d(
TAG,
"onCreate: Role = Client, Room = $currentRoom, Signaling Server = $signalingServerUrl, STUN URL = $stunUrl, TURN URL = $turnUrl"
)
}

private val requestPermissionsLauncher =
registerForActivityResult(ActivityResultContracts.RequestMultiplePermissions()) { permissions ->
permissions.entries.forEach { (permission, isGranted) ->
if (isGranted) {
Toast.makeText(requireContext(), "$permission 权限已授予", Toast.LENGTH_SHORT).show()
} else {
Toast.makeText(requireContext(), "$permission 权限被拒绝", Toast.LENGTH_SHORT).show()
}
}
onPermissionsChecked()
}

override fun onCreateView(
inflater: android.view.LayoutInflater,
container: android.view.ViewGroup?,
savedInstanceState: Bundle?
): android.view.View {
return ComposeView(requireContext()).apply {
setContent {
WebRTCComposeLayout()
}
}
}

@Composable
fun WebRTCComposeLayout() {
val context = LocalContext.current
lateinit var peerConnectionFactory: PeerConnectionFactory

Surface(color = Color.Black) {
Column(modifier = Modifier.fillMaxSize()) {

// Remote video view
AndroidView(
factory = {
remoteView = SurfaceViewRenderer(it).apply {
setZOrderMediaOverlay(false)
}
remoteView!!
},
modifier = Modifier
.weight(1f)
.fillMaxWidth(),
update = {
if (remoteEglBase?.eglBaseContext == null) {
remoteEglBase = EglBase.create()
it.init(remoteEglBase!!.eglBaseContext, null)
it.setMirror(false)
}
}
)

Spacer(modifier = Modifier.height(8.dp))

Column(modifier = Modifier.padding(horizontal = 16.dp)) {
Text(
text = "单向时延 (OWD) 最新: ${owdState.longValue} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "OWD 最大: ${owdHistory.maxOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Text(
text = "OWD 最小: ${owdHistory.minOrNull() ?: 0} ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L
Text(
text = "OWD 平均: $averageOwd ms",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 8.dp)
)
}

// Spacer between video and charts
Spacer(modifier = Modifier.height(8.dp))

// Frame Rate Section
Column(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 8.dp)
) {
// Frame Rate Text in Chinese
Text(
text = "帧率: ${frameRateState.doubleValue.roundToInt()} fps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)
Log.d(
TAG,
"UI - Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps"
)

// Line Chart for Frame Rate
LineChart(
data = frameRateHistory,
modifier = Modifier
.height(200.dp)
.fillMaxWidth()
.padding(vertical = 8.dp),
lineColor = Color.Green,
backgroundColor = Color.Black,
yAxisLabel = "帧率 (fps)",
xAxisLabel = "时间 (秒)"
)
}

// Spacer between charts
Spacer(modifier = Modifier.height(8.dp))

// Bitrate Section
Column(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 16.dp)
) {
// Bitrate Text in Chinese
Text(
text = "码率: ${bitrateState.longValue / 1000} kbps",
color = Color.White,
style = MaterialTheme.typography.bodyMedium,
modifier = Modifier.padding(bottom = 4.dp)
)

// Line Chart for Bitrate
LineChart(
data = bitrateHistory.map { it / 1000f }, // Convert to kbps
modifier = Modifier
.height(200.dp)
.fillMaxWidth()
.padding(vertical = 8.dp),
lineColor = Color.Blue,
backgroundColor = Color.Black,
yAxisLabel = "码率 (kbps)",
xAxisLabel = "时间 (秒)"
)
}

// Spacer between metrics and stuttering indicator
Spacer(modifier = Modifier.height(16.dp))

// Stuttering Indicator
Row(
modifier = Modifier
.fillMaxWidth()
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
if (stutteringState.value) {
Icon(
imageVector = Icons.Default.Warning,
contentDescription = "卡顿警告",
tint = Color.Red,
modifier = Modifier.size(24.dp)
)
Spacer(modifier = Modifier.width(8.dp))
Column {
Text(
text = "视频播放出现卡顿",
color = Color.Red,
style = MaterialTheme.typography.bodyMedium
)
// Additional information about which metrics are abnormal
if (frameRateLowState.value) {
Text(
text = "帧率过低: ${frameRateState.doubleValue.roundToInt()} fps",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
)
}
if (packetLossHighState.value) {
Text(
text = "包丢失率过高: ${packetLossState.doubleValue.roundToInt()}%",
color = Color.Red,
style = MaterialTheme.typography.bodySmall
)
}
}
} else {
Icon(
imageVector = Icons.Default.CheckCircle,
contentDescription = "正常",
tint = Color.Green,
modifier = Modifier.size(24.dp)
)
Spacer(modifier = Modifier.width(8.dp))
Text(
text = "视频播放正常",
color = Color.Green,
style = MaterialTheme.typography.bodyMedium
)
}
}
}

LaunchedEffect(Unit) {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)

val encoderFactory = DefaultVideoEncoderFactory(
EglBase.create().eglBaseContext, true, true
)

val decoderFactory = DefaultVideoDecoderFactory(remoteEglBase!!.eglBaseContext)

peerConnectionFactory = PeerConnectionFactory.builder()
.setVideoEncoderFactory(encoderFactory)
.setVideoDecoderFactory(decoderFactory)
.createPeerConnectionFactory()

createPeerConnection(
context,
peerConnectionFactory,
remoteView!!
) {
localPeer = it
}

initializeSocketIO()

requestPermissionsIfNeeded()
}
}
}

/**
* Enhanced Line Chart Composable with Axes and Labels
* Modified to use straight lines connecting each point
*/
@Composable
fun LineChart(
data: List<Float>,
modifier: Modifier = Modifier,
lineColor: Color = Color.Green,
backgroundColor: Color = Color.Black,
yAxisLabel: String = "",
xAxisLabel: String = "",
minYValue: Float? = null,
maxYValue: Float? = null
) {
Canvas(modifier = modifier.background(backgroundColor)) {
val padding = 40.dp.toPx() // Padding for axes and labels

if (data.isEmpty()) return@Canvas

val maxY = maxYValue ?: data.maxOrNull() ?: 1f
val minY = minYValue ?: data.minOrNull() ?: 0f
val yRange = maxY - minY
val pointCount = data.size
val spacing = (size.width - padding * 2) / (pointCount - 1).coerceAtLeast(1)

val points = data.mapIndexed { index, value ->
val x = padding + index * spacing
val y = if (yRange == 0f) size.height / 2 else (size.height - padding) - ((value - minY) / yRange) * (size.height - padding * 2)
Offset(x, y)
}

// Draw axes
drawLine(
color = Color.White,
start = Offset(padding, padding),
end = Offset(padding, size.height - padding),
strokeWidth = 2f
)
drawLine(
color = Color.White,
start = Offset(padding, size.height - padding),
end = Offset(size.width - padding, size.height - padding),
strokeWidth = 2f
)

// Draw y-axis labels
val yLabelCount = 5
val yStep = yRange / (yLabelCount - 1)
for (i in 0 until yLabelCount) {
val yValue = minY + i * yStep
val yPos = (size.height - padding) - ((yValue - minY) / yRange) * (size.height - padding * 2)

drawContext.canvas.nativeCanvas.apply {
val label = yValue.roundToInt().toString()
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.RIGHT
}
drawText(
label,
padding - 8f,
yPos + textPaint.textSize / 2,
textPaint
)
}
}

// Draw x-axis labels
val xLabelCount = 5
val xStep = (pointCount - 1).coerceAtLeast(1) / (xLabelCount - 1).coerceAtLeast(1)
for (i in 0 until xLabelCount) {
val index = i * xStep
val xPos = padding + index * spacing

drawContext.canvas.nativeCanvas.apply {
val label = index.toString()
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
}
drawText(
label,
xPos,
size.height - padding + textPaint.textSize + 4f,
textPaint
)
}
}

// Optionally, draw axis labels
// Y-Axis Label
if (yAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
isAntiAlias = true
}
// Rotate for vertical text
save()
rotate(-90f, padding / 2, size.height / 2)
drawText(
yAxisLabel,
padding / 2,
size.height / 2,
textPaint
)
restore()
}
}

// X-Axis Label
if (xAxisLabel.isNotEmpty()) {
drawContext.canvas.nativeCanvas.apply {
val textPaint = android.graphics.Paint().apply {
color = android.graphics.Color.WHITE
textSize = 24f
textAlign = android.graphics.Paint.Align.CENTER
isAntiAlias = true
}
drawText(
xAxisLabel,
size.width / 2,
size.height - padding / 2,
textPaint
)
}
}

// Draw the straight lines connecting points
if (points.size >= 2) {
for (i in 0 until points.size - 1) {
drawLine(
color = lineColor,
start = points[i],
end = points[i + 1],
strokeWidth = 4f,
cap = StrokeCap.Round
)
}
}

// Optionally, draw points
points.forEach { point ->
drawCircle(
color = lineColor,
radius = 4f,
center = point
)
}
}
}

private fun initializeSocketIO() {
val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http"
val socketUrl = signalingServerUrl

val options = IO.Options().apply {
transports = arrayOf("websocket")
secure = protocol == "https"
path = "/socket.io/"
}

try {
socket = IO.socket(socketUrl, options)

socket.on(Socket.EVENT_CONNECT) {
Log.d(TAG, "Socket connected")
socket.emit("join", currentRoom)
Log.d(TAG, "Joined room: $currentRoom")
initiateCall()
}

socket.on(Socket.EVENT_CONNECT_ERROR) { args ->
if (args.isNotEmpty()) {
val error = args[0]
Log.e(TAG, "Socket connection error: $error")
}
}

socket.on(Socket.EVENT_DISCONNECT) { args ->
if (args.isNotEmpty()) {
val reason = args[0]
Log.d(TAG, "Socket disconnected: $reason")
}
}

socket.on("signal") { args ->
Log.d(TAG, "Received signaling: ${args[0]}")
if (args.isNotEmpty() && args[0] is JSONObject) {
val data = args[0] as JSONObject
handleSignalingData(data)
}
}

socket.connect()
Log.d(TAG, "Connecting to Socket: $socketUrl...")
} catch (e: Exception) {
Log.e(TAG, "Error connecting to Socket: ${e.message}")
}
}

private fun requestPermissionsIfNeeded() {
val permissions = arrayOf(
Manifest.permission.CAMERA,
Manifest.permission.RECORD_AUDIO,
Manifest.permission.INTERNET,
Manifest.permission.ACCESS_NETWORK_STATE
)

val permissionsToRequest = permissions.filter {
ContextCompat.checkSelfPermission(requireContext(), it) != PackageManager.PERMISSION_GRANTED
}

if (permissionsToRequest.isNotEmpty()) {
requestPermissionsLauncher.launch(permissionsToRequest.toTypedArray())
} else {
onPermissionsChecked()
}
}

private fun onPermissionsChecked() {
Toast.makeText(requireContext(), "所有必要的权限已授予", Toast.LENGTH_SHORT).show()
}

private fun createPeerConnection(
context: Context,
peerConnectionFactory: PeerConnectionFactory,
remoteView: SurfaceViewRenderer,
onLocalPeerCreated: (PeerConnection) -> Unit
) {
val iceServers = listOf(
PeerConnection.IceServer.builder(stunUrl).createIceServer(),
PeerConnection.IceServer.builder(turnUrl)
.setUsername(turnUsername)
.setPassword(turnPassword)
.createIceServer()
)

val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
tcpCandidatePolicy = PeerConnection.TcpCandidatePolicy.ENABLED
continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
}

localPeer = peerConnectionFactory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
override fun onIceCandidate(iceCandidate: IceCandidate?) {
iceCandidate?.let {
Log.d(TAG, "ICE candidate: $it")
val signalData = JSONObject().apply {
put("type", "ice")
put("candidate", JSONObject().apply {
put("sdpMid", it.sdpMid)
put("sdpMLineIndex", it.sdpMLineIndex)
put("candidate", it.sdp)
})
put("room", currentRoom)
}
socket.emit("signal", signalData)
}
}

override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) {
Log.d(TAG, "ICE candidates removed")
}

override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
Log.d(TAG, "Signaling state changed to: $newState")
}

override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
Log.d(TAG, "ICE connection state changed to: $newState")
}

override fun onIceConnectionReceivingChange(receiving: Boolean) {
Log.d(TAG, "ICE connection receiving change: $receiving")
}

override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
Log.d(TAG, "ICE gathering state changed to: $newState")
}

override fun onAddStream(stream: MediaStream?) {
Log.d(TAG, "Stream added")
}

override fun onRemoveStream(stream: MediaStream?) {
Log.d(TAG, "Stream removed")
}

override fun onDataChannel(dataChannel: DataChannel?) {
Log.d(TAG, "Data channel created")
}

override fun onRenegotiationNeeded() {
Log.d(TAG, "Renegotiation needed")
}

override fun onAddTrack(receiver: RtpReceiver?, streams: Array<out MediaStream>?) {
Log.d(TAG, "Track added")
receiver?.track()?.let { track ->
if (track is VideoTrack) {
track.addSink(remoteView)
}
}
}

override fun onTrack(transceiver: RtpTransceiver?) {
Log.d(TAG, "onTrack called")
transceiver?.receiver?.track()?.let { track ->
if (track is VideoTrack) {
track.addSink(remoteView)
}
}
}

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
Log.d(TAG, "Connection state changed to: $newState")
}
})

onLocalPeerCreated(localPeer!!)

startStatsCollection()
}

private fun initiateCall() {
Log.d(TAG, "Initiating call...")
val constraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
}

localPeer?.createOffer(object : SdpObserver {
override fun onCreateSuccess(sessionDescription: SessionDescription?) {
sessionDescription?.let { sdp ->
localPeer?.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
val signalData = JSONObject().apply {
put("type", "offer")
put("sdp", JSONObject().put("sdp", sdp.description))
put("room", currentRoom)
}
socket.emit("signal", signalData)
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set local description error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
}

override fun onSetSuccess() {}
override fun onCreateFailure(error: String?) {
Log.e(TAG, "Create offer error: $error")
}

override fun onSetFailure(error: String?) {}
}, constraints)
}

private fun handleSignalingData(data: JSONObject) {
Log.d(TAG, "Handling signaling data: $data")
when (data.getString("type")) {

"answer" -> {
Log.d(TAG, "Received answer")
val sdp = SessionDescription(
SessionDescription.Type.ANSWER,
data.getJSONObject("sdp").getString("sdp")
)

localPeer?.setRemoteDescription(object : SdpObserver {
override fun onSetSuccess() {
pendingIceCandidates.forEach { candidate ->
localPeer?.addIceCandidate(candidate)
}

pendingIceCandidates.clear()

Log.d(TAG, "Set remote description (answer) success")
}

override fun onSetFailure(error: String?) {
Log.e(TAG, "Set remote description error: $error")
}

override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}

"ice" -> {
Log.d(TAG, "Received ICE candidate")
val candidateData = data.getJSONObject("candidate")
val candidate = IceCandidate(
candidateData.getString("sdpMid"),
candidateData.getInt("sdpMLineIndex"),
candidateData.getString("candidate")
)

if (localPeer?.remoteDescription != null) {
localPeer?.addIceCandidate(candidate)
} else {
pendingIceCandidates.add(candidate)
}
}

"time_sync_request" -> {
val t1 = data.getLong("t1")
val t2 = System.currentTimeMillis()
val t3 = System.currentTimeMillis()
val syncResponse = JSONObject().apply {
put("type", "time_sync_response")
put("t1", t1)
put("t2", t2)
put("t3", t3)
put("room", currentRoom)
}
socket.emit("signal", syncResponse)
Log.d(TAG, "回复 time_sync_request: t1=$t1, t2=$t2, t3=$t3")

// 计算 OWD
val owd = t2 - t1
Log.d(TAG, "时间同步: OWD=$owd ms")

viewLifecycleOwner.lifecycleScope.launch {
// 更新 OWD 状态
owdState.longValue = owd

// 更新 OWD 历史记录
owdHistory.add(owd)
if (owdHistory.size > 60) {
owdHistory.removeAt(0)
}

// 计算 OWD 的最大、最小和平均值
val maxOwd = owdHistory.maxOrNull() ?: 0L
val minOwd = owdHistory.minOrNull() ?: 0L
val averageOwd = if (owdHistory.isNotEmpty()) {
owdHistory.average().toLong()
} else {
0L
}

Log.d(TAG, "OWD - 最新: $owd ms, 最大: $maxOwd ms, 最小: $minOwd ms, 平均: $averageOwd ms")
}
}

else -> {
Log.e(TAG, "Unknown signaling type: ${data.getString("type")}")
}
}
}

private fun startStatsCollection() {
Log.d(TAG, "Starting stats collection...")
statsJob = viewLifecycleOwner.lifecycleScope.launch {
while (isActive) {
delay(1000) // Collect stats every second
Log.d(TAG, "Collecting stats...")
localPeer?.getStats { report ->
Log.d(TAG, "Stats report obtained.")
parseStatsReport(report)
} ?: Log.e(TAG, "Failed to get stats: localPeer is null.")
}
}
}

private fun parseStatsReport(report: RTCStatsReport) {
Log.d(TAG, "Received RTCStatsReport: $report")
for (stats in report.statsMap.values) {
if (stats.type == "inbound-rtp") {
val kind = stats.members["kind"] as? String
if (kind == "video") {
val framesDecoded = (stats.members["framesDecoded"] as? Number)?.toDouble() ?: 0.0
val framesReceived = (stats.members["framesReceived"] as? Number)?.toDouble() ?: 0.0
val framesDropped = (stats.members["framesDropped"] as? Number)?.toDouble() ?: 0.0
val bytesReceived = (stats.members["bytesReceived"] as? Number)?.toDouble() ?: 0.0
val packetsLost = (stats.members["packetsLost"] as? Number)?.toDouble() ?: 0.0
val packetsReceived = (stats.members["packetsReceived"] as? Number)?.toDouble() ?: 1.0 // Avoid division by zero
val packetLossFraction = packetsLost / (packetsLost + packetsReceived)
val timestamp = stats.timestampUs / 1_000_000.0 // Convert to seconds

Log.d(
TAG,
"Stats - Frames Decoded: $framesDecoded, Frames Received: $framesReceived, Frames Dropped: $framesDropped, Bytes Received: $bytesReceived, Packet Loss Fraction: $packetLossFraction, Timestamp: $timestamp"
)

if (prevTimestamp != 0.0) {
val timeElapsed = timestamp - prevTimestamp
val framesDelta = framesDecoded - prevFramesDecoded
val bytesDelta = bytesReceived - prevBytesReceived
val framesReceivedDelta = framesReceived - prevFramesReceived
val framesDroppedDelta = framesDropped - prevFramesDropped

val frameRate = if (timeElapsed > 0) framesDelta / timeElapsed else 0.0
val bitrate = if (timeElapsed > 0) (bytesDelta * 8) / timeElapsed else 0.0 // bits per second
val packetLoss = packetLossFraction * 100 // Convert to percentage

Log.d(TAG, "Calculated Frame Rate: $frameRate fps, Bitrate: $bitrate bps, Packet Loss: $packetLoss%")

// Determine stuttering based on thresholds
val isStuttering = frameRate < 24 || packetLoss > 5.0 // Thresholds can be adjusted

// Update states
viewLifecycleOwner.lifecycleScope.launch {
frameRateState.doubleValue = frameRate
bitrateState.longValue = bitrate.toLong()
// Update stuttering state
stutteringState.value = isStuttering
// Update specific stuttering causes
frameRateLowState.value = frameRate < 24
packetLossHighState.value = packetLoss > 5.0
packetLossState.doubleValue = packetLoss

// Update history
frameRateHistory.add(frameRate.toFloat())
if (frameRateHistory.size > 60) {
frameRateHistory.removeAt(0)
}

bitrateHistory.add(bitrate.toLong())
if (bitrateHistory.size > 60) {
bitrateHistory.removeAt(0)
}

Log.d(
TAG,
"Updated Frame Rate: ${frameRateState.doubleValue} fps, Bitrate: ${bitrateState.longValue / 1000} kbps, Stuttering: $isStuttering"
)
}
}

// Update previous values
prevFramesDecoded = framesDecoded
prevBytesReceived = bytesReceived
prevFramesReceived = framesReceived
prevFramesDropped = framesDropped
prevTimestamp = timestamp
}
}
}
}

override fun onDestroyView() {
super.onDestroyView()
statsJob?.cancel()
socket.disconnect()
localPeer?.dispose()
remoteView?.release()
remoteEglBase?.release()
timeSyncJob?.cancel()
}
}


目前单向时延的值是负数,这是什么原因,给我修复这个问题

Pensamiento

探查延迟源头

正在分析VideoReceiverFragment类中的OWD(单向延迟)计算逻辑,怀疑负值可能是由于接收方计算时机不足或信号处理中断造成的。需要进一步追踪问题的根源。

计算网络延迟

发起者通过t1和t3来解释RTT,而接收者通过t2和t1计算OWD。如果时钟不同步,OWD可以大幅波动。

改进处理方法

解释了如何计算OWD,并指出负值可能由于时钟不同步导致。提出通过时钟同步来准确计算OWD。

梳理时延计算

我正在看的代码中,视频接收端错误地将t2和t3设置为当前时间,而不是合适的NTP时间戳。这个问题影响了时延计算的准确性。

调整时间同步

为了匹配接收方的时间戳,时间同步请求和响应是必需的,这样可以精确估算出时钟偏差。通过多次请求和测量往返时间,配合相应的数据,可以有效调整时钟同步。

在计算RTT时,Initiator需要知道t1、t2、t3和t4的值,然后使用RTT和clock offset来估计OWD。然而,当前代码仅计算RTT,未计算clock offset。

更新代码做计算

为了精确计算OWD,需要基于时钟偏移进行修正。可以在发起方估算时钟偏移后,在接收方或发起方进行修正,而接收方还需知道与发起方的时钟偏移。

更正时间偏差

在处理OWD负值问题时,最好的方法是实现时钟同步或调整计算方式,以确保Receiver能够正确计算OWD。

探索解决方法

我正在研究如何计算路由器的OWD,因为这对问题的解决至关重要。如果没有时钟同步,负的OWD可能会出现,影响结果的准确性。

调整时钟同步

我在想,计算负出发延迟(OWD)的原因是接收者的时钟落后于发起者,可能是由于时钟同步不足。因此,估算时钟偏差以相应调整OWD是个好办法。

优化时间同步

我正在思考拓扑同步来解决负时间偏移问题,确保时间一致性,从而改善性能。

Respuesta

The issue of obtaining negative One-Way Delay (OWD) values in your application stems from unsynchronized clocks between the VideoInitiatorFragment and the VideoReceiverFragment. Without proper clock synchronization, calculations that rely on timestamps from different devices can result in inaccurate and potentially negative values.

Understanding the Problem

Let's break down why you're encountering negative OWD values:

  1. Clock Discrepancy:

    • Initiator's Timestamp (t1): The time when the initiator sends the time_sync_request.
    • Receiver's Timestamp (t2): The time when the receiver processes the time_sync_request and sends back the time_sync_response.
    • Without synchronized clocks, t2 on the receiver's side might be earlier or later than t1 on the initiator's side, leading to owd = t2 - t1 being negative.
  2. Current Calculation in Receiver:

    • In your VideoReceiverFragment, you're calculating OWD as owd = t2 - t1.
    • Since t1 and t2 are from different devices with potentially different clock settings, this calculation is unreliable.

Solution: Implement Clock Synchronization

To accurately calculate OWD, you need to synchronize the clocks between the initiator and receiver. A common approach is to use a simplified version of the Network Time Protocol (NTP). Here's how you can implement it:

1. Estimate Clock Offset and RTT in Initiator

First, modify the VideoInitiatorFragment to estimate the Round-Trip Time (RTT) and the clock offset between the initiator and receiver.

// In VideoInitiatorFragment private var clockOffset: Long = 0L // Clock offset between initiator and receiver private fun startTimeSync(intervalMs: Long = 5000L) { viewLifecycleOwner.lifecycleScope.launch { while (isActive) { delay(intervalMs) // Sync every 5 seconds requestTimeSync() } } } private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { // Existing handling for "offer", "answer", "ice" "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.currentTimeMillis() // Calculate RTT and clock offset val rtt = (t4 - t1) - (t3 - t2) clockOffset = ((t2 - t1) + (t3 - t4)) / 2 Log.d(TAG, "Time Sync: RTT=$rtt ms, Clock Offset=$clockOffset ms") viewLifecycleOwner.lifecycleScope.launch { // Update RTT state as needed rttState.longValue = rtt // You can also store clockOffset for future use } } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } } private fun requestTimeSync() { val t1 = System.currentTimeMillis() val syncRequest = JSONObject().apply { put("type", "time_sync_request") put("room", currentRoom) put("t1", t1) } socket.emit("signal", syncRequest) Log.d(TAG, "Sent time sync request at t1: $t1") }

2. Adjust OWD Calculation in Receiver Using Clock Offset

Now, in the VideoReceiverFragment, use the clock offset estimated by the initiator to adjust the OWD calculation.

However, since the receiver doesn't have direct access to the initiator's clockOffset, you need to communicate this value back to the receiver. Here's how you can achieve it:

a. Modify Initiator to Send Clock Offset to Receiver

After calculating the clockOffset, emit it to the receiver.

// In VideoInitiatorFragment.handleSignalingData for "time_sync_response" socket.emit("clock_offset", JSONObject().apply { put("room", currentRoom) put("offset", clockOffset) })
b. Receive Clock Offset in Receiver and Calculate OWD

In the VideoReceiverFragment, listen for the clock_offset event and use it to adjust the OWD.

// In VideoReceiverFragment.initializeSocketIO() socket.on("clock_offset") { args -> if (args.isNotEmpty() && args[0] is JSONObject) { val data = args[0] as JSONObject val offset = data.getLong("offset") Log.d(TAG, "Received clock offset: $offset ms") // Store the clock offset for OWD calculation this@VideoReceiverFragment.clockOffset = offset } } // Add a member variable to store clock offset private var clockOffset: Long = 0L

Now, modify the OWD calculation in the receiver to use the clockOffset:

// In VideoReceiverFragment.handleSignalingData for "time_sync_request" "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.currentTimeMillis() val t3 = System.currentTimeMillis() val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "Replied to time_sync_request with t1=$t1, t2=$t2, t3=$t3") // Calculate OWD using clockOffset val owd = t2 - t1 + clockOffset Log.d(TAG, "Time Sync: OWD=$owd ms") viewLifecycleOwner.lifecycleScope.launch { // Update OWD state owdState.longValue = owd // Update OWD history owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // Calculate OWD statistics val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L Log.d(TAG, "OWD - Latest: $owd ms, Max: $maxOwd ms, Min: $minOwd ms, Average: $averageOwd ms") } }

3. Ensure Proper Synchronization Flow

Ensure that the clock offset is established before you start calculating OWD. This might involve:

  • Performing an initial time sync upon connection.
  • Ensuring that subsequent OWD calculations use the latest clockOffset.

Final Adjusted Code Snippets

Here are the key parts of the adjusted code for both fragments:

VideoInitiatorFragment

// Add a member variable to store clock offset private var clockOffset: Long = 0L private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { // Existing handling for "offer", "answer", "ice" "time_sync_response" -> { Log.d(TAG, "Received time_sync_response") val t1 = data.getLong("t1") val t2 = data.getLong("t2") val t3 = data.getLong("t3") val t4 = System.currentTimeMillis() // Calculate RTT and clock offset val rtt = (t4 - t1) - (t3 - t2) clockOffset = ((t2 - t1) + (t3 - t4)) / 2 Log.d(TAG, "Time Sync: RTT=$rtt ms, Clock Offset=$clockOffset ms") viewLifecycleOwner.lifecycleScope.launch { // Update RTT state as needed rttState.longValue = rtt } // Send clock offset to Receiver socket.emit("clock_offset", JSONObject().apply { put("room", currentRoom) put("offset", clockOffset) }) } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } }

VideoReceiverFragment

// Add member variables private var clockOffset: Long = 0L private fun initializeSocketIO() { val protocol = if (signalingServerUrl.startsWith("https")) "https" else "http" val socketUrl = signalingServerUrl val options = IO.Options().apply { transports = arrayOf("websocket") secure = protocol == "https" path = "/socket.io/" } try { socket = IO.socket(socketUrl, options) socket.on(Socket.EVENT_CONNECT) { Log.d(TAG, "Socket connected") socket.emit("join", currentRoom) Log.d(TAG, "Joined room: $currentRoom") initiateCall() } socket.on("clock_offset") { args -> if (args.isNotEmpty() && args[0] is JSONObject) { val data = args[0] as JSONObject val offset = data.getLong("offset") Log.d(TAG, "Received clock offset: $offset ms") this@VideoReceiverFragment.clockOffset = offset } } socket.on(Socket.EVENT_CONNECT_ERROR) { args -> if (args.isNotEmpty()) { val error = args[0] Log.e(TAG, "Socket connection error: $error") } } socket.on(Socket.EVENT_DISCONNECT) { args -> if (args.isNotEmpty()) { val reason = args[0] Log.d(TAG, "Socket disconnected: $reason") } } socket.on("signal") { args -> Log.d(TAG, "Received signaling: ${args[0]}") if (args.isNotEmpty() && args[0] is JSONObject) { val data = args[0] as JSONObject handleSignalingData(data) } } socket.connect() Log.d(TAG, "Connecting to Socket: $socketUrl...") } catch (e: Exception) { Log.e(TAG, "Error connecting to Socket: ${e.message}") } } private fun handleSignalingData(data: JSONObject) { Log.d(TAG, "Handling signaling data: $data") when (data.getString("type")) { "answer" -> { // Existing handling for "answer" } "ice" -> { // Existing handling for "ice" } "time_sync_request" -> { val t1 = data.getLong("t1") val t2 = System.currentTimeMillis() val t3 = System.currentTimeMillis() val syncResponse = JSONObject().apply { put("type", "time_sync_response") put("t1", t1) put("t2", t2) put("t3", t3) put("room", currentRoom) } socket.emit("signal", syncResponse) Log.d(TAG, "Replied to time_sync_request with t1=$t1, t2=$t2, t3=$t3") // Calculate OWD using clockOffset val owd = t2 - t1 + clockOffset Log.d(TAG, "Time Sync: OWD=$owd ms") viewLifecycleOwner.lifecycleScope.launch { // Update OWD state owdState.longValue = owd // Update OWD history owdHistory.add(owd) if (owdHistory.size > 60) { owdHistory.removeAt(0) } // Calculate OWD statistics val maxOwd = owdHistory.maxOrNull() ?: 0L val minOwd = owdHistory.minOrNull() ?: 0L val averageOwd = if (owdHistory.isNotEmpty()) owdHistory.average().toLong() else 0L Log.d(TAG, "OWD - Latest: $owd ms, Max: $maxOwd ms, Min: $minOwd ms, Average: $averageOwd ms") } } "clock_offset" -> { // Optionally handle if Receiver also needs to know the offset } else -> { Log.e(TAG, "Unknown signaling type: ${data.getString("type")}") } } }

Additional Recommendations

  1. Multiple Time Sync Exchanges:

    • Perform multiple time synchronization exchanges to get a more accurate estimate of RTT and clock offset. This helps mitigate transient network delays and improves accuracy.
  2. Averaging Clock Offset:

    • Average the clock offset over multiple samples to smooth out variations and obtain a stable offset value.
  3. Handling Clock Drift:

    • Clocks on devices can drift over time. Periodically re-estimate the clock offset to maintain accuracy.
  4. Error Handling:

    • Implement error handling for scenarios where time synchronization fails or produces inconsistent results.
  5. Security Considerations:

    • Ensure that the time synchronization messages are authenticated and not susceptible to tampering or replay attacks.

Conclusion

The negative OWD values are primarily due to unsynchronized clocks between the initiator and receiver. By implementing a clock synchronization mechanism and adjusting the OWD calculation based on the estimated clock offset, you can achieve accurate and positive OWD measurements. This ensures that your application reliably measures network delays, enhancing the overall quality and performance of your video communication system.

If you need further assistance with implementing the clock synchronization or have additional questions, feel free to ask!

Compartir este Q&A