Skip to content

Commit

Permalink
Merge pull request #41 from scalar-dev/feature/new-model
Browse files Browse the repository at this point in the history
Implement new facet data model
  • Loading branch information
alexsparrow committed Mar 11, 2024
2 parents 0eb6d28 + acd2bde commit d9cf8f0
Show file tree
Hide file tree
Showing 70 changed files with 1,779 additions and 1,040 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ jobs:
cd metadata
go mod download
go generate ./
mkdir ../ui/graphql_schema
cp *.graphql ../ui/graphql_schema
cp -r graphql/ ../ui/graphql_schema
cd -
- name: Build
run: |
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ jobs:
poetry install
poetry run pytest -s .
docker-compose logs
2 changes: 1 addition & 1 deletion datahub/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ def test_s3(test_bucket):
pipeline = run_pipeline("s3.dhub.yaml")
report = pipeline.sink.get_report()

assert report.total_records_written == 20
assert report.total_records_written == 27
assert len(report.failures) == 0
assert len(report.warnings) == 0
47 changes: 43 additions & 4 deletions metadata/api/datahub/api.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package datahub

import (
"bytes"
"encoding/json"
"io"
"log/slog"
"net/http"

Expand All @@ -15,22 +17,53 @@ type datahubApiServer struct {
log *slog.Logger
}

// func writeRequestToFile(data *bytes.Buffer, id string) {
// path := ".data/" + strings.ReplaceAll(id, "/", "__") + ".json"
// fmt.Println("Writing path", path)
// f, err := os.Create(path)
// if err != nil {
// fmt.Println(err)
// os.Exit(1)
// }
// f.Write(data.Bytes())
// f.Close()

// }

func (i *datahubApiServer) Entities(w http.ResponseWriter, r *http.Request) {
variables := mux.Vars(r)
namespace := variables["namespace"]

var buf bytes.Buffer
tee := io.TeeReader(r.Body, &buf)

var req Entities

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
if err := json.NewDecoder(tee).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

aspectHandler := CreateAspectHandler(req.Entity.Value.DatasetSnapshot.Urn, req.SystemMetadata.RunId, variables["namespace"], i.client, r.Context(), i.log)
aspectHandler := CreateAspectHandler(req.Entity.Value.DatasetSnapshot.Urn, req.SystemMetadata.RunId, namespace, i.client, r.Context(), i.log)

for _, aspect := range req.Entity.Value.DatasetSnapshot.Aspects {
if aspect.SchemaMetadata != nil {
// urn, _ := parseDatahubUrn(namespace, req.Entity.Value.DatasetSnapshot.Urn)
// writeRequestToFile(&buf, "entity_"+urn.ToString())
err := aspectHandler.HandleSchemaMetadata(*aspect.SchemaMetadata)

if err != nil {
i.log.Error("Error while handling aspect", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}

if aspect.DatasetProperties != nil {
err := aspectHandler.HandleDatasetProperties(*aspect.DatasetProperties)

if err != nil {
i.log.Error("Error while handling aspect", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -40,16 +73,22 @@ func (i *datahubApiServer) Entities(w http.ResponseWriter, r *http.Request) {

func (i *datahubApiServer) Aspects(w http.ResponseWriter, r *http.Request) {
variables := mux.Vars(r)
namespace := variables["namespace"]

var buf bytes.Buffer
tee := io.TeeReader(r.Body, &buf)

var req Aspects

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
if err := json.NewDecoder(tee).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

aspectHandler := CreateAspectHandler(req.Proposal.EntityUrn, req.Proposal.SystemMetadata.RunId, variables["namespace"], i.client, r.Context(), i.log)
aspectHandler := CreateAspectHandler(req.Proposal.EntityUrn, req.Proposal.SystemMetadata.RunId, namespace, i.client, r.Context(), i.log)
// writeRequestToFile(&buf, fmt.Sprintf("aspect_%s_%s", req.Proposal.EntityUrn, req.Proposal.AspectName))
if err := aspectHandler.HandleJsonAspect(req.Proposal.AspectName, req.Proposal.Aspect.Value); err != nil {
i.log.Error("Error while handling aspect", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
7 changes: 6 additions & 1 deletion metadata/api/datahub/aspects.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ func (h *AspectHandler) HandleJsonAspect(aspectName string, aspectValue string)
return err
}

h.HandleDataHubExecutionRequestInput(data)
err = h.HandleDataHubExecutionRequestInput(data)

if err != nil {
return err
}

case _SchemaMetadata:
var data SchemaMetadata
err := json.Unmarshal([]byte(aspectValue), &data)
Expand Down
33 changes: 13 additions & 20 deletions metadata/api/datahub/datahub_execution_request_input.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
package datahub

import (
"metadata/core"
"strings"
"time"

"entgo.io/ent/dialect/sql"
)

func (h *AspectHandler) HandleDataHubExecutionRequestInput(aspect DataHubExecutionRequestInput) {
urn := core.ProvURN{
Type: "activity",
Universe: "default",
Namespace: h.namespace,
Source: "datahub",
Name: strings.ReplaceAll(strings.TrimPrefix(h.entityUrn, "urn:li:"), ":", "/"),
}
func (h *AspectHandler) HandleDataHubExecutionRequestInput(aspect DataHubExecutionRequestInput) error {
name := strings.TrimPrefix(h.entityUrn, "urn:li:dataHubExecutionRequest:")

ingestSourceId := h.client.DatahubIngestSource.Create().
SetDatahubUrn(h.entityUrn).
SetUrn(urn.ToString()).
SetNamespace(urn.Namespace).
err := h.client.DatahubIngest.Create().
SetNamespace(h.namespace).
SetType("DataHubExecutionRequestInput").
SetKey(name).
SetUpdatedAt(time.Now()).
OnConflict(sql.ConflictColumns("namespace", "datahub_urn")).
SetData(aspect.Args).
OnConflict(sql.ConflictColumns("namespace", "type", "key")).
UpdateUpdatedAt().
IDX(h.context)

ingestSource := h.client.DatahubIngestSource.GetX(h.context, ingestSourceId)
Exec(h.context)

if ingestSource.DatasetSourceID == nil {
datasetSource := h.client.DatasetSource.Create().SetUrn(urn.ToString()).SetData(aspect.Args).SaveX(h.context)
h.client.DatahubIngestSource.Update().SetDatasetSourceID(datasetSource.ID).ExecX(h.context)
if err != nil {
return err
}

return nil
}
40 changes: 34 additions & 6 deletions metadata/api/datahub/dataset_propeties.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,51 @@
package datahub

import (
"errors"
"metadata/core"
"metadata/core/facet"
"metadata/ent"
"metadata/ent/datahubingest"
)

func (h *AspectHandler) HandleDatasetProperties(aspect DatasetProperties) error {
// Convert the datahub URN to a trawler URN
urn, err := parseDatahubUrn(h.namespace, h.entityUrn)

if err != nil {
h.log.Error("failed to parse urn")
h.log.Error("failed to parse urn", "urn", urn)
return err
}

// Create the dataset
datasetId, err := ingestDataset(h.client, h.context, urn, h.entityUrn, h.runId, aspect.Name)
executionRequest, err := h.client.DatahubIngest.Query().Where(datahubingest.TypeEQ("DataHubExecutionRequestInput"), datahubingest.NamespaceEQ(h.namespace), datahubingest.KeyEQ(h.runId)).Only(h.context)

if err != nil {
h.log.Error("error ingesting dataset", "err", err)
if err != nil && !errors.Is(err, &ent.NotFoundError{}) {
return err
}

h.log.Debug("dataset with id", "datasetId", datasetId)
updater := core.CreateUpdater(h.context, h.client, h.log)
updater.ApplyUpdate(
core.EntityUpdate{
URN: urn.ToString(),
DataPlatform: urn.Platform,
Type: "https://www.trawler.dev/schema#Dataset",
Namespace: h.namespace,
Facets: []core.FacetUpdate{
{
Type: core.DCTitle,
Data: aspect.Name,
},
{
Type: "http://www.w3.org/ns/dcat#keyword",
Data: aspect.Tags,
},
{
Type: core.DatahubExecutionRequest,
Data: facet.DatahubExecutionRequest{Args: executionRequest.Data.(map[string]interface{})},
},
},
},
)

return nil
}
51 changes: 30 additions & 21 deletions metadata/api/datahub/schema_metadata.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package datahub

import (
"metadata/core"
"metadata/core/facet"
)

func (h *AspectHandler) HandleSchemaMetadata(aspect SchemaMetadata) error {
// Convert the datahub URN to a trawler URN
urn, err := parseDatahubUrn(h.namespace, h.entityUrn)
Expand All @@ -9,32 +14,36 @@ func (h *AspectHandler) HandleSchemaMetadata(aspect SchemaMetadata) error {
return err
}

// Create the dataset
datasetId, err := ingestDataset(h.client, h.context, urn, h.entityUrn, h.runId, urn.Name)

if err != nil {
h.log.Error("error ingesting dataset", "err", err)
return err
}

// Calculate snapshot
snapshot, err := processDatasetSnapshot(aspect)
fields := []*facet.DatasetSchemaField{}

if err != nil {
h.log.Error("error producing snapshot")
return err
for _, field := range aspect.Fields {
fields = append(fields, &facet.DatasetSchemaField{
Name: field.FieldPath,
RawType: field.NativeDataType,
})
}

// Get the dataset source if it exists
datasetSource, err := getDatasetSource(h.client, h.context, urn, h.entityUrn, h.runId)
updater := core.CreateUpdater(h.context, h.client, h.log)
_, err = updater.ApplyUpdate(
core.EntityUpdate{
URN: urn.ToString(),
DataPlatform: urn.Platform,
Type: "https://www.trawler.dev/schema#Dataset",
Namespace: h.namespace,

Facets: []core.FacetUpdate{
{
Type: core.DatasetSchema,
Data: facet.DatasetSchema{
Name: aspect.SchemaName,
Fields: fields,
},
},
},
},
)

if err != nil {
h.log.Warn("couldn't find ingest source", "err", err)
}

// Try to snapshot the dataset
if err = snapshotDataset(h.client, h.context, *datasetId, datasetSource, snapshot); err != nil {
h.log.Error("error writing snapshot")
return err
}

Expand Down

0 comments on commit d9cf8f0

Please sign in to comment.