Skip to content

Commit

Permalink
change queues
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro committed Apr 28, 2017
1 parent bedec9a commit 3c34d6e
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 52 deletions.
1 change: 1 addition & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions builder/src/main/java/com/pedro/builder/RtmpBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public void startStream(String url) {
}

public void stopStream() {
srsFlvMuxer.stop();

This comment has been minimized.

Copy link
@avinash7074

avinash7074 May 9, 2017

I think this changes has not been integrated within gradle project

cameraManager.stop();
microphoneManager.stop();
srsFlvMuxer.stop();
videoEncoder.stop();
audioEncoder.stop();
streaming = false;
Expand Down Expand Up @@ -165,7 +165,6 @@ public void getAccData(ByteBuffer accBuffer, MediaCodec.BufferInfo info) {
@Override
public void onSPSandPPS(ByteBuffer sps, ByteBuffer pps) {
srsFlvMuxer.setSpsPPs(sps, pps);
//used on rtsp
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public void start() {
}

public void stop() {
running = false;
if (audioEncoder != null) {
audioEncoder.stop();
audioEncoder.release();
audioEncoder = null;
}
running = false;
Log.i(TAG, "AudioEncoder stopped");
}

Expand Down
42 changes: 23 additions & 19 deletions encoder/src/main/java/com/pedro/encoder/video/VideoEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import com.pedro.encoder.utils.YUVUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Created by pedro on 19/01/17.
Expand All @@ -40,9 +41,8 @@ public class VideoEncoder implements GetCameraData {
//surface to buffer encoder
private Surface inputSurface;
//buffer to buffer
private ConcurrentLinkedQueue<byte[]> queue = new ConcurrentLinkedQueue<>();
private BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(20);
private int imageFormat = ImageFormat.NV21;
private final Object lockFrame = new Object();

//default parameters for encoder
private String codec = "video/avc";
Expand Down Expand Up @@ -195,10 +195,10 @@ public void start() {
public void run() {
android.os.Process.setThreadPriority(Process.THREAD_PRIORITY_MORE_FAVORABLE);
while (!Thread.interrupted()) {
while (!queue.isEmpty()) {
try {
byte[] b = queue.take();
byte[] i420;
byte[] b = queue.poll();
if(b == null) continue;
if (b == null) continue;
if (imageFormat == ImageFormat.NV21) {
i420 = (sendBlackImage) ? blackImage
: YUVUtil.NV21toYUV420byColor(b, width, height, formatVideoEncoder);
Expand All @@ -215,11 +215,8 @@ public void run() {
} else {
getDataFromEncoder(i420);
}
}
synchronized (lockFrame){
try {
lockFrame.wait(500);
} catch (InterruptedException ignored) {}
} catch (InterruptedException e) {
thread.interrupt();
}
}
}
Expand All @@ -234,6 +231,11 @@ public void stop() {
queue.clear();
if (thread != null) {
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
thread.interrupt();
}
thread = null;
}
if (videoEncoder != null) {
Expand All @@ -247,22 +249,24 @@ public void stop() {
@Override
public void inputYv12Data(byte[] buffer) {
if (running) {
queue.add(buffer);
synchronized (lockFrame){
lockFrame.notifyAll();
try {
queue.add(buffer);
Log.i(TAG, "send data yv12");
} catch (IllegalStateException e) {
Log.e(TAG, "frame discarded, cant add more frames: ", e);
}
Log.i(TAG, "send data yv12");
}
}

@Override
public void inputNv21Data(byte[] buffer) {
if (running) {
queue.add(buffer);
synchronized (lockFrame){
lockFrame.notifyAll();
try {
queue.add(buffer);
Log.i(TAG, "send data nv21");
} catch (IllegalStateException e) {
Log.e(TAG, "frame discarded, cant add more frames: ", e);
}
Log.i(TAG, "send data nv21");
}
}

Expand Down
47 changes: 17 additions & 30 deletions rtmp/src/main/java/net/ossrs/rtmp/SrsFlvMuxer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -50,15 +51,14 @@ public class SrsFlvMuxer {
private DefaultRtmpPublisher publisher;

private Thread worker;
private final Object txFrameLock = new Object();

private SrsFlv flv = new SrsFlv();
private boolean needToFindKeyFrame = true;
private SrsFlvFrame mVideoSequenceHeader;
private SrsFlvFrame mAudioSequenceHeader;
private SrsAllocator mVideoAllocator = new SrsAllocator(VIDEO_ALLOC_SIZE);
private SrsAllocator mAudioAllocator = new SrsAllocator(AUDIO_ALLOC_SIZE);
private ConcurrentLinkedQueue<SrsFlvFrame> mFlvTagCache = new ConcurrentLinkedQueue<>();
private BlockingQueue<SrsFlvFrame> mFlvTagCache = new LinkedBlockingQueue<>(40);
private ConnectCheckerRtmp connectCheckerRtmp;
private static final String TAG = "SrsFlvMuxer";
private int asample_rate = 44100;
Expand All @@ -71,7 +71,7 @@ public SrsFlvMuxer(ConnectCheckerRtmp connectCheckerRtmp) {
publisher = new DefaultRtmpPublisher(connectCheckerRtmp);
}

public void setSpsPPs(ByteBuffer sps, ByteBuffer pps){
public void setSpsPPs(ByteBuffer sps, ByteBuffer pps) {
flv.setSpsPPs(sps, pps);
}

Expand Down Expand Up @@ -168,8 +168,8 @@ public void run() {
}
connectCheckerRtmp.onConnectionSuccessRtmp();
while (!Thread.interrupted()) {
if (!mFlvTagCache.isEmpty()) {
SrsFlvFrame frame = mFlvTagCache.poll();
try {
SrsFlvFrame frame = mFlvTagCache.take();
if (frame.is_sequenceHeader()) {
if (frame.is_video()) {
mVideoSequenceHeader = frame;
Expand All @@ -185,22 +185,9 @@ public void run() {
sendFlvTag(frame);
}
}
} else {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
worker.interrupt();
}
// Waiting for next frame
//synchronized (txFrameLock) {
// try {
// isEmpty() may take some time, so we set timeout to detect next frame
//txFrameLock.wait(500);
//} catch (InterruptedException ie) {
// worker.interrupt();
//}
//}
}
}
});
Expand All @@ -211,17 +198,16 @@ public void run() {
* stop the muxer, disconnect RTMP connection.
*/
public void stop() {
mFlvTagCache.clear();
if (worker != null) {
worker.interrupt();
try {
worker.join();
} catch (InterruptedException e) {
e.printStackTrace();
worker.interrupt();
}
worker = null;
}
mFlvTagCache.clear();
flv.reset();
needToFindKeyFrame = true;
Log.i(TAG, "SrsFlvMuxer closed");
Expand Down Expand Up @@ -813,7 +799,7 @@ public void writeVideoSample(final ByteBuffer bb, MediaCodec.BufferInfo bi) {
ipbs.clear();
}

public void setSpsPPs(ByteBuffer sps, ByteBuffer pps){
public void setSpsPPs(ByteBuffer sps, ByteBuffer pps) {
h264_sps_changed = true;
h264_sps = sps;
h264_pps_changed = true;
Expand Down Expand Up @@ -892,12 +878,13 @@ private void writeRtmpPacket(int type, int dts, int frame_type, int avc_aac_type
}

private void flvFrameCacheAdd(SrsFlvFrame frame) {
mFlvTagCache.add(frame);
if (frame.is_video()) {
getVideoFrameCacheNumber().incrementAndGet();
}
synchronized (txFrameLock) {
txFrameLock.notifyAll();
try {
mFlvTagCache.add(frame);
if (frame.is_video()) {
getVideoFrameCacheNumber().incrementAndGet();
}
} catch (IllegalStateException e){
Log.e(TAG, "frame discarded, cant add more frame: ", e);
}
}
}
Expand Down

0 comments on commit 3c34d6e

Please sign in to comment.