Skip to content

Commit

Permalink
Get start number from muxer
Browse files Browse the repository at this point in the history
  • Loading branch information
sr1990 committed May 12, 2020
1 parent dd75f36 commit 0f703bf
Show file tree
Hide file tree
Showing 69 changed files with 1,411 additions and 608 deletions.
8 changes: 8 additions & 0 deletions packager/file/callback_file.cc
Expand Up @@ -65,4 +65,12 @@ bool CallbackFile::Open() {
return ParseCallbackFileName(file_name(), &callback_params_, &name_);
}

bool CallbackFile::Rename(const std::string& new_file_name) {
if (rename(file_name().c_str(), new_file_name.c_str()) != 0) {
return false;
}
set_file_name(new_file_name);
return true;
}

} // namespace shaka
1 change: 1 addition & 0 deletions packager/file/callback_file.h
Expand Up @@ -28,6 +28,7 @@ class CallbackFile : public File {
bool Flush() override;
bool Seek(uint64_t position) override;
bool Tell(uint64_t* position) override;
bool Rename(const std::string& new_file_name) override;
/// @}

protected:
Expand Down
10 changes: 10 additions & 0 deletions packager/file/file.h
Expand Up @@ -93,6 +93,16 @@ class File {
/// off.
const std::string& file_name() const { return file_name_; }

/// Set the file name.
void set_file_name(const std::string& newFileName) {
file_name_ = newFileName;
}

/// Rename the file
/// @param new file name.
/// @return true on success, false otherwise.
virtual bool Rename(const std::string& new_file_name) = 0;

// ************************************************************
// * Static Methods: File-on-the-filesystem status
// ************************************************************
Expand Down
8 changes: 8 additions & 0 deletions packager/file/local_file.cc
Expand Up @@ -200,4 +200,12 @@ bool LocalFile::Delete(const char* file_name) {
return base::DeleteFile(base::FilePath::FromUTF8Unsafe(file_name), false);
}

bool LocalFile::Rename(const std::string& new_file_name) {
if (rename(file_name().c_str(), new_file_name.c_str()) != 0) {
return false;
}
set_file_name(new_file_name);
return true;
}

} // namespace shaka
1 change: 1 addition & 0 deletions packager/file/local_file.h
Expand Up @@ -33,6 +33,7 @@ class LocalFile : public File {
bool Flush() override;
bool Seek(uint64_t position) override;
bool Tell(uint64_t* position) override;
bool Rename(const std::string& new_file_name) override;
/// @}

/// Delete a local file.
Expand Down
7 changes: 7 additions & 0 deletions packager/file/memory_file.cc
Expand Up @@ -191,4 +191,11 @@ void MemoryFile::Delete(const std::string& file_name) {
FileSystem::Instance()->Delete(file_name);
}

bool MemoryFile::Rename(const std::string& new_file_name) {
if (rename(file_name().c_str(), new_file_name.c_str()) != 0) {
return false;
}
set_file_name(new_file_name);
return true;
}
} // namespace shaka
2 changes: 2 additions & 0 deletions packager/file/memory_file.h
Expand Up @@ -31,6 +31,8 @@ class MemoryFile : public File {
bool Flush() override;
bool Seek(uint64_t position) override;
bool Tell(uint64_t* position) override;
bool Rename(const std::string& new_file_name) override;
// { LOG(ERROR) << "NIKKI: In memory file"; return false;}
/// @}

/// Deletes all memory file data created. This assumes that there are no
Expand Down
8 changes: 8 additions & 0 deletions packager/file/threaded_io_file.cc
Expand Up @@ -183,6 +183,14 @@ void ThreadedIoFile::RunInInputMode() {
}
}

bool ThreadedIoFile::Rename(const std::string& new_file_name) {
if (rename(file_name().c_str(), new_file_name.c_str()) != 0) {
return false;
}
set_file_name(new_file_name);
return true;
}

void ThreadedIoFile::RunInOutputMode() {
DCHECK(internal_file_);
DCHECK_EQ(kOutputMode, mode_);
Expand Down
2 changes: 2 additions & 0 deletions packager/file/threaded_io_file.h
Expand Up @@ -9,6 +9,7 @@

#include <atomic>
#include <memory>

#include "packager/base/synchronization/waitable_event.h"
#include "packager/file/file.h"
#include "packager/file/file_closer.h"
Expand All @@ -35,6 +36,7 @@ class ThreadedIoFile : public File {
bool Flush() override;
bool Seek(uint64_t position) override;
bool Tell(uint64_t* position) override;
bool Rename(const std::string& new_file_name) override;
/// @}

protected:
Expand Down
21 changes: 12 additions & 9 deletions packager/file/udp_file.cc
Expand Up @@ -27,7 +27,7 @@
// IP_MULTICAST_ALL has been supported since kernel version 2.6.31 but we may be
// building on a machine that is older than that.
#ifndef IP_MULTICAST_ALL
#define IP_MULTICAST_ALL 49
#define IP_MULTICAST_ALL 49
#endif

#endif // defined(OS_WIN)
Expand Down Expand Up @@ -205,24 +205,20 @@ bool UdpFile::Open() {
struct ip_mreq_source source_multicast_group;

source_multicast_group.imr_multiaddr = local_in_addr;
if (inet_pton(AF_INET,
options->interface_address().c_str(),
if (inet_pton(AF_INET, options->interface_address().c_str(),
&source_multicast_group.imr_interface) != 1) {
LOG(ERROR) << "Malformed IPv4 interface address "
<< options->interface_address();
return false;
}
if (inet_pton(AF_INET,
options->source_address().c_str(),
if (inet_pton(AF_INET, options->source_address().c_str(),
&source_multicast_group.imr_sourceaddr) != 1) {
LOG(ERROR) << "Malformed IPv4 source specific multicast address "
<< options->source_address();
return false;
}

if (setsockopt(new_socket.get(),
IPPROTO_IP,
IP_ADD_SOURCE_MEMBERSHIP,
if (setsockopt(new_socket.get(), IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP,
reinterpret_cast<const char*>(&source_multicast_group),
sizeof(source_multicast_group)) < 0) {
LOG(ERROR) << "Failed to join multicast group, error = "
Expand All @@ -249,7 +245,7 @@ bool UdpFile::Open() {
<< GetSocketErrorCode();
return false;
}
}
}

#if defined(__linux__)
// Disable IP_MULTICAST_ALL to avoid interference caused when two sockets
Expand Down Expand Up @@ -294,4 +290,11 @@ bool UdpFile::Open() {
return true;
}

bool UdpFile::Rename(const std::string& new_file_name) {
if (rename(file_name().c_str(), new_file_name.c_str()) != 0) {
return false;
}
set_file_name(new_file_name);
return true;
}
} // namespace shaka
1 change: 1 addition & 0 deletions packager/file/udp_file.h
Expand Up @@ -38,6 +38,7 @@ class UdpFile : public File {
bool Flush() override;
bool Seek(uint64_t position) override;
bool Tell(uint64_t* position) override;
bool Rename(const std::string& new_file_name) override;
/// @}

protected:
Expand Down
1 change: 1 addition & 0 deletions packager/media/base/media_handler.h
Expand Up @@ -57,6 +57,7 @@ struct SegmentInfo {
bool is_encrypted = false;
int64_t start_timestamp = -1;
int64_t duration = 0;
int64_t segment_index = 0;
// This is only available if key rotation is enabled. Note that we may have
// a |key_rotation_encryption_config| even if the segment is not encrypted,
// which is the case for clear lead.
Expand Down
1 change: 1 addition & 0 deletions packager/media/base/media_handler_test_base.cc
Expand Up @@ -249,6 +249,7 @@ std::unique_ptr<SegmentInfo> MediaHandlerTestBase::GetSegmentInfo(
info->start_timestamp = start_timestamp;
info->duration = duration;
info->is_subsegment = is_subsegment;
info->segment_index = start_timestamp / duration + 1;

return info;
}
Expand Down
13 changes: 12 additions & 1 deletion packager/media/chunking/chunking_handler.cc
Expand Up @@ -26,6 +26,10 @@ bool IsNewSegmentIndex(int64_t new_index, int64_t current_index) {
new_index != current_index - 1;
}

bool isGreaterSegmentIndex(int64_t new_index, int64_t current_index) {
return new_index > current_index;
}

} // namespace

ChunkingHandler::ChunkingHandler(const ChunkingParams& chunking_params)
Expand Down Expand Up @@ -60,6 +64,7 @@ Status ChunkingHandler::Process(std::unique_ptr<StreamData> stream_data) {
}

Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) {
set_segment_index_++;
RETURN_IF_ERROR(EndSegmentIfStarted());
return FlushDownstream(kStreamIndex);
}
Expand All @@ -74,6 +79,7 @@ Status ChunkingHandler::OnStreamInfo(std::shared_ptr<const StreamInfo> info) {
}

Status ChunkingHandler::OnCueEvent(std::shared_ptr<const CueEvent> event) {
set_segment_index_++;
RETURN_IF_ERROR(EndSegmentIfStarted());
const double event_time_in_seconds = event->time_in_seconds;
RETURN_IF_ERROR(DispatchCueEvent(kStreamIndex, std::move(event)));
Expand All @@ -89,7 +95,6 @@ Status ChunkingHandler::OnCueEvent(std::shared_ptr<const CueEvent> event) {
Status ChunkingHandler::OnMediaSample(
std::shared_ptr<const MediaSample> sample) {
DCHECK_NE(time_scale_, 0u) << "kStreamInfo should arrive before kMediaSample";

const int64_t timestamp = sample->pts();

bool started_new_segment = false;
Expand All @@ -101,6 +106,11 @@ Status ChunkingHandler::OnMediaSample(
: (timestamp - cue_offset_) / segment_duration_;
if (!segment_start_time_ ||
IsNewSegmentIndex(segment_index, current_segment_index_)) {
if (!isGreaterSegmentIndex(segment_index, current_segment_index_)) {
set_segment_index_ = current_segment_index_ + 1;
} else {
set_segment_index_ = segment_index;
}
current_segment_index_ = segment_index;
// Reset subsegment index.
current_subsegment_index_ = 0;
Expand Down Expand Up @@ -151,6 +161,7 @@ Status ChunkingHandler::EndSegmentIfStarted() const {
auto segment_info = std::make_shared<SegmentInfo>();
segment_info->start_timestamp = segment_start_time_.value();
segment_info->duration = max_segment_time_ - segment_start_time_.value();
segment_info->segment_index = set_segment_index_;
return DispatchSegmentInfo(kStreamIndex, std::move(segment_info));
}

Expand Down
2 changes: 1 addition & 1 deletion packager/media/chunking/chunking_handler.h
Expand Up @@ -72,7 +72,7 @@ class ChunkingHandler : public MediaHandler {
// Segment and subsegment duration in stream's time scale.
int64_t segment_duration_ = 0;
int64_t subsegment_duration_ = 0;

int64_t set_segment_index_ = 0;
// Current segment index, useful to determine where to do chunking.
int64_t current_segment_index_ = -1;
// Current subsegment index, useful to determine where to do chunking.
Expand Down
4 changes: 4 additions & 0 deletions packager/media/chunking/text_chunker.cc
Expand Up @@ -35,6 +35,7 @@ Status TextChunker::OnFlushRequest(size_t input_stream_index) {
// Keep outputting segments until all the samples leave the system. Calling
// |DispatchSegment| will remove samples over time.
while (samples_in_current_segment_.size()) {
segment_index_++;
RETURN_IF_ERROR(DispatchSegment(segment_duration_));
}

Expand Down Expand Up @@ -107,6 +108,9 @@ Status TextChunker::DispatchSegment(int64_t duration) {
std::shared_ptr<SegmentInfo> info = std::make_shared<SegmentInfo>();
info->start_timestamp = segment_start_;
info->duration = duration;
segment_index_ =
std::max(((segment_start_ / segment_duration_) + 1), segment_index_);
info->segment_index = segment_index_;
RETURN_IF_ERROR(DispatchSegmentInfo(kStreamIndex, std::move(info)));

// Move onto the next segment.
Expand Down
1 change: 1 addition & 0 deletions packager/media/chunking/text_chunker.h
Expand Up @@ -52,6 +52,7 @@ class TextChunker : public MediaHandler {
int64_t segment_start_ = -1; // Set when the first sample comes in.
int64_t segment_duration_ = -1; // Set in OnStreamInfo.

int64_t segment_index_ = 0;
// All samples that make up the current segment. We must store the samples
// until the segment ends because a cue event may end the segment sooner
// than we expected.
Expand Down
6 changes: 4 additions & 2 deletions packager/media/event/combined_muxer_listener.cc
Expand Up @@ -59,9 +59,11 @@ void CombinedMuxerListener::OnMediaEnd(const MediaRanges& media_ranges,
void CombinedMuxerListener::OnNewSegment(const std::string& file_name,
int64_t start_time,
int64_t duration,
uint64_t segment_file_size) {
uint64_t segment_file_size,
uint64_t segment_index) {
for (auto& listener : muxer_listeners_) {
listener->OnNewSegment(file_name, start_time, duration, segment_file_size);
listener->OnNewSegment(file_name, start_time, duration, segment_file_size,
segment_index);
}
}

Expand Down
3 changes: 2 additions & 1 deletion packager/media/event/combined_muxer_listener.h
Expand Up @@ -42,7 +42,8 @@ class CombinedMuxerListener : public MuxerListener {
void OnNewSegment(const std::string& file_name,
int64_t start_time,
int64_t duration,
uint64_t segment_file_size) override;
uint64_t segment_file_size,
uint64_t index_segment) override;
void OnKeyFrame(int64_t timestamp, uint64_t start_byte_offset, uint64_t size);
void OnCueEvent(int64_t timestamp, const std::string& cue_data) override;
/// @}
Expand Down
4 changes: 3 additions & 1 deletion packager/media/event/hls_notify_muxer_listener.cc
Expand Up @@ -7,6 +7,7 @@
#include "packager/media/event/hls_notify_muxer_listener.h"

#include <memory>

#include "packager/base/logging.h"
#include "packager/hls/base/hls_notifier.h"
#include "packager/media/base/muxer_options.h"
Expand Down Expand Up @@ -228,7 +229,8 @@ void HlsNotifyMuxerListener::OnMediaEnd(const MediaRanges& media_ranges,
void HlsNotifyMuxerListener::OnNewSegment(const std::string& file_name,
int64_t start_time,
int64_t duration,
uint64_t segment_file_size) {
uint64_t segment_file_size,
uint64_t segment_index) {
if (!media_info_->has_segment_template()) {
EventInfo event_info;
event_info.type = EventInfoType::kSegment;
Expand Down
3 changes: 2 additions & 1 deletion packager/media/event/hls_notify_muxer_listener.h
Expand Up @@ -67,7 +67,8 @@ class HlsNotifyMuxerListener : public MuxerListener {
void OnNewSegment(const std::string& file_name,
int64_t start_time,
int64_t duration,
uint64_t segment_file_size) override;
uint64_t segment_file_size,
uint64_t segment_index) override;
void OnKeyFrame(int64_t timestamp, uint64_t start_byte_offset, uint64_t size);
void OnCueEvent(int64_t timestamp, const std::string& cue_data) override;
/// @}
Expand Down

0 comments on commit 0f703bf

Please sign in to comment.