Skip to content

Commit

Permalink
Add dsq binary for running SQL queries against files (#127)
Browse files Browse the repository at this point in the history
* Start on dsq

* Continue sketching out dsq

* More sketching

* Refactoring to support dsq

* Working with hardcoded shape

* Clean up

* Fix for shape guesser

* Fix for format

* Go doesnt support graph or table yet

* Fix build scripts for new binary

* Fixes for build

* More fixes
  • Loading branch information
eatonphil committed Dec 21, 2021
1 parent c421502 commit 00c3da5
Show file tree
Hide file tree
Showing 28 changed files with 493 additions and 260 deletions.
2 changes: 1 addition & 1 deletion desktop/panel/eval.ts
Expand Up @@ -95,7 +95,7 @@ function canUseGoRunner(panel: PanelInfo, connectors: ConnectorInfo[]) {
return false;
}

return true;
return !['table', 'graph'].includes(panel.type);
}

export async function evalInSubprocess(
Expand Down
2 changes: 1 addition & 1 deletion desktop/scripts/desktop.build
@@ -1,7 +1,7 @@
setenv UI_CONFIG_OVERRIDES "window.DS_CONFIG_MODE = 'desktop';"
yarn build-ui

cd runner && go build -o ../build/go_desktop_runner{required_ext}
cd runner && go build -o ../build/go_desktop_runner{required_ext} cmd/main.go

yarn esbuild desktop/preload.ts --external:electron --sourcemap --bundle --outfile=build/preload.js
yarn esbuild desktop/runner.ts --bundle --platform=node --sourcemap --external:react-native-fs --external:react-native-fetch-blob "--external:@elastic/elasticsearch" "--external:wasm-brotli" --external:prometheus-query --external:snowflake-sdk --external:ssh2 --external:ssh2-promise --external:ssh2-sftp-client --external:cpu-features --external:electron --target=node10.4 --outfile=build/desktop_runner.js
Expand Down
1 change: 1 addition & 0 deletions runner/.gitignore
@@ -1 +1,2 @@
runner
main
144 changes: 144 additions & 0 deletions runner/cmd/dsq/main.go
@@ -0,0 +1,144 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strings"

"github.com/multiprocessio/datastation/runner"
)

func isinpipe() bool {
fi, _ := os.Stdin.Stat()
return (fi.Mode() & os.ModeCharDevice) == 0
}

func resolveContentType(fileExtensionOrContentType string) string {
if strings.Contains(fileExtensionOrContentType, "/") {
return fileExtensionOrContentType
}

return runner.GetMimeType("x."+fileExtensionOrContentType, runner.ContentTypeInfo{})
}

var firstNonFlagArg = ""

func getResult(res interface{}) error {
out := bytes.NewBuffer(nil)
arg := firstNonFlagArg

var internalErr error
if isinpipe() {
mimetype := resolveContentType(arg)
if mimetype == "" {
return fmt.Errorf(`First argument when used in a pipe should be file extension or content type. e.g. 'cat test.csv | dsq csv "SELECT * FROM {}"'`)
}

cti := runner.ContentTypeInfo{Type: mimetype}
internalErr = runner.TransformReader(os.Stdin, "", cti, out)
} else {
internalErr = runner.TransformFile(arg, runner.ContentTypeInfo{}, out)
}
if internalErr != nil {
return internalErr
}

decoder := json.NewDecoder(out)
return decoder.Decode(res)
}

func main() {
if len(os.Args) < 3 {
log.Fatal(`Expected data source and query. e.g. 'dsq names.csv "SELECT name FROM {}"'`)
}

runner.Verbose = false
inputTable := "{}"
lastNonFlagArg := ""
for i, arg := range os.Args[1:] {
if arg == "-i" || arg == "--input-table-alias" {
if i > len(os.Args)-2 {
log.Fatal(`Expected input table alias after flag. e.g. 'dsq -i XX names.csv "SELECT * FROM XX"'`)
}

inputTable = os.Args[i+1]
continue
}

if arg == "-v" || arg == "--verbose" {
runner.Verbose = true
continue
}

if firstNonFlagArg == "" {
firstNonFlagArg = arg
}

lastNonFlagArg = arg
}

var res []map[string]interface{}
err := getResult(&res)
if err != nil {
log.Fatal(err)
}

sampleSize := 50
shape, err := runner.GetArrayShape(firstNonFlagArg, res, sampleSize)
if err != nil {
log.Fatal(err)
}

p0 := runner.PanelInfo{
ResultMeta: runner.PanelResult{
Shape: *shape,
},
}
project := &runner.ProjectState{
Pages: []runner.ProjectPage{
{
Panels: []runner.PanelInfo{p0},
},
},
}
connector, tmp, err := runner.MakeTmpSQLiteConnector()
if err != nil {
log.Fatal(err)
}
defer os.Remove(tmp.Name())
project.Connectors = append(project.Connectors, *connector)

query := lastNonFlagArg
query = strings.ReplaceAll(query, inputTable, "DM_getPanel(0)")
panel := &runner.PanelInfo{
Type: runner.DatabasePanel,
Content: query,
DatabasePanelInfo: &runner.DatabasePanelInfo{
Database: runner.DatabasePanelInfoDatabase{
ConnectorId: connector.Id,
},
},
}

panelResultLoader := func(_, _ string, out interface{}) error {
r := out.(*[]map[string]interface{})
*r = res
return nil
}
err = runner.EvalDatabasePanel(project, 0, panel, panelResultLoader)
if err != nil {
log.Fatal(err)
}

// Dump the result to stdout
fd, err := os.Open(runner.GetPanelResultsFile(project.Id, panel.Id))
if err != nil {
log.Fatalf("Could not open results file: %s", err)
}

io.Copy(os.Stdout, fd)
}
78 changes: 78 additions & 0 deletions runner/cmd/main.go
@@ -0,0 +1,78 @@
package main

import (
"os"

"github.com/multiprocessio/datastation/runner"
)

const VERSION = "development"
const APP_NAME = "DataStation Runner (Go)"

func main() {
runner.Verbose = true
runner.Logln(APP_NAME + " " + VERSION)
projectId := ""
panelId := ""
panelMetaOut := ""

args := os.Args
for i := 0; i < len(args)-1; i++ {
if args[i] == "--dsproj" {
projectId = args[i+1]
i++
continue
}

if args[i] == "--evalPanel" {
panelId = args[i+1]
i++
continue
}

if args[i] == "--metaFile" {
panelMetaOut = args[i+1]
i++
continue
}
}

if projectId == "" {
runner.Fatalln("No project id given.")
}

if panelId == "" {
runner.Fatalln("No panel id given.")
}

if panelMetaOut == "" {
runner.Fatalln("No panel meta out given.")
}

settings, err := runner.LoadSettings()
if err != nil {
runner.Logln("Could not load settings, assuming defaults.")
settings = runner.DefaultSettings
}

ec := runner.NewEvalContext(*settings)

err = ec.Eval(projectId, panelId)
if err != nil {
runner.Logln("Failed to eval: %s", err)

if _, ok := err.(*runner.DSError); !ok {
err = runner.Edse(err)
err.(*runner.DSError).Stack = "Unknown"
}

err := runner.WriteJSONFile(panelMetaOut, map[string]interface{}{
"exception": err,
})
if err != nil {
runner.Fatalln("Could not write panel meta out: %s", err)
}

// Explicitly don't fail here so that the parent can read the exception from disk
}
}
File renamed without changes.
32 changes: 26 additions & 6 deletions runner/database.go
@@ -1,4 +1,4 @@
package main
package runner

import (
"encoding/base64"
Expand Down Expand Up @@ -200,6 +200,13 @@ func writeRowFromDatabase(dbInfo DatabaseConnectorInfoDatabase, w *JSONArrayWrit
return err
}

// Needing this whole translation layer may be a good reason
// not to use sqlx since it translates **into** this layer
// from being raw. At this point we're just reimplementing
// sqlx in reverse on top of sqlx. Might be better to do
// reflection directly on the sql package instead. Would be
// worth benchmarking.

// The MySQL driver is not friendly about unknown data types.
// https://github.com/go-sql-driver/mysql/issues/441
for _, s := range colTypes {
Expand Down Expand Up @@ -234,7 +241,7 @@ func writeRowFromDatabase(dbInfo DatabaseConnectorInfoDatabase, w *JSONArrayWrit
// Default to treating everything as a string
row[col] = string(bs)
if !wroteFirstRow && !textTypes[t] {
logln("Skipping unknown type: " + s.DatabaseTypeName())
Logln("Skipping unknown type: " + s.DatabaseTypeName())
}
}
}
Expand All @@ -249,7 +256,7 @@ func writeRowFromDatabase(dbInfo DatabaseConnectorInfoDatabase, w *JSONArrayWrit

}

func evalDatabasePanel(project *ProjectState, pageIndex int, panel *PanelInfo) error {
func EvalDatabasePanel(project *ProjectState, pageIndex int, panel *PanelInfo, panelResultLoader func(string, string, interface{}) error) error {
var connector *ConnectorInfo
for _, c := range project.Connectors {
cc := c
Expand Down Expand Up @@ -339,11 +346,23 @@ func evalDatabasePanel(project *ProjectState, pageIndex int, panel *PanelInfo) e
return err
}

return withRemoteConnection(server, host, port, func(host, port string) error {
out := getPanelResultsFile(project.Id, panel.Id)
if panelResultLoader == nil {
panelResultLoader = func(projectId, panelId string, res interface{}) error {
f := GetPanelResultsFile(projectId, panelId)
return readJSONFileInto(f, res)
}
}

out := GetPanelResultsFile(project.Id, panel.Id)
w, err := openTruncate(out)
if err != nil {
return err
}
defer w.Close()

return withRemoteConnection(server, host, port, func(host, port string) error {
wroteFirstRow := false
return withJSONArrayOutWriterFile(out, func(w *JSONArrayWriter) error {
return withJSONArrayOutWriterFile(w, func(w *JSONArrayWriter) error {
_, err := importAndRun(
func(createTableStmt string) error {
_, err := db.Exec(createTableStmt)
Expand Down Expand Up @@ -377,6 +396,7 @@ func evalDatabasePanel(project *ProjectState, pageIndex int, panel *PanelInfo) e
panelsToImport,
qt,
mangleInsert,
panelResultLoader,
)

return err
Expand Down
2 changes: 1 addition & 1 deletion runner/database_test.go
@@ -1,4 +1,4 @@
package main
package runner

import (
"testing"
Expand Down
3 changes: 2 additions & 1 deletion runner/errors.go
@@ -1,4 +1,4 @@
package main
package runner

import (
"encoding/json"
Expand Down Expand Up @@ -60,6 +60,7 @@ func makeErrException(e error) *DSError {
}

var edse = makeErrException
var Edse = edse

func edsef(msg string, args ...interface{}) *DSError {
return edse(fmt.Errorf(msg, args...))
Expand Down

0 comments on commit 00c3da5

Please sign in to comment.