Skip to content
This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Commit

Permalink
working client server build
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmcd committed Jan 28, 2020
1 parent c4e3d51 commit 3ec2f94
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 126 deletions.
1 change: 1 addition & 0 deletions pkg/embly/homedir.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func CreateHomeDir() (err error) {
}
for _, folder := range []string{
"./", "./nix",
"./blob_cache",
"./build_context",
"./build_context/rust_target",
"./build_context/cargo_home",
Expand Down
66 changes: 37 additions & 29 deletions pkg/filesystem/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,28 @@ import (
"github.com/radovskyb/watcher"
)

type trackedFile struct {
fi os.FileInfo
hash []byte
type FileInfo struct {
os.FileInfo
Hash []byte
}

func (fi *FileInfo) PopulateHash(path string) (err error) {
if fi.IsDir() {
return
}
var f *os.File
f, err = os.Open(path)
if err != nil {
err = errors.WithStack(err)
return
}
h := sha256.New()
if _, err = io.Copy(h, f); err != nil {
err = errors.WithStack(err)
return
}
fi.Hash = h.Sum(nil)
return
}

type Project struct {
Expand All @@ -30,7 +49,7 @@ type Project struct {
fnMap map[string]config.Function
fnTimerMap map[string]*time.Timer

files map[string]*trackedFile
files map[string]*FileInfo
}

// NewProject should create a new project
Expand All @@ -42,15 +61,15 @@ func NewProject(cfg *config.Config) (p *Project) {
fnMap: map[string]config.Function{},
fnTimerMap: map[string]*time.Timer{},
functionLocations: map[string][]config.Function{},
files: map[string]*trackedFile{},
files: map[string]*FileInfo{},
}
for _, fn := range cfg.Functions {
p.fnMap[fn.Name] = fn
}
return
}

func (p *Project) FunctionSources(name string) (path string, files []string, err error) {
func (p *Project) FunctionWatchedFiles(name string) (path string, files map[string]*FileInfo, err error) {
// quick and easy hack, just use watcher to crawl the files
tmpP := NewProject(p.cfg)
selected, ok := tmpP.fnMap[name]
Expand All @@ -60,9 +79,20 @@ func (p *Project) FunctionSources(name string) (path string, files []string, err
}
path = filepath.Join(p.cfg.ProjectRoot, selected.Path) + "/"
if err = tmpP.AddFunctionFiles(selected); err != nil {
err = errors.WithStack(err)
return
}
for name := range tmpP.watcher.WatchedFiles() {
filesInt := tmpP.watcher.WatchedFiles()
files = make(map[string]*FileInfo)
for name, fi := range filesInt {
files[name] = &FileInfo{FileInfo: fi}
}
return
}

func (p *Project) FunctionSources(name string) (path string, files []string, err error) {
path, fileMap, err := p.FunctionWatchedFiles(name)
for name := range fileMap {
files = append(files, name)
}
return
Expand Down Expand Up @@ -130,28 +160,6 @@ func CopyFile(src, dest string) (err error) {
return
}

func (p *Project) HashFiles() (err error) {
for name, fi := range p.watcher.WatchedFiles() {
tracked := &trackedFile{
fi: fi,
}
if !fi.IsDir() {
var f *os.File
f, err = os.Open(name)
if err != nil {
return
}
h := sha256.New()
if _, err = io.Copy(h, f); err != nil {
return
}
tracked.hash = h.Sum(nil)
}
p.files[name] = tracked
}
return
}

func (p *Project) AddRecursive(path string, function config.Function) (err error) {
path = p.cfg.AbsolutePath(path)
if err = p.watcher.Add(path); err != nil {
Expand Down
131 changes: 123 additions & 8 deletions pkg/nixbuild/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package nixbuild

import "google.golang.org/grpc"
import (
"bytes"
"compress/zlib"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

import nixbuildpb "embly/pkg/nixbuild/pb"
"github.com/pkg/errors"
"google.golang.org/grpc"

import "context"
"context"
nixbuildpb "embly/pkg/nixbuild/pb"
)

func (b *Builder) connectToBuildServer() (err error) {
conn, err := grpc.Dial("localhost:9276")
conn, err := grpc.Dial("localhost:9276", grpc.WithInsecure())
if err != nil {
return
}
Expand All @@ -17,16 +27,121 @@ func (b *Builder) connectToBuildServer() (err error) {
return nil
}

func (b *Builder) startRemoteBuild(name string) (err error) {
func WriteCompressedFile(file *nixbuildpb.CompressedFile, dir string) (err error) {
// Consider streaming in chunks?
r, err := zlib.NewReader(bytes.NewBuffer(file.Body))
if err != nil {
return
}
to, err := os.OpenFile(filepath.Join(dir, file.Name), os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
err = errors.WithStack(err)
return
}
if _, err = io.Copy(to, r); err != nil {
err = errors.WithStack(err)
return
}
return errors.WithStack(r.Close())
}

func ReadCompressedFile(loc string) (compFile *nixbuildpb.CompressedFile, err error) {
var b bytes.Buffer
writer := zlib.NewWriter(&b)
var f *os.File
f, err = os.Open(loc)
if err != nil {
return
}
if _, err = io.Copy(writer, f); err != nil {
return
}
if err = writer.Close(); err != nil {
return
}
compFile = &nixbuildpb.CompressedFile{
Body: b.Bytes(),
Path: loc,
}
return
}

func (b *Builder) startRemoteBuild(name string) (result string, err error) {
defer func() {
fmt.Println("Client returned with error: ", err)
err = errors.WithStack(err)
}()
buildClient, err := b.client.Build(context.Background())
if err != nil {
return
}

buildClient.Send(&nixbuildpb.ClientPayload{
path, files, err := b.project.FunctionWatchedFiles(name)
if err != nil {
return
}
var protoFiles []*nixbuildpb.File
for loc, file := range files {
if err = file.PopulateHash(loc); err != nil {
return
}
protoFiles = append(protoFiles, &nixbuildpb.File{
Path: loc,
Name: file.Name(),
IsDir: file.IsDir(),
Size: file.Size(),
Hash: file.Hash,
})
}

if err = buildClient.Send(&nixbuildpb.ClientPayload{
Build: &nixbuildpb.Build{
Name: name,
BuildLocation: loc,
BuildLocation: path,
Files: protoFiles,
},
})
}); err != nil {
return
}
var pay *nixbuildpb.ServerPayload
for {
pay, err = buildClient.Recv()
if err != nil {
return
}
for _, hashRequest := range pay.HashNeeded {
if _, ok := files[hashRequest.Path]; !ok {
panic(fmt.Sprint(hashRequest.Path, "is unknown to this client, panicking!"))
}
var compFile *nixbuildpb.CompressedFile
compFile, err = ReadCompressedFile(hashRequest.Path)
if err != nil {
return
}
compFile.RequestedHash = hashRequest.Hash
fmt.Println("CLIENT: ", compFile)
if err = buildClient.Send(&nixbuildpb.ClientPayload{
File: compFile,
}); err != nil {
return
}
}
for _, log := range pay.Log {
fmt.Print(log)
}

if len(pay.Files) > 0 {
result, err = ioutil.TempDir(b.emblyLoc("./result/"), "")
if err != nil {
return
}
for _, file := range pay.Files {
if err = WriteCompressedFile(file, result); err != nil {
return
}
}
break // we got our files, build is complete
}
}
return
}
38 changes: 38 additions & 0 deletions pkg/nixbuild/clientserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package nixbuild

import (
"embly/pkg/config"
"embly/pkg/tester"
"fmt"
"os"
"testing"
"time"
)

func TestClientServer(te *testing.T) {
t := tester.New(te)
if os.Getenv("NIXBUILD_INTEGRATION_TEST") == "" {
t.Skip()
return
}

cfg, err := config.New("../../examples/kv")
if err != nil {
return
}

builder, err := NewBuilder(BuildConfig{})
t.PanicOnErr(err)
builder.SetProject(cfg)

go func() {
if err := builder.startServer(); err != nil {
t.Fatal(err)
}
}()
time.Sleep(time.Second)
t.PanicOnErr(builder.connectToBuildServer())
result, err := builder.startRemoteBuild("main")
fmt.Println(result)
t.PanicOnErr(err)
}
3 changes: 0 additions & 3 deletions pkg/nixbuild/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
package nixbuild

type Client struct {
}
42 changes: 24 additions & 18 deletions pkg/nixbuild/nixbuild.proto
Original file line number Diff line number Diff line change
@@ -1,37 +1,43 @@
syntax = "proto3";
package nixbuildpb;


message ClientPayload {
Build build = 1;
Build build = 1;
CompressedFile file = 2;
}

bytes zip = 2;
message CompressedFile {
bytes body = 1;
string name = 2;
bytes requested_hash = 3;
string path = 4;
}

message ServerPayload {
repeated OutFile files = 1;
repeated bytes hash_needed = 2;
repeated CompressedFile files = 1;
repeated HashRequest hash_needed = 2;
repeated string log = 3;
}

message OutFile {
string name = 1;
bytes body = 2;
message HashRequest {
bytes hash = 1;
string path = 2;
}

message Build {
string name = 1;
repeated File files = 2;
string build_location = 3;
string name = 1;
repeated File files = 2;
string build_location = 3;
}

message File {
string path = 1;
string name = 2;
bool is_dir = 3;
int64 size = 4;
bytes hash = 5;
string path = 1;
string name = 2;
bool is_dir = 3;
int64 size = 4;
bytes hash = 5;
}

service BuildService {
rpc Build(stream ClientPayload) returns (stream ServerPayload);
}
rpc Build(stream ClientPayload) returns (stream ServerPayload);
}

0 comments on commit 3ec2f94

Please sign in to comment.