Skip to content

Commit

Permalink
Merge pull request #49 from vbence/chunked-reader-fix
Browse files Browse the repository at this point in the history
Chunked reader fix
  • Loading branch information
vbence committed Dec 30, 2020
2 parents 1a6ef4a + 4334a67 commit f754379
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 87 deletions.
1 change: 1 addition & 0 deletions .idea/modules/stream-m_test.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -21,7 +21,7 @@ plugins {
}

group 'org.czentral'
version '1.0-SNAPSHOT'
version '1.0.1-SNAPSHOT'

sourceCompatibility = 1.8

Expand Down
21 changes: 2 additions & 19 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,23 +1,6 @@
#
# Copyright 2018 Bence Varga
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
# OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#

#Sun Nov 18 17:10:41 CET 2018
#Wed Dec 30 23:37:28 CET 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-all.zip
5 changes: 4 additions & 1 deletion src/main/java/org/czentral/incubator/streamm/HexDump.java
Expand Up @@ -39,11 +39,14 @@ public static String prettyPrintHex(byte[] buffer, int startOffset, int maxLengt
StringBuilder sb = new StringBuilder(4096);
int offset = startOffset;
while (offset < startOffset + maxLength) {
sb.append(String.format("%08X: ", offset));
int segmentLength = Math.min(LINE_LENGTH, startOffset + maxLength - offset);
for (int i = 0; i < segmentLength; i++) {
if (i > 0) {
sb.append(i % 4 == 0 ? '-' : ' ');
}
sb.append(digits[(buffer[offset + i] >> 4) & 0x0f]);
sb.append(digits[(buffer[offset + i]) & 0x0f]);
sb.append(' ');
//String digit = new String("0" + Integer.toHexString(buffer[offset + i] & 0xff));
//s += " " + digit.substring(digit.length() - 2);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/czentral/minirtmp/AMFDecoder.java
Expand Up @@ -91,7 +91,7 @@ public Map<String, Object> readObject() {
throw new RuntimeException(e);
}
offset += 2 + length;
//System.err.println("k: " + key + ", o: " + offset);
//System.err.println("k: " + key + ", o: " + messageOffset);
if (length == 0) {
if (buffer[offset] != 0x09) {
throw new IllegalArgumentException("Object-end type ID (0x09) expected.");
Expand Down
Expand Up @@ -131,7 +131,7 @@ public void processChunk(MessageInfo mi, ByteBuffer buffer) {
}

buffer.get(assemblyBuffer.array, mi.offset, buffer.remaining());
//System.arraycopy(readBuffer, payloadOffset, assemblyBuffer.array, mi.offset, payloadLength);
//System.arraycopy(readBuffer, payloadOffset, assemblyBuffer.array, mi.messageOffset, payloadLength);

boolean assembled = (mi.offset + payloadLength >= mi.length);
if (assembled) {
Expand Down
Expand Up @@ -54,7 +54,7 @@ public int process(byte[] buffer, int offset, int length) {
return 0;
}

//System.err.println(HexDump.prettyPrintHex(buffer, offset, length));
//System.err.println(HexDump.prettyPrintHex(buffer, messageOffset, length));

if (stage == Stage.ClientBlock) {
int processed = bytesNeeded;
Expand Down
42 changes: 15 additions & 27 deletions src/main/java/org/czentral/minirtmp/RTMPStreamProcessor.java
Expand Up @@ -28,8 +28,6 @@ class RTMPStreamProcessor implements Processor {

protected boolean finished = false;

protected Map<Integer, MessageInfo> lastMessages = new HashMap<Integer, MessageInfo>();

private final int DEFAULT_CHUNK_SIZE = 128;

protected ResourceLimit limit;
Expand All @@ -39,6 +37,7 @@ class RTMPStreamProcessor implements Processor {
protected RtmpReader reader = new RtmpReader(DEFAULT_CHUNK_SIZE);

protected RtmpPacket lastPacket = null;
protected int chunkOffset = 0;

public RTMPStreamProcessor(ResourceLimit limit, ChunkProcessor chunkProcessor) {
this.limit = limit;
Expand Down Expand Up @@ -90,62 +89,51 @@ public int processPacket(byte[] buffer, int offset, int length) {
int processed = 0;

ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
if (lastPacket == null || lastPacket.offset >= lastPacket.payloadLegth) {
if (lastPacket == null || chunkOffset >= reader.getChunkSize() || lastPacket.messageOffset + chunkOffset >= lastPacket.messageSize) {
try {
Optional<RtmpPacket> op = reader.read(bb);
if (!op.isPresent()) {
return 0;
}
lastPacket = op.get();
chunkOffset = 0;
processed += lastPacket.headerLength;
//System.out.println(op.get());
} catch (RtmpException e) {
e.printStackTrace();
return 0;
throw new RuntimeException(e);
}
}
//if (bb.remaining() < lastPacket.payloadLegth) {
// return 0;
//}

//System.err.println("code: " + code + ", sid: " + sid + ", type: " + type + ", length: " + payloadLength);
//System.err.println(HexDump.prettyPrintHex(buffer, bufferOffset, sidLength + headLength + payloadLength));
//System.err.println(HexDump.prettyPrintHex(buffer, bufferOffset, Math.min(16, sidLength + headLength + payloadLength)));


// change chunk size command processed
if (lastPacket.messageType == 0x01) {
if (bb.remaining() < 4) {
return processed;
return 0;
}
int proposedChunkSize = (bb.get() & 0xff) << 24 | (bb.get() & 0xff) << 16
| (bb.get() & 0xff) << 8 | (bb.get() & 0xff);
ByteBuffer bbc = bb.duplicate();
int proposedChunkSize = (bbc.get() & 0xff) << 24 | (bbc.get() & 0xff) << 16
| (bbc.get() & 0xff) << 8 | (bbc.get() & 0xff);
int newChunkSize = Math.min(proposedChunkSize, 0xFFFFFF);
reader.setChunkSize(newChunkSize);
}

//chunkProcessor.processChunk(lastMessage, buffer, readOffset, payloadLength);
//MessageInfo messageInfo = buildInfo(p);
int byteCount = Math.min((int)lastPacket.payloadLegth, bb.remaining());
int chunkRemaining = (int)Math.min(lastPacket.messageSize-(lastPacket.messageOffset+chunkOffset), reader.getChunkSize() - chunkOffset);
int byteCount = Math.min(chunkRemaining, bb.remaining());
if (byteCount > 0) {
chunkProcessor.processChunk(buildInfo(lastPacket), bb.array(), bb.position(), byteCount);
chunkProcessor.processChunk(buildInfo(lastPacket, chunkOffset), bb.array(), bb.position(), byteCount);

finished = !chunkProcessor.alive();

lastPacket = lastPacket.transposed(byteCount);
chunkOffset += byteCount;

processed += byteCount;
}

//System.out.printf("len: %d, newLen: %d%n", sidLength + headLength + payloadLength, p.messageSize + p.headerLength);
//return sidLength + headLength + payloadLength;
//System.out.printf("len: %d, newLen: %d (%d + %d)%n", sidLength + headLength + payloadLength, p.headerLength + p.payloadLegth, p.headerLength, p.payloadLegth);
return processed;
}

private MessageInfo buildInfo(RtmpPacket p) {
private static MessageInfo buildInfo(RtmpPacket p, int chunkOffset) {
MessageInfo mi = new MessageInfo(p.sid, p.messageType, (int)p.messageSize);
mi.offset = (int)p.offset;
mi.length = (int)p.messageSize;
mi.offset = (int)p.messageOffset + chunkOffset;
mi.calculatedTimestamp = p.absoluteTimestamp;
return mi;
}
Expand Down
32 changes: 3 additions & 29 deletions src/main/java/org/czentral/minirtmp/RtmpPacket.java
Expand Up @@ -31,8 +31,7 @@ public class RtmpPacket {
long messageStreamId;

long absoluteTimestamp;
long offset;
long payloadLegth;
long messageOffset;
long headerLength;

protected RtmpPacket() {
Expand Down Expand Up @@ -76,31 +75,6 @@ public boolean hasExtendedTimestamp() {
return packetTimestamp == 0xffffff;
}

private RtmpPacket(RtmpPacket other) {
this.chunkType = other.chunkType;
this.sid = other.sid;
this.packetTimestamp = other.packetTimestamp;
this.messageSize = other.messageSize;
this.messageType = other.messageType;
this.messageStreamId = other.messageStreamId;
this.absoluteTimestamp = other.absoluteTimestamp;
this.offset = other.offset;
this.payloadLegth = other.payloadLegth;
this.headerLength = other.headerLength;
}

/**
* Accounts part of the packet being processed by creating a new packet with modified offset and payloadLength.
* @param byteCount
* @return
*/
public RtmpPacket transposed(int byteCount) {
RtmpPacket result = new RtmpPacket(this);
result.offset += byteCount;
result.payloadLegth -= byteCount;
return result;
}

@Override
public String toString() {
return "RtmpPacket{" +
Expand All @@ -111,8 +85,8 @@ public String toString() {
", messageType=" + messageType +
", messageStreamId=" + messageStreamId +
", absoluteTimestamp=" + absoluteTimestamp +
", offset=" + offset +
", payloadLegth=" + payloadLegth +
", messageOffset=" + messageOffset +
", headerLength=" + headerLength +
'}';
}
}
16 changes: 9 additions & 7 deletions src/main/java/org/czentral/minirtmp/RtmpReader.java
Expand Up @@ -42,7 +42,11 @@ public Optional<RtmpPacket> read(ByteBuffer buffer) throws RtmpException {
RtmpPacket lastPacket = lastPackets.getOrDefault(p.sid, null);

boolean newMessage = lastPacket == null ||
lastPacket.offset + lastPacket.payloadLegth == lastPacket.messageSize;
lastPacket.messageOffset + chunkSize >= lastPacket.messageSize;

if (!newMessage) {
p.messageOffset = lastPacket.messageOffset + chunkSize;
}

if (lastPacket != null) {
p.absoluteTimestamp = lastPacket.absoluteTimestamp;
Expand Down Expand Up @@ -83,12 +87,6 @@ public Optional<RtmpPacket> read(ByteBuffer buffer) throws RtmpException {
}

p.headerLength = buffer.position() - originalOffset;
if (!newMessage) {
p.offset = lastPacket.offset + lastPacket.payloadLegth;
p.payloadLegth = Math.min(chunkSize, lastPacket.messageSize - p.offset);
} else {
p.payloadLegth = Math.min(chunkSize, p.messageSize);
}

lastPackets.put(p.sid, p);

Expand All @@ -98,4 +96,8 @@ public Optional<RtmpPacket> read(ByteBuffer buffer) throws RtmpException {
public void setChunkSize(long chunkSize) {
this.chunkSize = chunkSize;
}

public long getChunkSize() {
return chunkSize;
}
}
84 changes: 84 additions & 0 deletions src/test/java/org/czentral/minirtmp/RTMPStreamProcessorTest.java
@@ -0,0 +1,84 @@
/*
* Copyright 2020 Bence Varga
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
* OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package org.czentral.minirtmp;

import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class RTMPStreamProcessorTest {

@Test
public void process() throws Exception {
ResourceLimit limit = new ResourceLimit();
limit.assemblyBufferCount = 100;
limit.assemblyBufferSize = 200 * 1024;
limit.chunkStreamCount = 100;

String pathname = "rtmp-in.pcap";
byte[] buffer = loadFile(pathname);
int start = 0xc5f;
int end = 0x16c7 + 0x28;

for (int i=start; i<end; i++) {
TestChunkProcessor cp = new TestChunkProcessor();
RTMPStreamProcessor o = new RTMPStreamProcessor(limit, cp);
try {
int round1 = o.process(buffer, start, i - start);
System.out.print("i: " + String.format("%x", i) + ", c: " + cp.counter);
o.process(buffer, start + round1, end - (start + round1));
System.out.println(", c2: " + cp.counter);
} catch (Exception e) {
System.err.println("i: " + String.format("%x", i));
throw e;
}
}
System.out.println(buffer.length);
}

private byte[] loadFile(String pathname) throws IOException {
//File file = new File(pathname);
ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource(pathname).getFile());
FileInputStream fis = new FileInputStream(file);
byte[] buffer = new byte[(int)file.length()];
int offset = 0;
int read = 0;
while (offset < buffer.length) {
read = fis.read(buffer, offset, buffer.length - offset);
offset += read;
}
return buffer;
}

class TestChunkProcessor implements ChunkProcessor {

int counter = 0;
@Override
public boolean alive() {
return true;
}

@Override
public void processChunk(MessageInfo mi, byte[] buffer, int payloadOffset, int payloadLength) {
counter++;
}
}
}
Binary file added src/test/resources/rtmp-in.pcap
Binary file not shown.

0 comments on commit f754379

Please sign in to comment.