Skip to content

Commit

Permalink
Merge pull request #1432 from pedroSG94/refactor/send-psi
Browse files Browse the repository at this point in the history
Refactor/send psi
  • Loading branch information
pedroSG94 committed Mar 11, 2024
2 parents 38fc27f + bb671f6 commit 971d211
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 137 deletions.
51 changes: 22 additions & 29 deletions srt/src/main/java/com/pedro/srt/mpeg2ts/psi/PsiManager.kt
Expand Up @@ -16,7 +16,12 @@

package com.pedro.srt.mpeg2ts.psi

import com.pedro.common.TimeUtils
import com.pedro.srt.mpeg2ts.MpegTsPacket
import com.pedro.srt.mpeg2ts.MpegTsPacketizer
import com.pedro.srt.mpeg2ts.MpegType
import com.pedro.srt.mpeg2ts.service.Mpeg2TsService
import com.pedro.srt.srt.packets.data.PacketPosition
import kotlin.random.Random

/**
Expand All @@ -25,43 +30,36 @@ import kotlin.random.Random
class PsiManager(
private var service: Mpeg2TsService
) {

private val idExtension = Random.nextInt(Byte.MIN_VALUE.toInt(), Byte.MAX_VALUE.toInt()).toShort()
private var sdtCount = 0
private var patCount = 0
companion object {
const val sdtPeriod = 200
const val patPeriod = 40
const val INTERVAL = 100 //ms
}

private var sdt = Sdt(
private val idExtension = Random.nextInt(Byte.MIN_VALUE.toInt(), Byte.MAX_VALUE.toInt()).toShort()
private var lastTime = 0L

val sdt = Sdt(
idExtension = idExtension,
version = 0,
service = service
)

private var pat = Pat(
val pat = Pat(
idExtension = idExtension,
version = 0,
service = service
)

fun shouldSend(isKey: Boolean = false): TableToSend {
var value = TableToSend.NONE
if (sdtCount >= sdtPeriod && patCount >= patPeriod) {
value = TableToSend.ALL
sdtCount = 0
patCount = 0
} else if (patCount >= patPeriod || isKey) {
value = TableToSend.PAT_PMT
patCount = 0
} else if (sdtCount >= sdtPeriod) {
value = TableToSend.SDT
sdtCount = 0
fun checkSendInfo(isKey: Boolean = false, mpegTsPacketizer: MpegTsPacketizer): List<MpegTsPacket> {
val pmt = service.pmt ?: return arrayListOf()
val currentTime = TimeUtils.getCurrentTimeMillis()
if (isKey || TimeUtils.getCurrentTimeMillis() - lastTime >= INTERVAL) {
lastTime = currentTime
val psiPackets = mpegTsPacketizer.write(listOf(pat, pmt, sdt), increasePsiContinuity = true).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
return psiPackets
}
sdtCount++
patCount++
return value
return arrayListOf()
}

fun upgradeSdtVersion() {
Expand All @@ -80,13 +78,8 @@ class PsiManager(
return service.tracks.find { !it.codec.isAudio() }?.pid ?: 0
}

fun getSdt(): Sdt = sdt
fun getPat(): Pat = pat
fun getPmt(): Pmt? = service.pmt

fun reset() {
sdtCount = 0
patCount = 0
lastTime = 0
}

fun updateService(service: Mpeg2TsService) {
Expand Down
57 changes: 13 additions & 44 deletions srt/src/main/java/com/pedro/srt/srt/SrtSender.kt
Expand Up @@ -23,20 +23,16 @@ import com.pedro.common.BitrateManager
import com.pedro.common.ConnectChecker
import com.pedro.common.onMainThread
import com.pedro.common.trySend
import com.pedro.srt.mpeg2ts.Codec
import com.pedro.srt.mpeg2ts.MpegTsPacket
import com.pedro.srt.mpeg2ts.MpegTsPacketizer
import com.pedro.srt.mpeg2ts.MpegType
import com.pedro.srt.mpeg2ts.Pid
import com.pedro.srt.mpeg2ts.packets.AacPacket
import com.pedro.srt.mpeg2ts.packets.BasePacket
import com.pedro.srt.mpeg2ts.packets.H26XPacket
import com.pedro.srt.mpeg2ts.packets.OpusPacket
import com.pedro.srt.mpeg2ts.psi.PsiManager
import com.pedro.srt.mpeg2ts.psi.TableToSend
import com.pedro.srt.mpeg2ts.service.Mpeg2TsService
import com.pedro.srt.srt.packets.SrtPacket
import com.pedro.srt.srt.packets.data.PacketPosition
import com.pedro.srt.utils.SrtSocket
import com.pedro.srt.utils.toCodec
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -127,8 +123,6 @@ class SrtSender(
fun sendVideoFrame(h264Buffer: ByteBuffer, info: MediaCodec.BufferInfo) {
if (running) {
h26XPacket.createAndSendPacket(h264Buffer, info) { mpegTsPackets ->
val isKey = mpegTsPackets[0].isKey
checkSendInfo(isKey)
val result = queue.trySend(mpegTsPackets)
if (!result) {
Log.i(TAG, "Video frame discarded")
Expand All @@ -141,8 +135,6 @@ class SrtSender(
fun sendAudioFrame(aacBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
if (running) {
audioPacket.createAndSendPacket(aacBuffer, info) { mpegTsPackets ->
val isKey = mpegTsPackets[0].isKey
checkSendInfo(isKey)
val result = queue.trySend(mpegTsPackets)
if (!result) {
Log.i(TAG, "Audio frame discarded")
Expand All @@ -157,13 +149,6 @@ class SrtSender(
setTrackConfig(!commandsManager.videoDisabled, !commandsManager.audioDisabled)
running = true
job = scope.launch {
//send config
val psiList = mutableListOf(psiManager.getSdt(), psiManager.getPat())
psiManager.getPmt()?.let { psiList.add(0, it) }
val psiPackets = mpegTsPacketizer.write(psiList).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
var bytesSend = 0L
val bitrateTask = async {
while (scope.isActive && running) {
Expand All @@ -178,14 +163,10 @@ class SrtSender(
val mpegTsPackets = runInterruptible {
queue.poll(1, TimeUnit.SECONDS)
}
mpegTsPackets.forEach { mpegTsPacket ->
var size = 0
size += commandsManager.writeData(mpegTsPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote ${mpegTsPacket.type.name} packet, size $size")
}
bytesSend += size
}
val isKey = mpegTsPackets[0].isKey
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer)
bytesSend += sendPackets(psiPackets)
bytesSend += sendPackets(mpegTsPackets)
}.exceptionOrNull()
if (error != null) {
onMainThread {
Expand All @@ -198,29 +179,17 @@ class SrtSender(
}
}

private fun checkSendInfo(isKey: Boolean = false) {
val pmt = psiManager.getPmt() ?: return
when (psiManager.shouldSend(isKey)) {
TableToSend.PAT_PMT -> {
val psiPackets = mpegTsPacketizer.write(listOf(psiManager.getPat(), pmt), increasePsiContinuity = true).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
}
TableToSend.SDT -> {
val psiPackets = mpegTsPacketizer.write(listOf(psiManager.getSdt()), increasePsiContinuity = true).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
}
TableToSend.NONE -> {}
TableToSend.ALL -> {
val psiPackets = mpegTsPacketizer.write(listOf(pmt, psiManager.getSdt(), psiManager.getPat()), increasePsiContinuity = true).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
private suspend fun sendPackets(packets: List<MpegTsPacket>): Long {
var bytesSend = 0L
packets.forEach { mpegTsPacket ->
var size = 0
size += commandsManager.writeData(mpegTsPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote ${mpegTsPacket.type.name} packet, size $size")
}
bytesSend += size
}
return bytesSend
}

suspend fun stop(clear: Boolean) {
Expand Down
31 changes: 10 additions & 21 deletions srt/src/test/java/com/pedro/srt/mpeg2ts/PsiManagerTest.kt
Expand Up @@ -17,9 +17,9 @@
package com.pedro.srt.mpeg2ts

import com.pedro.srt.mpeg2ts.psi.PsiManager
import com.pedro.srt.mpeg2ts.psi.TableToSend
import com.pedro.srt.mpeg2ts.service.Mpeg2TsService
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test

/**
Expand All @@ -30,32 +30,21 @@ class PsiManagerTest {
private val service = Mpeg2TsService()

@Test
fun `GIVEN a psiManager WHEN call should send is key false patPeriod times THEN return TableToSend PAT_PMT`() {
fun `GIVEN a psiManager WHEN call should send is key false interval times THEN return psi packets`() {
service.generatePmt()
val psiManager = PsiManager(service)
var sendValue = TableToSend.NONE
(0..PsiManager.patPeriod).forEach { _ ->
sendValue = psiManager.shouldSend(false)
}
assertEquals(TableToSend.PAT_PMT, sendValue)
}

@Test
fun `GIVEN a psiManager WHEN call should send is key false sdtPeriod times THEN return TableToSend ALL`() {
val psiManager = PsiManager(service)
var sendValue = TableToSend.NONE
(0..PsiManager.sdtPeriod).forEach { _ ->
sendValue = psiManager.shouldSend(false)
}
assertEquals(TableToSend.ALL, sendValue)
val mpegTsPacketizer = MpegTsPacketizer(psiManager)
val result = psiManager.checkSendInfo(false, mpegTsPacketizer)
assertTrue(result.isNotEmpty())
}

@Test
fun `GIVEN a psiManager WHEN update pat and sdt version THEN get version 1`() {
val psiManager = PsiManager(service)
psiManager.upgradePatVersion()
psiManager.upgradeSdtVersion()
assertEquals(0x01.toByte(), psiManager.getPat().version)
assertEquals(0x01.toByte(), psiManager.getSdt().version)
assertEquals(0x01.toByte(), psiManager.pat.version)
assertEquals(0x01.toByte(), psiManager.sdt.version)
}

@Test
Expand All @@ -64,7 +53,7 @@ class PsiManagerTest {
val name = "name updated"
val serviceUpdated = service.copy(name = name)
psiManager.updateService(serviceUpdated)
assertEquals(name, psiManager.getPat().service.name)
assertEquals(name, psiManager.getSdt().service.name)
assertEquals(name, psiManager.pat.service.name)
assertEquals(name, psiManager.sdt.service.name)
}
}
56 changes: 13 additions & 43 deletions udp/src/main/java/com/pedro/udp/UdpSender.kt
Expand Up @@ -25,16 +25,13 @@ import com.pedro.common.onMainThread
import com.pedro.common.trySend
import com.pedro.srt.mpeg2ts.MpegTsPacket
import com.pedro.srt.mpeg2ts.MpegTsPacketizer
import com.pedro.srt.mpeg2ts.MpegType
import com.pedro.srt.mpeg2ts.Pid
import com.pedro.srt.mpeg2ts.packets.AacPacket
import com.pedro.srt.mpeg2ts.packets.BasePacket
import com.pedro.srt.mpeg2ts.packets.H26XPacket
import com.pedro.srt.mpeg2ts.packets.OpusPacket
import com.pedro.srt.mpeg2ts.psi.PsiManager
import com.pedro.srt.mpeg2ts.psi.TableToSend
import com.pedro.srt.mpeg2ts.service.Mpeg2TsService
import com.pedro.srt.srt.packets.data.PacketPosition
import com.pedro.srt.utils.Constants
import com.pedro.srt.utils.toCodec
import com.pedro.udp.utils.UdpSocket
Expand Down Expand Up @@ -126,8 +123,6 @@ class UdpSender(
fun sendVideoFrame(h264Buffer: ByteBuffer, info: MediaCodec.BufferInfo) {
if (running) {
h26XPacket.createAndSendPacket(h264Buffer, info) { mpegTsPackets ->
val isKey = mpegTsPackets[0].isKey
checkSendInfo(isKey)
val result = queue.trySend(mpegTsPackets)
if (!result) {
Log.i(TAG, "Video frame discarded")
Expand All @@ -140,8 +135,6 @@ class UdpSender(
fun sendAudioFrame(aacBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
if (running) {
audioPacket.createAndSendPacket(aacBuffer, info) { mpegTsPackets ->
val isKey = mpegTsPackets[0].isKey
checkSendInfo(isKey)
val result = queue.trySend(mpegTsPackets)
if (!result) {
Log.i(TAG, "Audio frame discarded")
Expand All @@ -156,13 +149,6 @@ class UdpSender(
setTrackConfig(!commandManager.videoDisabled, !commandManager.audioDisabled)
running = true
job = scope.launch {
//send config
val psiList = mutableListOf(psiManager.getSdt(), psiManager.getPat())
psiManager.getPmt()?.let { psiList.add(0, it) }
val psiPackets = mpegTsPacketizer.write(psiList).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
var bytesSend = 0L
val bitrateTask = async {
while (scope.isActive && running) {
Expand All @@ -177,14 +163,10 @@ class UdpSender(
val mpegTsPackets = runInterruptible {
queue.poll(1, TimeUnit.SECONDS)
}
mpegTsPackets.forEach { mpegTsPacket ->
var size = 0
size += commandManager.writeData(mpegTsPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote ${mpegTsPacket.type.name} packet, size $size")
}
bytesSend += size
}
val isKey = mpegTsPackets[0].isKey
val psiPackets = psiManager.checkSendInfo(isKey, mpegTsPacketizer)
bytesSend += sendPackets(psiPackets)
bytesSend += sendPackets(mpegTsPackets)
}.exceptionOrNull()
if (error != null) {
onMainThread {
Expand All @@ -197,29 +179,17 @@ class UdpSender(
}
}

private fun checkSendInfo(isKey: Boolean = false) {
val pmt = psiManager.getPmt() ?: return
when (psiManager.shouldSend(isKey)) {
TableToSend.PAT_PMT -> {
val psiPackets = mpegTsPacketizer.write(listOf(psiManager.getPat(), pmt), increasePsiContinuity = true).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
}
TableToSend.SDT -> {
val psiPackets = mpegTsPacketizer.write(listOf(psiManager.getSdt()), increasePsiContinuity = true).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
}
TableToSend.NONE -> {}
TableToSend.ALL -> {
val psiPackets = mpegTsPacketizer.write(listOf(pmt, psiManager.getSdt(), psiManager.getPat()), increasePsiContinuity = true).map { b ->
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE, isKey = false)
}
queue.trySend(psiPackets)
private suspend fun sendPackets(packets: List<MpegTsPacket>): Long {
var bytesSend = 0L
packets.forEach { mpegTsPacket ->
var size = 0
size += commandManager.writeData(mpegTsPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote ${mpegTsPacket.type.name} packet, size $size")
}
bytesSend += size
}
return bytesSend
}

suspend fun stop(clear: Boolean) {
Expand Down

0 comments on commit 971d211

Please sign in to comment.