[Feature] Agent on InLong Transform #10023

Open
2 tasks done
luchunliang opened this issue Apr 20, 2024 · 0 comments

luchunliang commented Apr 20, 2024

Motivation

  • Agent (File collection) needs the ability to filter and collect valid data content.
  • Agent (Pulsar collection) requires Protobuf (PB) parsing and data extraction capabilities.

Solution

  • Agent integrates Transform as an SDK. Manager also integrates Transform so that it can validate the transformation SQL when users configure it, before any transformation is run.
  • Before processing any data, the Agent must register the transformation configuration pulled from Manager with Transform. When the configuration changes, the Agent re-registers it with Transform under the same key, StreamSourceId.
  • Agent-Sink passes the StreamSourceId and RawData to Transform, which returns zero or more FormalData records. Agent-Sink then sends the resulting FormalData to DataProxy.
  • Transform holds one set of registered configurations per StreamSourceId, and each StreamSourceId belongs to exactly one InLong data stream, identified by its GroupId and StreamId.
  • A transformation configuration has three parts: the transformation Source, the transformation SQL, and the transformation Sink.
  • Transformation SQL will initially support basic field filtering and field cropping. Date/time and string conversion functions will be added later, modeled on Flink's built-in functions.
    image
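
The Agent-side flow described in the list above (register on pull, re-register on change, transform, forward to DataProxy) could look roughly like the sketch below. It is an illustration under assumptions, not the SDK's code: `AgentSinkSketch`, the nested `TransformSdk` stand-in, `onConfigPulled`, `send`, and the `dataProxySender` callback are all hypothetical names, the configuration is reduced to a single SQL string, and the stand-in transform only drops blank records.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class AgentSinkSketch {

    /** Hypothetical stand-in for the Transform SDK; the config is reduced to its SQL string. */
    static class TransformSdk {
        private final Map<String, String> sqlBySourceId = new ConcurrentHashMap<>();

        /** Stores the config under its StreamSourceId and returns the previous one, if any. */
        String register(String streamSourceId, String transformSql) {
            return sqlBySourceId.put(streamSourceId, transformSql);
        }

        /** Placeholder logic only: blank records are filtered out, others pass through. */
        List<byte[]> transform(String streamSourceId, byte[] rawData) {
            if (!sqlBySourceId.containsKey(streamSourceId)) {
                throw new IllegalStateException("No configuration registered for " + streamSourceId);
            }
            String text = new String(rawData, StandardCharsets.UTF_8);
            if (text.trim().isEmpty()) {
                return Collections.emptyList();
            }
            return Collections.singletonList(rawData);
        }
    }

    private final TransformSdk transform;
    // Hypothetical hook that ships one FormalData record to DataProxy.
    private final Consumer<byte[]> dataProxySender;
    // Last configuration registered per StreamSourceId, used to detect changes.
    private final Map<String, String> lastRegistered = new ConcurrentHashMap<>();

    AgentSinkSketch(TransformSdk transform, Consumer<byte[]> dataProxySender) {
        this.transform = transform;
        this.dataProxySender = dataProxySender;
    }

    /** Called for each configuration pulled from Manager; returns true if it was (re-)registered. */
    boolean onConfigPulled(String streamSourceId, String transformSql) {
        String previous = lastRegistered.put(streamSourceId, transformSql);
        if (transformSql.equals(previous)) {
            return false; // unchanged: nothing to re-register
        }
        transform.register(streamSourceId, transformSql);
        return true;
    }

    /** Transforms one RawData record and forwards every resulting FormalData record. */
    int send(String streamSourceId, byte[] rawData) {
        List<byte[]> formalData = transform.transform(streamSourceId, rawData);
        for (byte[] record : formalData) {
            dataProxySender.accept(record);
        }
        return formalData.size();
    }

    public static void main(String[] args) {
        List<byte[]> sentToDataProxy = new ArrayList<>();
        AgentSinkSketch sink = new AgentSinkSketch(new TransformSdk(), sentToDataProxy::add);
        sink.onConfigPulled("source-1", "SELECT a FROM source");
        int kept = sink.send("source-1", "a|b".getBytes(StandardCharsets.UTF_8));
        int dropped = sink.send("source-1", "   ".getBytes(StandardCharsets.UTF_8));
        System.out.println(kept + " record forwarded, " + dropped + " from the blank input");
    }
}
```

Keeping the change check in the sink rather than in Transform is only one possible choice; it avoids recompiling an unchanged configuration on every pull from Manager.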

Configuration Model

image

Interface API of Transform SDK

  • TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException;
    • If the transformation SQL fails to compile, an exception is thrown.
    • The configuration is checked for validity and non-null values; an exception is thrown if the check fails.
    • Code is generated automatically from ProtoDefine and compiled using a dynamic classloader; an exception is thrown if this fails.
    • If validation succeeds, the previous TransformConfig is returned, or null if the configuration is new.
  • TransformConfig unregister(String streamSourceId);
    • Unregisters the configuration and returns the previous TransformConfig, or null if none exists.
  • List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException;
    • Synchronous interface
    • Thread-safe
    • Processing logic:
      • Parse rawdata according to the SourceInfo configuration, producing a Map<String, byte[]> or a ProtoObject/JsonObject.
      • Interpret the syntax tree generated from the transformation SQL to produce the result set List.
      • Convert the result set List into List<byte[]> according to the SinkInfo configuration and return it.
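
To make the contract above concrete, here is a small in-memory sketch of the three methods. Only the method signatures come from this issue; everything else is an assumption made for illustration: the class name `TransformSdkSketch`, the shape of `TransformConfig` (a delimited source, a list of selected fields standing in for compiled SQL, and a delimited sink), and the rule that malformed records produce zero results. Protobuf handling and real SQL parsing are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

class TransformSdkSketch {

    static class TransformException extends Exception {
        TransformException(String message) {
            super(message);
        }
    }

    /** Hypothetical, heavily simplified config covering the Source, SQL, and Sink parts. */
    static class TransformConfig {
        final List<String> sourceFields; // Source: field names of a delimited record
        final String delimiter;          // Source and Sink: field separator
        final List<String> selectFields; // stand-in for compiled SQL: fields to keep

        TransformConfig(List<String> sourceFields, String delimiter, List<String> selectFields) {
            this.sourceFields = sourceFields;
            this.delimiter = delimiter;
            this.selectFields = selectFields;
        }
    }

    // One configuration per StreamSourceId; ConcurrentHashMap keeps lookups thread-safe.
    private final Map<String, TransformConfig> configs = new ConcurrentHashMap<>();

    /** Validates the config, stores it, and returns the previous config (null if new). */
    TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException {
        if (streamSourceId == null || config == null || config.sourceFields == null
                || config.delimiter == null || config.selectFields == null) {
            throw new TransformException("streamSourceId and all config parts must be non-null");
        }
        // Stand-in for SQL compilation: every selected field must exist in the source.
        if (!config.sourceFields.containsAll(config.selectFields)) {
            throw new TransformException("Unknown field in selection: " + config.selectFields);
        }
        return configs.put(streamSourceId, config);
    }

    /** Removes the config and returns it, or null if none was registered. */
    TransformConfig unregister(String streamSourceId) {
        return streamSourceId == null ? null : configs.remove(streamSourceId);
    }

    /** Synchronous: parse by Source, project by "SQL", serialize by Sink. */
    List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException {
        TransformConfig config = configs.get(streamSourceId);
        if (config == null) {
            throw new TransformException("No configuration registered for " + streamSourceId);
        }
        // 1. Source: split the raw record into field values.
        String[] values = new String(rawdata, StandardCharsets.UTF_8)
                .split(Pattern.quote(config.delimiter), -1);
        if (values.length != config.sourceFields.size()) {
            return Collections.emptyList(); // assumption: malformed records yield zero results
        }
        // 2. "SQL": keep only the selected fields, in the selected order.
        List<String> row = new ArrayList<>();
        for (String field : config.selectFields) {
            row.add(values[config.sourceFields.indexOf(field)]);
        }
        // 3. Sink: serialize the row back to bytes.
        String out = String.join(config.delimiter, row);
        return Collections.singletonList(out.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws TransformException {
        TransformSdkSketch sdk = new TransformSdkSketch();
        TransformConfig config = new TransformConfig(
                Arrays.asList("id", "name", "age"), "|", Arrays.asList("name", "id"));
        sdk.register("source-1", config);
        for (byte[] record : sdk.transform("source-1", "1|alice|30".getBytes(StandardCharsets.UTF_8))) {
            System.out.println(new String(record, StandardCharsets.UTF_8)); // prints "alice|1"
        }
    }
}
```

Because each `TransformConfig` is immutable once constructed and `transform` reads it through a single map lookup, concurrent calls never observe a half-updated configuration, which is one simple way to meet the thread-safety requirement.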

Task list

Use case

No response

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct
