diff --git a/.idea/modules/stream-m_test.iml b/.idea/modules/stream-m_test.iml
index 006aaa8..baf7b78 100644
--- a/.idea/modules/stream-m_test.iml
+++ b/.idea/modules/stream-m_test.iml
@@ -5,6 +5,7 @@
+
diff --git a/build.gradle b/build.gradle
index 3b43089..adcaf87 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,7 +21,7 @@ plugins {
}
group 'org.czentral'
-version '1.0-SNAPSHOT'
+version '1.0.1-SNAPSHOT'
sourceCompatibility = 1.8
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index a910f7d..890a5e7 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,23 +1,6 @@
-#
-# Copyright 2018 Bence Varga
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
-# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
-# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
-# permit persons to whom the Software is furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
-# Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
-# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
-# OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-#
-
-#Sun Nov 18 17:10:41 CET 2018
+#Wed Dec 30 23:37:28 CET 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-all.zip
diff --git a/src/main/java/org/czentral/incubator/streamm/HexDump.java b/src/main/java/org/czentral/incubator/streamm/HexDump.java
index 7351db3..99115b8 100644
--- a/src/main/java/org/czentral/incubator/streamm/HexDump.java
+++ b/src/main/java/org/czentral/incubator/streamm/HexDump.java
@@ -39,11 +39,14 @@ public static String prettyPrintHex(byte[] buffer, int startOffset, int maxLengt
StringBuilder sb = new StringBuilder(4096);
int offset = startOffset;
while (offset < startOffset + maxLength) {
+ sb.append(String.format("%08X: ", offset));
int segmentLength = Math.min(LINE_LENGTH, startOffset + maxLength - offset);
for (int i = 0; i < segmentLength; i++) {
+ if (i > 0) {
+ sb.append(i % 4 == 0 ? '-' : ' ');
+ }
sb.append(digits[(buffer[offset + i] >> 4) & 0x0f]);
sb.append(digits[(buffer[offset + i]) & 0x0f]);
- sb.append(' ');
//String digit = new String("0" + Integer.toHexString(buffer[offset + i] & 0xff));
//s += " " + digit.substring(digit.length() - 2);
}
diff --git a/src/main/java/org/czentral/minirtmp/AMFDecoder.java b/src/main/java/org/czentral/minirtmp/AMFDecoder.java
index c377a49..b4c3aea 100644
--- a/src/main/java/org/czentral/minirtmp/AMFDecoder.java
+++ b/src/main/java/org/czentral/minirtmp/AMFDecoder.java
@@ -91,7 +91,7 @@ public Map readObject() {
throw new RuntimeException(e);
}
offset += 2 + length;
- //System.err.println("k: " + key + ", o: " + offset);
+ //System.err.println("k: " + key + ", o: " + messageOffset);
if (length == 0) {
if (buffer[offset] != 0x09) {
throw new IllegalArgumentException("Object-end type ID (0x09) expected.");
diff --git a/src/main/java/org/czentral/minirtmp/ApplicationContext.java b/src/main/java/org/czentral/minirtmp/ApplicationContext.java
index 136d1b5..cbec7f6 100644
--- a/src/main/java/org/czentral/minirtmp/ApplicationContext.java
+++ b/src/main/java/org/czentral/minirtmp/ApplicationContext.java
@@ -131,7 +131,7 @@ public void processChunk(MessageInfo mi, ByteBuffer buffer) {
}
buffer.get(assemblyBuffer.array, mi.offset, buffer.remaining());
- //System.arraycopy(readBuffer, payloadOffset, assemblyBuffer.array, mi.offset, payloadLength);
+ //System.arraycopy(readBuffer, payloadOffset, assemblyBuffer.array, mi.messageOffset, payloadLength);
boolean assembled = (mi.offset + payloadLength >= mi.length);
if (assembled) {
diff --git a/src/main/java/org/czentral/minirtmp/HandshakeProcessor.java b/src/main/java/org/czentral/minirtmp/HandshakeProcessor.java
index ea24fd3..0ae35aa 100644
--- a/src/main/java/org/czentral/minirtmp/HandshakeProcessor.java
+++ b/src/main/java/org/czentral/minirtmp/HandshakeProcessor.java
@@ -54,7 +54,7 @@ public int process(byte[] buffer, int offset, int length) {
return 0;
}
- //System.err.println(HexDump.prettyPrintHex(buffer, offset, length));
+ //System.err.println(HexDump.prettyPrintHex(buffer, messageOffset, length));
if (stage == Stage.ClientBlock) {
int processed = bytesNeeded;
diff --git a/src/main/java/org/czentral/minirtmp/RTMPStreamProcessor.java b/src/main/java/org/czentral/minirtmp/RTMPStreamProcessor.java
index b69bff4..72840b6 100644
--- a/src/main/java/org/czentral/minirtmp/RTMPStreamProcessor.java
+++ b/src/main/java/org/czentral/minirtmp/RTMPStreamProcessor.java
@@ -28,8 +28,6 @@ class RTMPStreamProcessor implements Processor {
protected boolean finished = false;
- protected Map lastMessages = new HashMap();
-
private final int DEFAULT_CHUNK_SIZE = 128;
protected ResourceLimit limit;
@@ -39,6 +37,7 @@ class RTMPStreamProcessor implements Processor {
protected RtmpReader reader = new RtmpReader(DEFAULT_CHUNK_SIZE);
protected RtmpPacket lastPacket = null;
+ protected int chunkOffset = 0;
public RTMPStreamProcessor(ResourceLimit limit, ChunkProcessor chunkProcessor) {
this.limit = limit;
@@ -90,62 +89,51 @@ public int processPacket(byte[] buffer, int offset, int length) {
int processed = 0;
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
- if (lastPacket == null || lastPacket.offset >= lastPacket.payloadLegth) {
+ if (lastPacket == null || chunkOffset >= reader.getChunkSize() || lastPacket.messageOffset + chunkOffset >= lastPacket.messageSize) {
try {
Optional op = reader.read(bb);
if (!op.isPresent()) {
return 0;
}
lastPacket = op.get();
+ chunkOffset = 0;
processed += lastPacket.headerLength;
- //System.out.println(op.get());
} catch (RtmpException e) {
e.printStackTrace();
- return 0;
+ throw new RuntimeException(e);
}
}
- //if (bb.remaining() < lastPacket.payloadLegth) {
- // return 0;
- //}
-
- //System.err.println("code: " + code + ", sid: " + sid + ", type: " + type + ", length: " + payloadLength);
- //System.err.println(HexDump.prettyPrintHex(buffer, bufferOffset, sidLength + headLength + payloadLength));
- //System.err.println(HexDump.prettyPrintHex(buffer, bufferOffset, Math.min(16, sidLength + headLength + payloadLength)));
-
+
// change chunk size command processed
if (lastPacket.messageType == 0x01) {
if (bb.remaining() < 4) {
- return processed;
+ return 0;
}
- int proposedChunkSize = (bb.get() & 0xff) << 24 | (bb.get() & 0xff) << 16
- | (bb.get() & 0xff) << 8 | (bb.get() & 0xff);
+ ByteBuffer bbc = bb.duplicate();
+ int proposedChunkSize = (bbc.get() & 0xff) << 24 | (bbc.get() & 0xff) << 16
+ | (bbc.get() & 0xff) << 8 | (bbc.get() & 0xff);
int newChunkSize = Math.min(proposedChunkSize, 0xFFFFFF);
reader.setChunkSize(newChunkSize);
}
- //chunkProcessor.processChunk(lastMessage, buffer, readOffset, payloadLength);
- //MessageInfo messageInfo = buildInfo(p);
- int byteCount = Math.min((int)lastPacket.payloadLegth, bb.remaining());
+ int chunkRemaining = (int)Math.min(lastPacket.messageSize-(lastPacket.messageOffset+chunkOffset), reader.getChunkSize() - chunkOffset);
+ int byteCount = Math.min(chunkRemaining, bb.remaining());
if (byteCount > 0) {
- chunkProcessor.processChunk(buildInfo(lastPacket), bb.array(), bb.position(), byteCount);
+ chunkProcessor.processChunk(buildInfo(lastPacket, chunkOffset), bb.array(), bb.position(), byteCount);
finished = !chunkProcessor.alive();
- lastPacket = lastPacket.transposed(byteCount);
+ chunkOffset += byteCount;
processed += byteCount;
}
- //System.out.printf("len: %d, newLen: %d%n", sidLength + headLength + payloadLength, p.messageSize + p.headerLength);
- //return sidLength + headLength + payloadLength;
- //System.out.printf("len: %d, newLen: %d (%d + %d)%n", sidLength + headLength + payloadLength, p.headerLength + p.payloadLegth, p.headerLength, p.payloadLegth);
return processed;
}
- private MessageInfo buildInfo(RtmpPacket p) {
+ private static MessageInfo buildInfo(RtmpPacket p, int chunkOffset) {
MessageInfo mi = new MessageInfo(p.sid, p.messageType, (int)p.messageSize);
- mi.offset = (int)p.offset;
- mi.length = (int)p.messageSize;
+ mi.offset = (int)p.messageOffset + chunkOffset;
mi.calculatedTimestamp = p.absoluteTimestamp;
return mi;
}
diff --git a/src/main/java/org/czentral/minirtmp/RtmpPacket.java b/src/main/java/org/czentral/minirtmp/RtmpPacket.java
index fb146ea..53c150e 100644
--- a/src/main/java/org/czentral/minirtmp/RtmpPacket.java
+++ b/src/main/java/org/czentral/minirtmp/RtmpPacket.java
@@ -31,8 +31,7 @@ public class RtmpPacket {
long messageStreamId;
long absoluteTimestamp;
- long offset;
- long payloadLegth;
+ long messageOffset;
long headerLength;
protected RtmpPacket() {
@@ -76,31 +75,6 @@ public boolean hasExtendedTimestamp() {
return packetTimestamp == 0xffffff;
}
- private RtmpPacket(RtmpPacket other) {
- this.chunkType = other.chunkType;
- this.sid = other.sid;
- this.packetTimestamp = other.packetTimestamp;
- this.messageSize = other.messageSize;
- this.messageType = other.messageType;
- this.messageStreamId = other.messageStreamId;
- this.absoluteTimestamp = other.absoluteTimestamp;
- this.offset = other.offset;
- this.payloadLegth = other.payloadLegth;
- this.headerLength = other.headerLength;
- }
-
- /**
- * Accounts part of the packet being processed by creating a new packet with modified offset and payloadLength.
- * @param byteCount
- * @return
- */
- public RtmpPacket transposed(int byteCount) {
- RtmpPacket result = new RtmpPacket(this);
- result.offset += byteCount;
- result.payloadLegth -= byteCount;
- return result;
- }
-
@Override
public String toString() {
return "RtmpPacket{" +
@@ -111,8 +85,8 @@ public String toString() {
", messageType=" + messageType +
", messageStreamId=" + messageStreamId +
", absoluteTimestamp=" + absoluteTimestamp +
- ", offset=" + offset +
- ", payloadLegth=" + payloadLegth +
+ ", messageOffset=" + messageOffset +
+ ", headerLength=" + headerLength +
'}';
}
}
diff --git a/src/main/java/org/czentral/minirtmp/RtmpReader.java b/src/main/java/org/czentral/minirtmp/RtmpReader.java
index 3cafb11..c2db3ac 100644
--- a/src/main/java/org/czentral/minirtmp/RtmpReader.java
+++ b/src/main/java/org/czentral/minirtmp/RtmpReader.java
@@ -42,7 +42,11 @@ public Optional read(ByteBuffer buffer) throws RtmpException {
RtmpPacket lastPacket = lastPackets.getOrDefault(p.sid, null);
boolean newMessage = lastPacket == null ||
- lastPacket.offset + lastPacket.payloadLegth == lastPacket.messageSize;
+ lastPacket.messageOffset + chunkSize >= lastPacket.messageSize;
+
+ if (!newMessage) {
+ p.messageOffset = lastPacket.messageOffset + chunkSize;
+ }
if (lastPacket != null) {
p.absoluteTimestamp = lastPacket.absoluteTimestamp;
@@ -83,12 +87,6 @@ public Optional read(ByteBuffer buffer) throws RtmpException {
}
p.headerLength = buffer.position() - originalOffset;
- if (!newMessage) {
- p.offset = lastPacket.offset + lastPacket.payloadLegth;
- p.payloadLegth = Math.min(chunkSize, lastPacket.messageSize - p.offset);
- } else {
- p.payloadLegth = Math.min(chunkSize, p.messageSize);
- }
lastPackets.put(p.sid, p);
@@ -98,4 +96,8 @@ public Optional read(ByteBuffer buffer) throws RtmpException {
public void setChunkSize(long chunkSize) {
this.chunkSize = chunkSize;
}
+
+ public long getChunkSize() {
+ return chunkSize;
+ }
}
diff --git a/src/test/java/org/czentral/minirtmp/RTMPStreamProcessorTest.java b/src/test/java/org/czentral/minirtmp/RTMPStreamProcessorTest.java
new file mode 100644
index 0000000..9203501
--- /dev/null
+++ b/src/test/java/org/czentral/minirtmp/RTMPStreamProcessorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2020 Bence Varga
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+ * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
+ * OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.czentral.minirtmp;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+public class RTMPStreamProcessorTest {
+
+ @Test
+ public void process() throws Exception {
+ ResourceLimit limit = new ResourceLimit();
+ limit.assemblyBufferCount = 100;
+ limit.assemblyBufferSize = 200 * 1024;
+ limit.chunkStreamCount = 100;
+
+ String pathname = "rtmp-in.pcap";
+ byte[] buffer = loadFile(pathname);
+ int start = 0xc5f;
+ int end = 0x16c7 + 0x28;
+
+ for (int i=start; i