Skip to content

Commit

Permalink
Image API
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Feb 14, 2024
1 parent 1ba6bd7 commit fa7e07d
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 0 deletions.
6 changes: 6 additions & 0 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
for _, path := range [...]string{"/asset/hls/:playbackID/*file", "/webrtc/:playbackID"} {
router.OPTIONS(path, playback)
}
image := middleware.LogAndMetrics(metrics.Metrics.ImageRequestDurationSec)(
withCORS(
handlers.NewImageHandler(cli.PublicBucketURLs).Handle,
),
)
router.GET("/image/hls/:playbackID/:secs", image)

// Handling incoming playback redirection requests
redirectHandler := withLogging(withCORS(geoHandlers.RedirectHandler()))
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Cli struct {
APIServer string
SourceOutput string
PrivateBucketURLs []*url.URL
PublicBucketURLs []*url.URL
ExternalTranscoder string
VodPipelineStrategy string
MetricsDBConnectionString string
Expand Down
179 changes: 179 additions & 0 deletions handlers/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package handlers

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/grafov/m3u8"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/clients"
caterrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/go-tools/drivers"
ffmpeg "github.com/u2takey/ffmpeg-go"
)

type ImageHandler struct {
PublicBucketURLs []*url.URL
}

func NewImageHandler(urls []*url.URL) *ImageHandler {
return &ImageHandler{
PublicBucketURLs: urls,
}

Check warning on line 33 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L30-L33

Added lines #L30 - L33 were not covered by tests
}

func (p *ImageHandler) Handle(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
timeString := req.URL.Query().Get("time")
time, err := strconv.ParseFloat(timeString, 64)
if err != nil {
caterrs.WriteHTTPBadRequest(w, "failed to parse time", nil)
return
}

playbackID := params.ByName("playbackID")
if playbackID == "" {
caterrs.WriteHTTPBadRequest(w, "playbackID was empty", nil)
return
}

Check warning on line 48 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L46-L48

Added lines #L46 - L48 were not covered by tests

err = p.handle(w, playbackID, time)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
switch {
case errors.Is(err, caterrs.ObjectNotFoundError):
caterrs.WriteHTTPNotFound(w, "not found", nil)
default:
caterrs.WriteHTTPInternalServerError(w, "internal server error", nil)

Check warning on line 57 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L56-L57

Added lines #L56 - L57 were not covered by tests
}
}
}

func (p *ImageHandler) handle(w http.ResponseWriter, playbackID string, time float64) error {
var (
page drivers.PageInfo
err error
manifestFile string
segmentFile string
bucketLocation *url.URL
)
// list the contents of the playbackID folder in storage and check for an HLS manifest
for _, bucket := range p.PublicBucketURLs {
bucketLocation = bucket.JoinPath(playbackID, "/")
page, err = clients.ListOSURL(context.TODO(), bucketLocation.String())
if err == nil {
break
}
// if this is the final bucket in the list then the error set here will be used in the final return
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey ||
strings.Contains(err.Error(), "no such file") {
err = fmt.Errorf("invalid request: %w %w", caterrs.ObjectNotFoundError, err)
} else {
err = fmt.Errorf("failed to get file for playback: %w", err)
}

Check warning on line 84 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L83-L84

Added lines #L83 - L84 were not covered by tests
}
if err != nil {
return err
}
for _, file := range page.Files() {
if strings.HasSuffix(file.Name, "m3u8") {
manifestFile = file.Name
break
}
}
if manifestFile == "" {
return fmt.Errorf("playbackID not found: %w", caterrs.ObjectNotFoundError)
}

Check warning on line 97 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L96-L97

Added lines #L96 - L97 were not covered by tests

// find the segment required
fileInfoReader, err := clients.GetOSURL(bucketLocation.JoinPath(manifestFile).String(), "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

Check warning on line 103 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L102-L103

Added lines #L102 - L103 were not covered by tests
manifest, _, err := m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return err
}

Check warning on line 107 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L106-L107

Added lines #L106 - L107 were not covered by tests
// TODO check if master playlist

mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

Check warning on line 113 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L112-L113

Added lines #L112 - L113 were not covered by tests
currentTime := 0.0
extractTime := 0.0
for _, segment := range mediaPlaylist.GetAllSegments() {
currentTime += segment.Duration
if currentTime > time {
segmentFile = segment.URI
extractTime = time - currentTime + segment.Duration
break
}
}
if segmentFile == "" {
return fmt.Errorf("playbackID media not found: %w", caterrs.ObjectNotFoundError)
}

tmpDir, err := os.MkdirTemp(os.TempDir(), "image-api-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}

Check warning on line 131 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L130-L131

Added lines #L130 - L131 were not covered by tests
defer os.RemoveAll(tmpDir)

// download the segment
fileInfoReader, err = clients.GetOSURL(bucketLocation.JoinPath(segmentFile).String(), "")
if err != nil {
return fmt.Errorf("failed to get media: %w", err)
}

Check warning on line 138 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L137-L138

Added lines #L137 - L138 were not covered by tests
segBytes, err := io.ReadAll(fileInfoReader.Body)
if err != nil {
return fmt.Errorf("failed to get bytes: %w", err)
}

Check warning on line 142 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L141-L142

Added lines #L141 - L142 were not covered by tests

inputFile := path.Join(tmpDir, "in.ts")
if err = os.WriteFile(inputFile, segBytes, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}

Check warning on line 147 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L146-L147

Added lines #L146 - L147 were not covered by tests
outputFile := path.Join(tmpDir, "out.jpg")

// TODO disable ffmpeg verbose output
err = ffmpeg.
Input(inputFile).
Output(
outputFile,
ffmpeg.KwArgs{
"ss": fmt.Sprintf("00:00:%d", int64(extractTime)),
"vframes": "1",
// TODO change resolution to a queryparam
"vf": "scale=320:240:force_original_aspect_ratio=decrease",
},
).OverWriteOutput().ErrorToStdOut().Run()
if err != nil {
return fmt.Errorf("ffmpeg failed: %w", err)
}

Check warning on line 164 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L163-L164

Added lines #L163 - L164 were not covered by tests

bs, err := os.ReadFile(outputFile)
if err != nil {
return err
}

Check warning on line 169 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L168-L169

Added lines #L168 - L169 were not covered by tests

w.WriteHeader(http.StatusOK)
count, err := w.Write(bs)
if err != nil {
log.LogNoRequestID("image handler failed to write response", "err", err)

Check warning on line 174 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L174

Added line #L174 was not covered by tests
} else {
log.LogNoRequestID("image handler wrote response", "count", count)
}
return nil
}
98 changes: 98 additions & 0 deletions handlers/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package handlers

import (
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/require"
"gopkg.in/vansante/go-ffprobe.v2"
)

func TestImageHandler_Handle(t *testing.T) {
wd, err := os.Getwd()
require.NoError(t, err)
handler := &ImageHandler{
PublicBucketURLs: []*url.URL{{Scheme: "file", Path: wd + "/../test"}},
}
tests := []struct {
name string
time string
playbackID string
expectedStatus int
}{
{
name: "first segment",
time: "5",
},
{
name: "second segment",
time: "21",
},
{
name: "final segment",
time: "29",
},
{
name: "out of bounds",
time: "30",
expectedStatus: http.StatusNotFound,
},
{
name: "invalid time",
time: "",
expectedStatus: http.StatusBadRequest,
},
{
name: "playbackID not found",
time: "29",
playbackID: "foo",
expectedStatus: http.StatusNotFound,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "?time="+tt.time, nil)
require.NoError(t, err)

if tt.playbackID == "" {
tt.playbackID = "fixtures" // just use the fixtures directory for testing
}
handler.Handle(w, req, []httprouter.Param{{
Key: "playbackID",
Value: tt.playbackID,
}})
resp := w.Result()
if tt.expectedStatus == 0 {
tt.expectedStatus = 200
}
require.Equal(t, tt.expectedStatus, resp.StatusCode)

if tt.expectedStatus != 200 {
return
}
respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

outfile, err := os.CreateTemp(os.TempDir(), "out*.jpg")
require.NoError(t, err)
defer os.Remove(outfile.Name())
_, err = outfile.Write(respBytes)
require.NoError(t, err)
log.Println(outfile.Name())
probeData, err := ffprobe.ProbeURL(context.Background(), outfile.Name())
require.NoError(t, err)
require.Equal(t, "image2", probeData.Format.FormatName)
require.Len(t, probeData.Streams, 1)
require.Greater(t, probeData.Streams[0].Width, 0)
require.Greater(t, probeData.Streams[0].Height, 0)
})
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func main() {
fs.StringVar(&cli.APIToken, "api-token", "IAmAuthorized", "Auth header value for API access")
fs.StringVar(&cli.SourceOutput, "source-output", "", "URL for the video source segments used if source_segments is not defined in the upload request")
config.URLSliceVarFlag(fs, &cli.PrivateBucketURLs, "private-bucket", "", "URL for the private media bucket")
config.URLSliceVarFlag(fs, &cli.PublicBucketURLs, "public-bucket", "", "URL for the public media bucket")
fs.StringVar(&cli.ExternalTranscoder, "external-transcoder", "", "URL for the external transcoder to be used by the pipeline coordinator. Only 1 implementation today for AWS MediaConvert which should be in the format: mediaconvert://key-id:key-secret@endpoint-host?region=aws-region&role=iam-role&s3_aux_bucket=s3://bucket")
fs.StringVar(&cli.VodPipelineStrategy, "vod-pipeline-strategy", string(pipeline.StrategyCatalystFfmpegDominance), "Which strategy to use for the VOD pipeline")
fs.StringVar(&cli.MetricsDBConnectionString, "metrics-db-connection-string", "", "Connection string to use for the metrics Postgres DB. Takes the form: host=X port=X user=X password=X dbname=X")
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type CatalystAPIMetrics struct {
SerfEventBufferSize prometheus.Gauge
AccessControlRequestCount *prometheus.CounterVec
AccessControlRequestDurationSec *prometheus.SummaryVec
ImageRequestDurationSec *prometheus.SummaryVec

JobsInFlight prometheus.Gauge
HTTPRequestsInFlight prometheus.Gauge
Expand Down

0 comments on commit fa7e07d

Please sign in to comment.