Skip to content

Commit

Permalink
fix bitratemanager tests and add TimeUtils to rtmp
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroSG94 committed Sep 18, 2023
1 parent abc975c commit 8824da8
Show file tree
Hide file tree
Showing 21 changed files with 325 additions and 108 deletions.
3 changes: 2 additions & 1 deletion rtmp/src/main/java/com/pedro/rtmp/amf/v0/AmfDate.kt
Expand Up @@ -16,6 +16,7 @@

package com.pedro.rtmp.amf.v0

import com.pedro.rtmp.utils.TimeUtils
import com.pedro.rtmp.utils.readUntil
import java.io.IOException
import java.io.InputStream
Expand All @@ -28,7 +29,7 @@ import java.nio.ByteBuffer
* milliseconds from 1st Jan 1970 in UTC time zone.
* timeZone value is a reserved value that should be 0x0000
*/
class AmfDate(var date: Double = System.currentTimeMillis().toDouble()): AmfData() {
class AmfDate(var date: Double = TimeUtils.getCurrentTimeMillis().toDouble()): AmfData() {

@Throws(IOException::class)
override fun readBody(input: InputStream) {
Expand Down
7 changes: 4 additions & 3 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt
Expand Up @@ -25,6 +25,7 @@ import com.pedro.rtmp.rtmp.message.control.Type
import com.pedro.rtmp.rtmp.message.control.UserControl
import com.pedro.rtmp.utils.CommandSessionHistory
import com.pedro.rtmp.utils.RtmpConfig
import com.pedro.rtmp.utils.TimeUtils
import com.pedro.rtmp.utils.socket.RtmpSocket
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand Down Expand Up @@ -80,7 +81,7 @@ abstract class CommandsManager {
}

protected fun getCurrentTimestamp(): Int {
return (System.currentTimeMillis() / 1000 - timestamp).toInt()
return (TimeUtils.getCurrentTimeMillis() / 1000 - timestamp).toInt()
}

@Throws(IOException::class)
Expand Down Expand Up @@ -182,7 +183,7 @@ abstract class CommandsManager {
writeSync.withLock {
val output = socket.getOutStream()
if (akamaiTs) {
flvPacket.timeStamp = ((System.nanoTime() / 1000 - startTs) / 1000)
flvPacket.timeStamp = ((TimeUtils.getCurrentTimeNano() / 1000 - startTs) / 1000)
}
val video = Video(flvPacket, streamId)
video.writeHeader(output)
Expand All @@ -197,7 +198,7 @@ abstract class CommandsManager {
writeSync.withLock {
val output = socket.getOutStream()
if (akamaiTs) {
flvPacket.timeStamp = ((System.nanoTime() / 1000 - startTs) / 1000)
flvPacket.timeStamp = ((TimeUtils.getCurrentTimeNano() / 1000 - startTs) / 1000)
}
val audio = Audio(flvPacket, streamId)
audio.writeHeader(output)
Expand Down
3 changes: 2 additions & 1 deletion rtmp/src/main/java/com/pedro/rtmp/rtmp/Handshake.kt
Expand Up @@ -17,6 +17,7 @@
package com.pedro.rtmp.rtmp

import android.util.Log
import com.pedro.rtmp.utils.TimeUtils
import com.pedro.rtmp.utils.readUntil
import com.pedro.rtmp.utils.socket.RtmpSocket
import java.io.IOException
Expand Down Expand Up @@ -102,7 +103,7 @@ class Handshake {
Log.i(TAG, "writing C1")
val c1 = ByteArray(handshakeSize)

timestampC1 = (System.currentTimeMillis() / 1000).toInt()
timestampC1 = (TimeUtils.getCurrentTimeMillis() / 1000).toInt()
Log.i(TAG, "writing time $timestampC1 to c1")
val timestampData = ByteArray(4)
timestampData[0] = (timestampC1 ushr 24).toByte()
Expand Down
5 changes: 3 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Expand Up @@ -27,6 +27,7 @@ import com.pedro.rtmp.rtmp.message.control.UserControl
import com.pedro.rtmp.utils.AuthUtil
import com.pedro.rtmp.utils.ConnectCheckerRtmp
import com.pedro.rtmp.utils.RtmpConfig
import com.pedro.rtmp.utils.TimeUtils
import com.pedro.rtmp.utils.onMainThread
import com.pedro.rtmp.utils.socket.RtmpSocket
import com.pedro.rtmp.utils.socket.TcpSocket
Expand Down Expand Up @@ -304,11 +305,11 @@ class RtmpClient(private val connectCheckerRtmp: ConnectCheckerRtmp) {
this.socket = socket
socket.connect()
if (!socket.isConnected()) return false
val timestamp = System.currentTimeMillis() / 1000
val timestamp = TimeUtils.getCurrentTimeMillis() / 1000
val handshake = Handshake()
if (!handshake.sendHandshake(socket)) return false
commandsManager.timestamp = timestamp.toInt()
commandsManager.startTs = System.nanoTime() / 1000
commandsManager.startTs = TimeUtils.getCurrentTimeNano() / 1000
return true
}

Expand Down
6 changes: 3 additions & 3 deletions rtmp/src/main/java/com/pedro/rtmp/utils/BitrateManager.kt
Expand Up @@ -24,16 +24,16 @@ package com.pedro.rtmp.utils
open class BitrateManager(private val connectCheckerRtmp: ConnectCheckerRtmp) {

private var bitrate: Long = 0
private var timeStamp = System.currentTimeMillis()
private var timeStamp = TimeUtils.getCurrentTimeMillis()

suspend fun calculateBitrate(size: Long) {
bitrate += size
val timeDiff = System.currentTimeMillis() - timeStamp
val timeDiff = TimeUtils.getCurrentTimeMillis() - timeStamp
if (timeDiff >= 1000) {
onMainThread {
connectCheckerRtmp.onNewBitrateRtmp((bitrate / (timeDiff / 1000f)).toLong())
}
timeStamp = System.currentTimeMillis()
timeStamp = TimeUtils.getCurrentTimeMillis()
bitrate = 0
}
}
Expand Down
29 changes: 29 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/utils/TimeUtils.kt
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2023 pedroSG94.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pedro.rtmp.utils

/**
* Created by pedro on 30/8/23.
*/
object TimeUtils {

@JvmStatic
fun getCurrentTimeMillis(): Long = System.currentTimeMillis()

@JvmStatic
fun getCurrentTimeNano(): Long = System.nanoTime()
}
Expand Up @@ -17,6 +17,7 @@
package com.pedro.rtmp.utils.socket

import android.util.Log
import com.pedro.rtmp.utils.TimeUtils
import java.io.*
import java.net.HttpURLConnection
import java.net.SocketTimeoutException
Expand Down Expand Up @@ -49,12 +50,12 @@ class TcpTunneledSocket(private val host: String, private val port: Int, private

override fun getInputStream(): InputStream {
synchronized(sync) {
val start = System.currentTimeMillis()
val start = TimeUtils.getCurrentTimeMillis()
while (input.available() <= 1 && connected) {
val i = index.addAndGet(1)
val bytes = requestRead("idle/$connectionId/$i", secured)
input = ByteArrayInputStream(bytes, 1, bytes.size)
if (System.currentTimeMillis() - start >= timeout) {
if (TimeUtils.getCurrentTimeMillis() - start >= timeout) {
throw SocketTimeoutException("couldn't receive a valid packet")
}
}
Expand Down
23 changes: 23 additions & 0 deletions rtmp/src/test/java/com/pedro/rtmp/MainDispatcherRule.kt
@@ -0,0 +1,23 @@
package com.pedro.rtmp

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestWatcher
import org.junit.runner.Description

@OptIn(ExperimentalCoroutinesApi::class)
class MainDispatcherRule(
private val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
override fun starting(description: Description) {
Dispatchers.setMain(testDispatcher)
}

override fun finished(description: Description) {
Dispatchers.resetMain()
}
}
38 changes: 38 additions & 0 deletions rtmp/src/test/java/com/pedro/rtmp/Utils.kt
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2023 pedroSG94.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pedro.rtmp

import org.mockito.MockedStatic

/**
* Created by pedro on 1/9/23.
*/
object Utils {

suspend fun useStatics(statics: List<MockedStatic<out Any>>, callback: suspend () -> Unit) {
val list = statics.toMutableList()
if (list.isEmpty()) callback()
else if (list.size == 1) {
list[0].use {
callback()
}
} else {
val value = list.removeAt(0)
value.use { useStatics(list, callback) }
}
}
}
53 changes: 38 additions & 15 deletions rtmp/src/test/java/com/pedro/rtmp/utils/BitrateManagerTest.kt
Expand Up @@ -16,10 +16,17 @@

package com.pedro.rtmp.utils

import com.pedro.rtmp.MainDispatcherRule
import com.pedro.rtmp.Utils
import kotlinx.coroutines.test.runTest
import org.junit.After
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mock
import org.mockito.Mockito
import org.mockito.junit.MockitoJUnitRunner
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.times
Expand All @@ -32,25 +39,41 @@ import org.mockito.kotlin.verify
@RunWith(MockitoJUnitRunner::class)
class BitrateManagerTest {

@get:Rule
val mainDispatcherRule = MainDispatcherRule()
@Mock
private lateinit var connectCheckerRtmp: ConnectCheckerRtmp
private val timeUtilsMocked = Mockito.mockStatic(TimeUtils::class.java)
private var fakeTime = 7502849023L

@Before
fun setup() {
timeUtilsMocked.`when`<Long>(TimeUtils::getCurrentTimeMillis).then { fakeTime }
}

@After
fun teardown() {
fakeTime = 7502849023L
}

@Test
fun `WHEN set multiple values THEN return total of values each second`() {
val bitrateManager = BitrateManager(connectCheckerRtmp)
val fakeValues = arrayOf(100L, 200L, 300L, 400L, 500L)
var expectedResult = 0L
fakeValues.forEach {
bitrateManager.calculateBitrate(it)
expectedResult += it
fun `WHEN set multiple values THEN return total of values each second`() = runTest {
Utils.useStatics(listOf(timeUtilsMocked)) {
val bitrateManager = BitrateManager(connectCheckerRtmp)
val fakeValues = arrayOf(100L, 200L, 300L, 400L, 500L)
var expectedResult = 0L
fakeValues.forEach {
bitrateManager.calculateBitrate(it)
expectedResult += it
}
fakeTime += 1000
val value = 100L
bitrateManager.calculateBitrate(value)
expectedResult += value
val resultValue = argumentCaptor<Long>()
verify(connectCheckerRtmp, times(1)).onNewBitrateRtmp(resultValue.capture())
val marginError = 20
assertTrue(expectedResult - marginError <= resultValue.firstValue && resultValue.firstValue <= expectedResult + marginError)
}
Thread.sleep(1000)
val value = 100L
bitrateManager.calculateBitrate(value)
expectedResult += value
val resultValue = argumentCaptor<Long>()
verify(connectCheckerRtmp, times(1)).onNewBitrateRtmp(resultValue.capture())
val marginError = 20
assertTrue(expectedResult - marginError <= resultValue.firstValue && resultValue.firstValue <= expectedResult + marginError)
}
}
23 changes: 23 additions & 0 deletions rtsp/src/test/java/com/pedro/rtsp/MainDispatcherRule.kt
@@ -0,0 +1,23 @@
package com.pedro.rtsp

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestWatcher
import org.junit.runner.Description

@OptIn(ExperimentalCoroutinesApi::class)
class MainDispatcherRule(
private val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
override fun starting(description: Description) {
Dispatchers.setMain(testDispatcher)
}

override fun finished(description: Description) {
Dispatchers.resetMain()
}
}

0 comments on commit 8824da8

Please sign in to comment.