Add Collaborative IPFS Cluster Tooling For Data Replication & Downloader Bug Fix #286

Closed · wants to merge 18 commits
8 changes: 4 additions & 4 deletions tools/downloader/README.md
@@ -1,11 +1,11 @@
# downloader

`downloader` is a CLI tool that allows parsing over the all-locations csv file, and downloading all the videos referenced in the CSV file, and can concurrently download multiples videos. Because some of the videos have file names that are longer than the maximum permitted characters in a file path, the videos are not saved under their name, but instead using the videoID as determined by youtube-dl, along with a unique number. This information is then stored in a final CSV file which contains the video name, the link used to download video, as well as the unique number so you can easily determine what video belongs to what incident. Additionally it allows uploading the video data to an IPFS HTTP API endpoint
`downloader` is a CLI tool that parses the all-locations csv file and downloads every video it references, downloading multiple videos concurrently. Because some of the videos have file names longer than the maximum permitted length of a file path, the videos are not saved under their name, but under a combination of their corresponding pb-id, the link number, and their extension. For posterity's sake, a file called `name_mapping.csv` is stored in the directory containing the videos, mapping the name, link, pbid, and link number. It will not redownload any previously backed up data.

The template for names of videos saved on disk is `[YOUTUBE-DL-VIDEO-ID].[UNIQUE-VIDEO-NUMBER].[EXTENSION]`, and the CSV file has the rows `name,link,unique_video_number`. So for example we have the following entry in the CSV file `Law enforcement gas a crowd chanting “we want peace” right after exiting the building.,https://twitter.com/courtenay_roche/status/1267653137969623040,1`, and two files we have downloaded:
The template for names of videos saved on disk is `[PB-ID].[LINK-NUMBER].[EXTENSION]`, and the CSV file has the columns `name,link,pbid,link_number`. So for example we have the following entry in the CSV file `Law enforcement gas a crowd chanting “we want peace” right after exiting the building.,https://twitter.com/courtenay_roche/status/1267653137969623040,ar-bentonville-1,1`, and two files we have downloaded:

* `1267647898365427714.2.mp4`
* `1267653137969623040.1.mp4`
* `ar-bentonville-1.2.mp4`
* `ar-bentonville-1.1.mp4`

Given the row in the CSV file, the corresponding video would be `ar-bentonville-1.1.mp4`.
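
For reference, a minimal sketch (not part of the tool) of resolving a `name_mapping.csv` row back to its file on disk, assuming the `name,link,pbid,link_number` headers this change writes and that the script runs inside the download directory:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	fh, err := os.Open("name_mapping.csv")
	if err != nil {
		panic(err)
	}
	defer fh.Close()
	rows, err := csv.NewReader(fh).ReadAll()
	if err != nil {
		panic(err)
	}
	if len(rows) == 0 {
		return
	}
	// rows[0] is the header row: name,link,pbid,link_number
	for _, row := range rows[1:] {
		name, link, pbid, linkNum := row[0], row[1], row[2], row[3]
		// the extension depends on what youtube-dl produced, so glob for it
		matches, _ := filepath.Glob(fmt.Sprintf("%s.%s.*", pbid, linkNum))
		fmt.Printf("%q (%s) -> %v\n", name, link, matches)
	}
}
```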

3 changes: 2 additions & 1 deletion tools/downloader/go.mod
@@ -4,10 +4,11 @@ go 1.14

require (
github.com/RTradeLtd/go-ipfs-api/v3 v3.0.0
github.com/chromedp/cdproto v0.0.0-20200116234248-4da64dd111ac
github.com/chromedp/chromedp v0.5.3
github.com/panjf2000/ants/v2 v2.4.1
github.com/pkg/errors v0.8.1
github.com/urfave/cli/v2 v2.2.0
go.bobheadxi.dev/zapx/zapx v0.6.8
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.15.0
)
16 changes: 16 additions & 0 deletions tools/downloader/go.sum
@@ -14,6 +14,10 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U=
github.com/chromedp/cdproto v0.0.0-20200116234248-4da64dd111ac h1:T7V5BXqnYd55Hj/g5uhDYumg9Fp3rMTS6bykYtTIFX4=
github.com/chromedp/cdproto v0.0.0-20200116234248-4da64dd111ac/go.mod h1:PfAWWKJqjlGFYJEidUM6aVIWPr0EpobeyVWEEmplX7g=
github.com/chromedp/chromedp v0.5.3 h1:F9LafxmYpsQhWQBdCs+6Sret1zzeeFyHS5LkRF//Ffg=
github.com/chromedp/chromedp v0.5.3/go.mod h1:YLdPtndaHQ4rCpSpBG+IPpy9JvX0VD+7aaLxYgYj28w=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
@@ -24,6 +28,12 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -40,6 +50,8 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/knq/sysutil v0.0.0-20191005231841-15668db23d08 h1:V0an7KRw92wmJysvFvtqtKMAPmvS5O0jtB0nYo6t+gs=
github.com/knq/sysutil v0.0.0-20191005231841-15668db23d08/go.mod h1:dFWs1zEqDjFtnBXsd1vPOZaLsESovai349994nHx3e0=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -49,6 +61,8 @@ github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atq
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/mailru/easyjson v0.7.0 h1:aizVhC/NAAcKWb+5QsU1iNOZb4Yws5UO2I+aIprQITM=
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
@@ -142,6 +156,8 @@ golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190302025703-b6889370fb10/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
8 changes: 7 additions & 1 deletion tools/downloader/main.go
@@ -19,7 +19,7 @@ func main() {
Usage: "starts the downloader",
Action: func(c *cli.Context) error {
dl := New(c.String("log.file"), c.String("directory"), c.Int("concurrency"))
if err := dl.Run(c.Duration("timeout"), c.Int("max.downloads")); err != nil {
if err := dl.Run(c.Bool("capture.screenshot"), c.Duration("timeout"), c.Int("max.downloads")); err != nil {
return err
}
if c.Bool("upload.to_ipfs") {
@@ -84,6 +84,12 @@ func main() {
Usage: "enables uploading the video data to any ipfs endpoint",
Value: false,
},
&cli.BoolFlag{
Name: "capture.screenshot",
Aliases: []string{"cs"},
Usage: "enables capturing a screenshot of the webpage the media is downloaded from, for additional archiving",
Value: false,
},
},
},
}
113 changes: 93 additions & 20 deletions tools/downloader/pkg/downloader.go
@@ -5,23 +5,24 @@ import (
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"

"github.com/panjf2000/ants/v2"
"github.com/pkg/errors"
"go.bobheadxi.dev/zapx/zapx"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
/* rows of csv file for easy reference
0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13
state,edit_at,city,name,date,date_text,Link 1,Link 2,Link 3,Link 4,Link 5,Link 6,Link 7,Link 8
0 , 1 , 2 , 3 , 4 , 5 , 6, 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14
state,edit_at,city,name,date,date_text,id,Link 1,Link 2,Link 3,Link 4,Link 5,Link 6,Link 7,Link 8
*/
url = "https://raw.githubusercontent.com/2020PB/police-brutality/data_build/all-locations.csv"
)
@@ -31,8 +32,7 @@ type Downloader struct {
path string
logger *zap.Logger
// enables running concurrent downloads
wp *ants.Pool
count *atomic.Int64
wp *ants.Pool
}

// New returns a new downloader
@@ -50,12 +50,12 @@ func New(logFile, path string, concurrency int) *Downloader {
if err != nil {
panic(err)
}
return &Downloader{path, logger, wp, atomic.NewInt64(0)}
return &Downloader{path, logger, wp}
}

// Run starts the download process, note that maxDownloads doesn't necessarily equate to number of videos
// it really means the maximum number of entries in the csv to download, and some entries in the csv may have more than 1 associated video
func (d *Downloader) Run(timeout time.Duration, maxDownloads int) error {
func (d *Downloader) Run(takeScreenshots bool, timeout time.Duration, maxDownloads int) error {
resp, err := http.Get(url)
if err != nil {
return err
@@ -65,13 +65,20 @@ func (d *Downloader) Run(timeout time.Duration, maxDownloads int) error {
results []struct {
name string
link string
pbid string
count int64
}
mux = &sync.Mutex{}
wg = &sync.WaitGroup{}
reader = csv.NewReader(resp.Body)
)
for i := 0; maxDownloads != 0 && i < maxDownloads; i++ {
for i := 0; ; i++ {
// the first read from the CSV file will be the header
// so we need to make sure that we factor that in when
// counting max downloads
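// e.g. with max.downloads set to 2 we process i == 0 (the header row) plus two records before breaking out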
if maxDownloads != 0 && i >= maxDownloads+1 {
break
}
// read the next record
record, err := reader.Read()
if err != nil && err != io.EOF {
@@ -81,24 +88,30 @@ func (d *Downloader) Run(timeout time.Duration, maxDownloads int) error {
break
}
// skip the first row as it contains column names OR
// skip if the row has less than 7 elements as the 7th element is the start of the video links
if i == 0 || len(record) < 7 {
// skip if the row has less than 8 elements as the 8th element is the start of the video links
if i == 0 || len(record) < 8 {
continue
}
wg.Add(1)
d.wp.Submit(func() {
defer wg.Done()
pbid := record[6]
// use the last column index so we don't get an out of range panic
max := len(record) - 1
for ii := 6; ii < max; ii++ {
var count int64 = 0
for ii := 7; ii < max; ii++ {
count++
// this column is empty, and has no data
if record[ii] == "" {
continue
}
count := d.count.Inc()
// if the file already exists, don't redownload it
if _, err := os.Stat(d.getName(pbid, count)); err == nil {
continue
}
d.logger.Info("downloading video", zap.String("name", record[3]), zap.String("url", record[ii]))
download := func() error {
cmd := exec.Command("youtube-dl", "-o", d.getName(count), record[ii])
cmd := exec.Command("youtube-dl", "-o", d.getName(pbid, count), record[ii])
return d.runCommand(cmd, timeout)
}
if err := download(); err != nil {
@@ -109,31 +122,87 @@ func (d *Downloader) Run(timeout time.Duration, maxDownloads int) error {
results = append(results, struct {
name string
link string
pbid string
count int64
}{
name: record[3],
link: record[ii],
pbid: pbid,
count: count,
})
mux.Unlock()
}
// capture a screenshot of the source page if specified
// TODO(bonedaddy): enable adding this to the csv, for now it exists alongside everything else
if takeScreenshots {
if err := capture(d.getName(pbid, count), record[ii]); err != nil {
d.logger.Error("failed to capture screenshot", zap.Error(err), zap.String("url", record[ii]))
}
}
}
})
}
// wait for pending download operations to finish
wg.Wait()
// open csv file to store mappings
fh, err := os.Create("name_mapping.csv")
// read download dir to check for any file artifacts
infos, err := ioutil.ReadDir(d.path)
if err != nil {
return err
}
for _, info := range infos {
// this was an incorrectly downloaded piece of data, remove
if strings.HasSuffix(info.Name(), ".part") {
if err := os.Remove(d.path + "/" + info.Name()); err != nil {
d.logger.Error("failed to remove file part", zap.String("file", info.Name()), zap.Error(err))
}
}
}
// backup the previous csv if it exists for posterity
if data, err := ioutil.ReadFile(d.path + "/name_mapping.csv"); err != nil {
d.logger.Error("failed to read previous name mapping file, likely doesn't exist", zap.Error(err))
} else {
if len(data) > 0 {
if err := ioutil.WriteFile(fmt.Sprintf("%s/name_mapping-%v.csv", d.path, time.Now().UnixNano()), data, os.FileMode(0640)); err != nil {
d.logger.Error("failed to back up previous name mapping file", zap.Error(err))
}
}
}
var (
fh *os.File
records [][]string
)
// add the default headers to write to the csv
records = append(records, []string{"name", "link", "pbid", "link_number"})
if _, err := os.Stat(d.path + "/name_mapping.csv"); err == nil {
fh, err = os.Open(d.path + "/name_mapping.csv")
if err != nil {
return err
}
// file exists, drop the default headers as they will be read back from the existing file
records = [][]string{}
existing := csv.NewReader(fh)
for {
record, err := existing.Read()
if err != nil {
// stop on EOF or any other read error
break
}
records = append(records, record)
}
fh.Close()
// recreate the file so the merged records are written from the start of a writable handle
fh, err = os.Create(d.path + "/name_mapping.csv")
if err != nil {
return err
}
} else {
// open csv file to store mappings
fh, err = os.Create(d.path + "/name_mapping.csv")
if err != nil {
return err
}
}
writer := csv.NewWriter(fh)
// write the csv file headers
writer.Write([]string{"name", "link", "unique_video_number"})
// write the previous csv file to disk
// if no previous mapping exists, this will just write the headers
for _, record := range records {
writer.Write(record)
}
mux.Lock()
// iterate over all results and add to csv
for _, v := range results {
writer.Write([]string{v.name, v.link, fmt.Sprint(v.count)})
writer.Write([]string{v.name, v.link, v.pbid, fmt.Sprint(v.count)})
}
mux.Unlock()
// flush csv, writing to disk
@@ -167,6 +236,10 @@ func (d *Downloader) runCommand(cmd *exec.Cmd, timeout time.Duration) error {
}

// getName builds the youtube-dl output template from the pb-id and the per-row link number, preventing any possible chance of filename conflicts when running many concurrent downloads
func (d *Downloader) getName(count int64) string {
return d.path + "/%(id)s." + fmt.Sprint(count) + ".%(ext)s"
func (d *Downloader) getName(id string, count int64) string {
// fallback to youtube id
if id == "" {
id = "%(id)s"
}
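// e.g. getName("ar-bentonville-1", 2) yields "<path>/ar-bentonville-1.2.%(ext)s", which youtube-dl expands to the real extension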
return d.path + "/" + id + "." + fmt.Sprint(count) + ".%(ext)s"
}
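
The `capture` helper called when `capture.screenshot` is enabled is not part of this diff. Below is a rough sketch of what a chromedp-based implementation with that call signature might look like, written against the chromedp dependency added in go.mod; the `.png` suffix, the one-minute page timeout, and saving the shot next to the video are assumptions, not the PR's confirmed behavior:

```go
package pkg

import (
	"context"
	"io/ioutil"
	"time"

	"github.com/chromedp/chromedp"
)

// capture loads pageURL in headless Chrome and stores a screenshot next to
// the downloaded media. The name argument mirrors what d.getName returns,
// so we simply append a .png suffix to it (an assumption for this sketch).
func capture(name, pageURL string) error {
	ctx, cancel := chromedp.NewContext(context.Background())
	defer cancel()
	ctx, cancel = context.WithTimeout(ctx, time.Minute)
	defer cancel()
	var buf []byte
	if err := chromedp.Run(ctx,
		chromedp.Navigate(pageURL),
		chromedp.CaptureScreenshot(&buf),
	); err != nil {
		return err
	}
	return ioutil.WriteFile(name+".png", buf, 0o644)
}
```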
36 changes: 36 additions & 0 deletions tools/downloader/pkg/downloader_test.go
@@ -0,0 +1,36 @@
package pkg

import (
"io/ioutil"
"os"
"strings"
"testing"
"time"
)

func TestDownloader(t *testing.T) {
var (
logFile = "test.log"
path = "testdir"
)
t.Cleanup(func() {
os.RemoveAll("testdir")
os.Remove("test.log")
})
dl := New(logFile, path, 1)
if _, err := os.Create(path + "/thisisatestfilethatweareusingtotestremovaloffileswith.part"); err != nil {
t.Fatal(err)
}
if err := dl.Run(false, time.Minute, 2); err != nil {
t.Fatal(err)
}
infos, err := ioutil.ReadDir(path)
if err != nil {
t.Fatal(err)
}
for _, info := range infos {
if strings.HasSuffix(info.Name(), ".part") {
t.Fatal("shouldn't have found .part file")
}
}
}