/*
* Copyright 2018 Bence Varga
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
* OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.czentral.minirtmp;
import java.io.*;
import java.net.*;
import org.czentral.util.stream.*;
/**
 * Minimal RTMP server front end: binds a listening socket and hands every accepted
 * connection to its own {@link Worker} thread.
 *
 * <p>Use {@link #start()} to run the accept loop on a new thread, or call {@link #run()}
 * directly to run it on the current thread.
 */
public class MiniRTMP implements Runnable {

    /** Port the server socket is bound to. */
    protected int port;

    /** Local address to bind to; passed as-is to {@link ServerSocket} (may be {@code null}). */
    protected InetAddress address;

    /** Application library handed to each connection's {@code ApplicationContext}. */
    protected ApplicationLibrary factory;

    /**
     * Creates a server bound to a specific local address.
     *
     * @param port    port to listen on
     * @param address local address to bind to
     * @param factory application library passed to every connection's context
     */
    public MiniRTMP(int port, InetAddress address, ApplicationLibrary factory) {
        this.port = port;
        this.address = address;
        this.factory = factory;
    }

    /**
     * Creates a server without an explicit bind address (a {@code null} address is used).
     *
     * @param port    port to listen on
     * @param factory application library passed to every connection's context
     */
    public MiniRTMP(int port, ApplicationLibrary factory) {
        this(port, null, factory);
    }

    /** Runs the accept loop ({@link #run()}) on a newly created thread. */
    public void start() {
        new Thread(this).start();
    }

    /**
     * Binds the server socket and accepts connections forever, starting one
     * {@link Worker} per client.
     *
     * @throws RuntimeException if the socket cannot be bound or accepting a connection
     *                          fails; the underlying {@link IOException} is the cause
     */
    @Override
    public void run() {
        ServerSocket bound;
        try {
            bound = new ServerSocket(port, 50, address);
        } catch (IOException e) {
            throw new RuntimeException("MiniRTMP can not listen on given port: " + port, e);
        }

        // The loop only ends by exception; try-with-resources releases the port when it does.
        try (ServerSocket listener = bound) {
            while (true) {
                Socket clientSock = listener.accept();
                new Worker(clientSock).start();
            }
        } catch (IOException e) {
            throw new RuntimeException("Error accepting connection on port: " + port, e);
        }
    }

    /**
     * Serves a single client connection on its own thread: feeds the socket input first
     * to a handshake processor and then to an RTMP stream processor.
     */
    class Worker extends Thread {

        private final Socket sock;

        /**
         * @param sock accepted client socket; closed by this worker when {@link #run()} ends
         */
        public Worker(Socket sock) {
            this.sock = sock;
        }

        /**
         * Processes the connection and always closes the client socket afterwards,
         * including when processing fails with an unchecked exception.
         *
         * @throws RuntimeException if the socket streams cannot be opened or closed; the
         *                          underlying {@link IOException} is the cause
         */
        @Override
        public void run() {
            // Closing the socket also closes its input and output streams.
            try (Socket client = sock) {
                InputStream is;
                OutputStream os;
                try {
                    is = client.getInputStream();
                    os = client.getOutputStream();
                } catch (IOException e) {
                    throw new RuntimeException("Error opening streams", e);
                }

                ResourceLimit limit = new ResourceLimit();
                limit.chunkStreamCount = 8;
                limit.assemblyBufferCount = 2;
                limit.assemblyBufferSize = 4096;

                Feeder feeder = new Feeder(new Buffer(262144), is);

                // Handshake is fed first; the same feeder then drives the stream processor.
                HandshakeProcessor handshake = new HandshakeProcessor(os);
                feeder.feedTo(handshake);

                String clientId = client.getRemoteSocketAddress().toString();
                ApplicationContext context = new ApplicationContext(os, limit, factory, clientId);
                feeder.feedTo(new RTMPStreamProcessor(limit, context));
            } catch (IOException e) {
                // Only the implicit close() can raise IOException here.
                throw new RuntimeException("Error closing streams.", e);
            }
        }
    }
}