Skip to content

Commit

Permalink
Image API
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Feb 15, 2024
1 parent 452977f commit 4e5f3e9
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 1 deletion.
6 changes: 6 additions & 0 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
for _, path := range [...]string{"/asset/hls/:playbackID/*file", "/webrtc/:playbackID"} {
router.OPTIONS(path, playback)
}
image := middleware.LogAndMetrics(metrics.Metrics.ImageRequestDurationSec)(
withCORS(
handlers.NewImageHandler(cli.PublicBucketURLs).Handle,
),
)
router.GET("/asset/image/:playbackID/thumbnail.jpg", image)

// Handling incoming playback redirection requests
redirectHandler := withLogging(withCORS(geoHandlers.RedirectHandler()))
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Cli struct {
APIServer string
SourceOutput string
PrivateBucketURLs []*url.URL
PublicBucketURLs []*url.URL
ExternalTranscoder string
VodPipelineStrategy string
MetricsDBConnectionString string
Expand Down
187 changes: 187 additions & 0 deletions handlers/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package handlers

import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"

"github.com/grafov/m3u8"
"github.com/julienschmidt/httprouter"
caterrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/playback"
ffmpeg "github.com/u2takey/ffmpeg-go"
)

type ImageHandler struct {
PublicBucketURLs []*url.URL
}

func NewImageHandler(urls []*url.URL) *ImageHandler {
return &ImageHandler{
PublicBucketURLs: urls,
}

Check warning on line 30 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L27-L30

Added lines #L27 - L30 were not covered by tests
}

func (p *ImageHandler) Handle(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
timeString := req.URL.Query().Get("time")
time, err := strconv.ParseFloat(timeString, 64)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
caterrs.WriteHTTPBadRequest(w, "failed to parse time", nil)
return
}
width, err := parseResolution(req, "width", 320)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
caterrs.WriteHTTPBadRequest(w, "failed to parse width", nil)
return
}

Check warning on line 46 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L43-L46

Added lines #L43 - L46 were not covered by tests
height, err := parseResolution(req, "height", 240)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
caterrs.WriteHTTPBadRequest(w, "failed to parse height", nil)
return
}

Check warning on line 52 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L49-L52

Added lines #L49 - L52 were not covered by tests

playbackID := params.ByName("playbackID")
if playbackID == "" {
caterrs.WriteHTTPBadRequest(w, "playbackID was empty", nil)
return
}

Check warning on line 58 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L56-L58

Added lines #L56 - L58 were not covered by tests

err = p.handle(w, playbackID, time, width, height)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
switch {
case errors.Is(err, caterrs.ObjectNotFoundError):
caterrs.WriteHTTPNotFound(w, "not found", nil)
default:
caterrs.WriteHTTPInternalServerError(w, "internal server error", nil)

Check warning on line 67 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}
}
}

func parseResolution(req *http.Request, key string, defaultVal int64) (int64, error) {
val := req.URL.Query().Get(key)
if val == "" {
return defaultVal, nil
}
return strconv.ParseInt(val, 10, 32)

Check warning on line 77 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L77

Added line #L77 was not covered by tests
}

func (p *ImageHandler) handle(w http.ResponseWriter, playbackID string, time float64, width int64, height int64) error {
var (
err error
segmentFile string
dir = playbackID
)

fileInfoReader, err := playback.OsFetch(p.PublicBucketURLs, dir, "index.m3u8", "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

// download master playlist
manifest, _, err := m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
}

Check warning on line 96 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L95-L96

Added lines #L95 - L96 were not covered by tests
masterPlaylist, ok := manifest.(*m3u8.MasterPlaylist)
if !ok || masterPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MasterPlaylist")
}

Check warning on line 100 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L99-L100

Added lines #L99 - L100 were not covered by tests
if len(masterPlaylist.Variants) < 1 {
return fmt.Errorf("no renditions found")
}

Check warning on line 103 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L102-L103

Added lines #L102 - L103 were not covered by tests

renditionUri := masterPlaylist.Variants[0].URI
fileInfoReader, err = playback.OsFetch(p.PublicBucketURLs, dir, renditionUri, "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

Check warning on line 109 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L108-L109

Added lines #L108 - L109 were not covered by tests
dir = filepath.Join(dir, filepath.Dir(renditionUri))
manifest, _, err = m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
}

Check warning on line 114 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L113-L114

Added lines #L113 - L114 were not covered by tests
mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

Check warning on line 118 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L117-L118

Added lines #L117 - L118 were not covered by tests

// find the segment required
currentTime := 0.0
extractTime := 0.0
for _, segment := range mediaPlaylist.GetAllSegments() {
currentTime += segment.Duration
if currentTime > time {
segmentFile = segment.URI
extractTime = time - currentTime + segment.Duration
break
}
}
if segmentFile == "" {
return fmt.Errorf("playbackID media not found: %w", caterrs.ObjectNotFoundError)
}

tmpDir, err := os.MkdirTemp(os.TempDir(), "image-api-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}

Check warning on line 138 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L137-L138

Added lines #L137 - L138 were not covered by tests
defer os.RemoveAll(tmpDir)

// download the segment
fileInfoReader, err = playback.OsFetch(p.PublicBucketURLs, dir, segmentFile, "")
if err != nil {
return fmt.Errorf("failed to get media: %w", err)
}

Check warning on line 145 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L144-L145

Added lines #L144 - L145 were not covered by tests
segBytes, err := io.ReadAll(fileInfoReader.Body)
if err != nil {
return fmt.Errorf("failed to get bytes: %w", err)
}

Check warning on line 149 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L148-L149

Added lines #L148 - L149 were not covered by tests

inputFile := path.Join(tmpDir, "in.ts")
if err = os.WriteFile(inputFile, segBytes, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}

Check warning on line 154 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L153-L154

Added lines #L153 - L154 were not covered by tests
outputFile := path.Join(tmpDir, "out.jpg")

var ffmpegErr bytes.Buffer
err = ffmpeg.
Input(inputFile).
Output(
outputFile,
ffmpeg.KwArgs{
"ss": fmt.Sprintf("00:00:%d", int64(extractTime)),
"vframes": "1",
"vf": fmt.Sprintf("scale=%d:%d:force_original_aspect_ratio=decrease", width, height),
},
).OverWriteOutput().WithErrorOutput(&ffmpegErr).Run()
if err != nil {
return fmt.Errorf("ffmpeg failed [%s]: %w", ffmpegErr.String(), err)
}

Check warning on line 170 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L169-L170

Added lines #L169 - L170 were not covered by tests

bs, err := os.ReadFile(outputFile)
if err != nil {
return err
}

Check warning on line 175 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L174-L175

Added lines #L174 - L175 were not covered by tests

w.Header().Set("content-type", "image/jpg")
w.Header().Set("content-length", strconv.Itoa(len(bs)))
w.WriteHeader(http.StatusOK)
count, err := w.Write(bs)
if err != nil {
log.LogNoRequestID("image handler failed to write response", "err", err)

Check warning on line 182 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L182

Added line #L182 was not covered by tests
} else {
log.LogNoRequestID("image handler wrote response", "count", count)
}
return nil
}
98 changes: 98 additions & 0 deletions handlers/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package handlers

import (
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/require"
"gopkg.in/vansante/go-ffprobe.v2"
)

func TestImageHandler_Handle(t *testing.T) {
wd, err := os.Getwd()
require.NoError(t, err)
handler := &ImageHandler{
PublicBucketURLs: []*url.URL{{Scheme: "file", Path: wd + "/../test"}},
}
tests := []struct {
name string
time string
playbackID string
expectedStatus int
}{
{
name: "first segment",
time: "5",
},
{
name: "second segment",
time: "21",
},
{
name: "final segment",
time: "29",
},
{
name: "out of bounds",
time: "30",
expectedStatus: http.StatusNotFound,
},
{
name: "invalid time",
time: "",
expectedStatus: http.StatusBadRequest,
},
{
name: "playbackID not found",
time: "29",
playbackID: "foo",
expectedStatus: http.StatusNotFound,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "?time="+tt.time, nil)
require.NoError(t, err)

if tt.playbackID == "" {
tt.playbackID = "fixtures" // just use the fixtures directory for testing
}
handler.Handle(w, req, []httprouter.Param{{
Key: "playbackID",
Value: tt.playbackID,
}})
resp := w.Result()
if tt.expectedStatus == 0 {
tt.expectedStatus = 200
}
require.Equal(t, tt.expectedStatus, resp.StatusCode)

if tt.expectedStatus != 200 {
return
}
respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

outfile, err := os.CreateTemp(os.TempDir(), "out*.jpg")
require.NoError(t, err)
defer os.Remove(outfile.Name())
_, err = outfile.Write(respBytes)
require.NoError(t, err)
log.Println(outfile.Name())
probeData, err := ffprobe.ProbeURL(context.Background(), outfile.Name())
require.NoError(t, err)
require.Equal(t, "image2", probeData.Format.FormatName)
require.Len(t, probeData.Streams, 1)
require.Greater(t, probeData.Streams[0].Width, 0)
require.Greater(t, probeData.Streams[0].Height, 0)
})
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func main() {
fs.StringVar(&cli.APIToken, "api-token", "IAmAuthorized", "Auth header value for API access")
fs.StringVar(&cli.SourceOutput, "source-output", "", "URL for the video source segments used if source_segments is not defined in the upload request")
config.URLSliceVarFlag(fs, &cli.PrivateBucketURLs, "private-bucket", "", "URL for the private media bucket")
config.URLSliceVarFlag(fs, &cli.PublicBucketURLs, "public-bucket", "", "URL for the public media bucket")
fs.StringVar(&cli.ExternalTranscoder, "external-transcoder", "", "URL for the external transcoder to be used by the pipeline coordinator. Only 1 implementation today for AWS MediaConvert which should be in the format: mediaconvert://key-id:key-secret@endpoint-host?region=aws-region&role=iam-role&s3_aux_bucket=s3://bucket")
fs.StringVar(&cli.VodPipelineStrategy, "vod-pipeline-strategy", string(pipeline.StrategyCatalystFfmpegDominance), "Which strategy to use for the VOD pipeline")
fs.StringVar(&cli.MetricsDBConnectionString, "metrics-db-connection-string", "", "Connection string to use for the metrics Postgres DB. Takes the form: host=X port=X user=X password=X dbname=X")
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type CatalystAPIMetrics struct {
SerfEventBufferSize prometheus.Gauge
AccessControlRequestCount *prometheus.CounterVec
AccessControlRequestDurationSec *prometheus.SummaryVec
ImageRequestDurationSec *prometheus.SummaryVec // TODO

JobsInFlight prometheus.Gauge
HTTPRequestsInFlight prometheus.Gauge
Expand Down
7 changes: 6 additions & 1 deletion playback/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/url"
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -103,6 +104,10 @@ func appendAccessKey(uri, gatingParam, gatingParamName string) (string, error) {
}

func osFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.FileInfoReader, error) {
return OsFetch(buckets, filepath.Join("hls", playbackID), file, byteRange)
}

func OsFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.FileInfoReader, error) {
if len(buckets) < 1 {
return nil, errors.New("playback failed, no private buckets configured")
}
Expand All @@ -113,7 +118,7 @@ func osFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.F
)
// try all private buckets until object is found or return error
for _, bucket := range buckets {
osURL := bucket.JoinPath("hls").JoinPath(playbackID).JoinPath(file)
osURL := bucket.JoinPath(playbackID).JoinPath(file)
f, err = clients.GetOSURL(osURL.String(), byteRange)
if err == nil {
// object found successfully so return early
Expand Down
4 changes: 4 additions & 0 deletions test/fixtures/index.m3u8
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:PROGRAM-ID=0,BANDWIDTH=1036798,RESOLUTION=1280x720,NAME="0-720p0"
tiny.m3u8

0 comments on commit 4e5f3e9

Please sign in to comment.