Is it possible to register mappers dynamically? #185

Open
jorgecarleitao opened this issue Jan 29, 2020 · 4 comments

Comments

@jorgecarleitao

jorgecarleitao commented Jan 29, 2020

I am trying to register a mapper whose logic depends on the application.

The minimal reproducible example I came up with:

```go
package bla_test

import (
	"io"
	"testing"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/util"
)

func Test(t *testing.T) {
	gio.Init()

	ds := flow.New("").Source("a", util.Range(0, 100))

	// function to be used in Output. Stores the result in `result`
	var result []interface{}
	fn := func(r io.Reader) error {
		err := util.ProcessMessage(r, func(encodedBytes []byte) error {
			result = append(result, len(encodedBytes))
			return nil
		})
		return err
	}

	ds.Output(fn).Run()
	t.Log(result)

	// register mapper, note that it uses "result" from above
	mapper := func(row []interface{}) error {
		row = append(row, result)
		gio.Emit(row...)
		return nil
	}
	addColumn := gio.RegisterMapper(mapper)

	ds1 := ds.Map("id", addColumn)

	ds1.Output(fn).Run()
	t.Log(result) // should contain the result from the first and second execution
}
```

This code hangs, outputting the error: Missing mapper function m1.

Note that this code is not interesting per se; it illustrates a use case in which the mapper depends on application-specific logic.

Is this error expected behavior? I imagine this should be possible in theory, since Spark allows this kind of functionality.

@chrislusf
Owner

The current implementation builds a static binary first and shares that binary with the distributed executors, so this is not possible.

Java can send bytecode over the network dynamically. Go cannot.
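To see why registering a mapper inside the test body fails, it helps to model the driver and an executor as two processes running the same binary, each with its own mapper registry. The sketch below simulates this in one program; the `registry` type, its sequential `m1`-style IDs, and the error text are hypothetical stand-ins chosen to mirror the error reported above, not gleam's actual internals.

```go
package main

import "fmt"

// mapperFunc is a hypothetical mapper type used only for this illustration.
type mapperFunc func(row []interface{}) []interface{}

// registry is a hypothetical stand-in for a per-process mapper registry.
// Each registration receives the next sequential ID ("m1", "m2", ...).
type registry struct {
	mappers map[string]mapperFunc
}

func newRegistry() *registry {
	return &registry{mappers: make(map[string]mapperFunc)}
}

// register stores f and returns its ID; the ID depends only on registration order.
func (r *registry) register(f mapperFunc) string {
	id := fmt.Sprintf("m%d", len(r.mappers)+1)
	r.mappers[id] = f
	return id
}

// lookup is what an executor would do when asked to run a mapper by ID.
func (r *registry) lookup(id string) (mapperFunc, error) {
	f, ok := r.mappers[id]
	if !ok {
		return nil, fmt.Errorf("missing mapper function %s", id)
	}
	return f, nil
}

func main() {
	// Driver process: the test body runs and registers a closure at runtime.
	driver := newRegistry()
	id := driver.register(func(row []interface{}) []interface{} {
		return append(row, "extra")
	})

	// Executor process: a fresh start of the same binary. It never runs the
	// test body, so the runtime registration above does not exist there.
	executor := newRegistry()
	if _, err := executor.lookup(id); err != nil {
		fmt.Println(err) // missing mapper function m1
	}
}
```

Registrations done in global scope do not have this problem, because every process that starts from the same binary performs them in the same order before any work is dispatched.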

@jorgecarleitao
Author

jorgecarleitao commented Jan 29, 2020

This makes the library somewhat limited in scope: it does not allow building mappers with complex, application-specific logic, because the mappers have to be registered in global scope.

What if we add support for mappers with a different signature, e.g. `func(row []interface{}, args ...interface{}) error`, and allow the flow declaration to include both the mapperId and its args? That way, the args can be sent over the network and used by the executors.
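A minimal sketch of the proposed idea, assuming the mapper returns the transformed row instead of calling `gio.Emit`, and using plain string IDs instead of `gio.MapperId`. The names `paramMapper`, `step`, and `run` are hypothetical. The mapper itself is still registered once in global scope; only the argument values vary from flow to flow.

```go
package main

import "fmt"

// paramMapper is a hypothetical version of the proposed signature:
// the row plus per-flow arguments.
type paramMapper func(row []interface{}, args ...interface{}) []interface{}

// Registered once in global scope, so driver and executors agree on the ID.
var mappers = map[string]paramMapper{
	"addColumn": func(row []interface{}, args ...interface{}) []interface{} {
		return append(row, args...)
	},
}

// step is what a flow declaration would record: which mapper, with which args.
type step struct {
	MapperID string
	Args     []interface{}
}

// run is what an executor would do on receiving a step for one row.
func run(s step, row []interface{}) ([]interface{}, error) {
	f, ok := mappers[s.MapperID]
	if !ok {
		return nil, fmt.Errorf("missing mapper function %s", s.MapperID)
	}
	return f(row, s.Args...), nil
}

func main() {
	// The runtime value travels as data instead of being captured by a closure.
	s := step{MapperID: "addColumn", Args: []interface{}{42}}
	out, err := run(s, []interface{}{"a", 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // [a 1 42]
}
```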

@chrislusf
Owner

Wow, this is a great idea!!!

Maybe limit the args to a JSON object, for easier serialization and deserialization?
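If args are limited to a JSON object, one detail is worth settling early: decoding into a generic `map[string]interface{}` turns every JSON number into `float64`, while decoding into a mapper-specific struct keeps the intended types. The sketch below uses only the standard `encoding/json` package; `filterArgs` and the helper functions are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// filterArgs is a hypothetical argument type for one particular mapper.
type filterArgs struct {
	Column    string `json:"column"`
	Threshold int    `json:"threshold"`
}

// encodeArgs serializes one mapper's arguments as a JSON object (driver side).
func encodeArgs(a filterArgs) ([]byte, error) { return json.Marshal(a) }

// decodeGeneric decodes into a generic map; JSON numbers come back as float64.
func decodeGeneric(b []byte) (map[string]interface{}, error) {
	var m map[string]interface{}
	err := json.Unmarshal(b, &m)
	return m, err
}

// decodeTyped decodes into the mapper's own struct, preserving field types.
func decodeTyped(b []byte) (filterArgs, error) {
	var a filterArgs
	err := json.Unmarshal(b, &a)
	return a, err
}

func main() {
	encoded, err := encodeArgs(filterArgs{Column: "score", Threshold: 10})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(encoded)) // {"column":"score","threshold":10}

	generic, err := decodeGeneric(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", generic["threshold"]) // float64

	typed, err := decodeTyped(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(typed.Column, typed.Threshold) // score 10
}
```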

chrislusf reopened this Jan 29, 2020
@jorgecarleitao
Author

jorgecarleitao commented Jan 29, 2020

I am not familiar with best practices for serialization in Go, so I defer that decision to you. :)

I was imagining that we declare an interface for an argument (e.g. MapperArgument) with serialization and deserialization methods, and redefine the Map signature to be:

```go
func (d *Dataset) Map(name string, mapperId gio.MapperId, args ...MapperArgument) *Dataset
```
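One way to express such an interface, assuming it is acceptable to reuse the standard library's `encoding.BinaryMarshaler` and `encoding.BinaryUnmarshaler` instead of declaring new methods. `MapperArgument` mirrors the name proposed above; `ColumnName` and `roundTrip` are hypothetical, and JSON is only one possible wire format.

```go
package main

import (
	"encoding"
	"encoding/json"
	"fmt"
)

// MapperArgument is a hypothetical take on the proposed interface: any type
// that can serialize itself and deserialize from bytes qualifies.
type MapperArgument interface {
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}

// ColumnName is an example argument type that uses JSON as its wire format.
type ColumnName struct {
	Name string `json:"name"`
}

func (c *ColumnName) MarshalBinary() ([]byte, error) { return json.Marshal(c) }

func (c *ColumnName) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, c) }

// Compile-time check that *ColumnName satisfies MapperArgument.
var _ MapperArgument = (*ColumnName)(nil)

// roundTrip serializes src and decodes the bytes into dst, the way a driver
// would encode an argument and an executor would decode it.
func roundTrip(src, dst MapperArgument) error {
	data, err := src.MarshalBinary()
	if err != nil {
		return err
	}
	return dst.UnmarshalBinary(data)
}

func main() {
	var decoded ColumnName
	if err := roundTrip(&ColumnName{Name: "score"}, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.Name) // score
}
```

Deserialization needs a concrete destination value, so the executor still has to know which argument type a given mapper expects, for example from whatever is registered under that mapper's ID.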

Map's implementation would change to include the (serialized) arguments as part of the Command statement here, and the doProcessMapper implementation would change around here to get the arguments passed to the Command, deserialize them, and pass them to f.
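On the driver side, including the arguments in the command could amount to serializing them and appending them as one extra flag. This is a sketch under stated assumptions: the `-mapper` and `-mapper.args` flag names and the base64-of-JSON encoding are hypothetical, not gleam's actual command format, and the command is only constructed here, never run.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os"
	"os/exec"
)

// executorArgs builds a command line for running one mapper.
// The flag names are hypothetical placeholders.
func executorArgs(mapperID string, args interface{}) ([]string, error) {
	payload, err := json.Marshal(args)
	if err != nil {
		return nil, err
	}
	// Base64 turns the payload into a single opaque token with no spaces or quotes.
	encoded := base64.StdEncoding.EncodeToString(payload)
	return []string{"-mapper=" + mapperID, "-mapper.args=" + encoded}, nil
}

func main() {
	argv, err := executorArgs("m1", map[string]int{"threshold": 10})
	if err != nil {
		panic(err)
	}
	// The same binary would be re-invoked as the executor; Start/Run is never called.
	cmd := exec.Command(os.Args[0], argv...)
	fmt.Println(cmd.Args[1:])
}
```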

I think that we may be able to keep backward compatibility with this change.
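Backward compatibility would follow from making the new parameter variadic: existing calls that pass only a name and a mapper ID still compile unchanged. The toy `Dataset`, `MapperId`, and `MapperArgument` types below are stand-ins for illustration, not gleam's.

```go
package main

import "fmt"

// Hypothetical stand-ins for the real types.
type MapperId string

type MapperArgument interface{}

type Dataset struct{ steps []string }

// Map gains a variadic tail; callers that pass no arguments still compile.
func (d *Dataset) Map(name string, mapperId MapperId, args ...MapperArgument) *Dataset {
	d.steps = append(d.steps, fmt.Sprintf("%s:%s:%d args", name, mapperId, len(args)))
	return d
}

func main() {
	d := &Dataset{}
	d.Map("id", "m1")                     // existing call site, unchanged
	d.Map("addColumn", "m2", "score", 10) // new call site with arguments
	fmt.Println(d.steps)                  // [id:m1:0 args addColumn:m2:2 args]
}
```

One caveat: the method's type does change, so code that stores `Map` in a variable of the old function type would need updating.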

I am unsure how the arguments for doProcessMapper are currently obtained and processed. From os.Args[1:]? I think we may need to change the executor code to parse an extra argument containing the serialized arguments, so that we can pass them in the Command.
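On the executor side, such an extra argument could be parsed from `os.Args[1:]` with a dedicated `flag.FlagSet`, which avoids clashing with the application's own flags on the global set. The `-mapper` and `-mapper.args` flag names and the base64-of-JSON encoding are hypothetical assumptions, not gleam's actual executor interface. When the args flag is absent, the task simply carries no arguments, so the old invocation keeps working.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"flag"
	"fmt"
)

// executorTask holds what an executor would need to run one mapper.
type executorTask struct {
	MapperID string
	Args     map[string]interface{}
}

// parseExecutorArgs parses argv (e.g. os.Args[1:]) using hypothetical flag names.
func parseExecutorArgs(argv []string) (executorTask, error) {
	fs := flag.NewFlagSet("executor", flag.ContinueOnError)
	mapperID := fs.String("mapper", "", "registered mapper ID")
	encoded := fs.String("mapper.args", "", "base64-encoded JSON arguments")
	if err := fs.Parse(argv); err != nil {
		return executorTask{}, err
	}
	task := executorTask{MapperID: *mapperID}
	if *encoded == "" {
		return task, nil // no arguments: same as today's invocation
	}
	payload, err := base64.StdEncoding.DecodeString(*encoded)
	if err != nil {
		return executorTask{}, err
	}
	if err := json.Unmarshal(payload, &task.Args); err != nil {
		return executorTask{}, err
	}
	return task, nil
}

func main() {
	// "eyJ0aHJlc2hvbGQiOjEwfQ==" is {"threshold":10} in base64.
	argv := []string{"-mapper=m1", "-mapper.args=eyJ0aHJlc2hvbGQiOjEwfQ=="}
	task, err := parseExecutorArgs(argv)
	if err != nil {
		panic(err)
	}
	fmt.Println(task.MapperID, task.Args["threshold"]) // m1 10
}
```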

Is this a reasonable approach?
