
Logs/Outputs are not consistently showing or being added to Outputs #3695

Closed
japerry911 opened this issue May 9, 2024 · 10 comments · Fixed by kestra-io/plugin-gcp#385
Labels: bug (Something isn't working)

@japerry911

Describe the issue

I run a YAML flow with 4 tasks in parallel on the GCP Cloud Batch task runner. These 4 tasks are subflows. Logs are printed and the single output is added about 70-80% of the time in Kestra, but 100% of the time in GCP Logs, which makes me think it's something in Kestra core.

For output I run:

Kestra.outputs({"uuid_str": uuid_str})

Here is an example where the logs show partially but the output is skipped:

[image: execution logs screenshot]

In this image, my initial print statements show, but when the script gets to Kestra.outputs({"uuid_str": "..."}), the output does not show for this job, although it showed in other jobs.

I checked the Docker Compose logs and saw nothing, and the GCP logs don't show a failure or anything.

Let me know if you need more context/details, happy to provide.

Thank you!

Environment

  • Kestra Version: v0.16.0
  • Operating System (OS/Docker/Kubernetes): Docker/GCP Compute Engine VM
  • Java Version (if you don't run kestra in Docker): n/a
japerry911 added the bug label on May 9, 2024
@anna-geller
Member

Hi Jack, can you share the redacted version of the main flow and all subflows so we can try to reproduce? Thanks in advance

@japerry911
Author

japerry911 commented May 9, 2024

id: googleAds
namespace: prod
description: This flow fetches and lands data from Google Ads to BigQuery.

inputs:
  - id: start_date
    type: STRING
    required: true
    description: The start date to fetch data from Google Ads.
    validator: ^\d{4}-\d{2}-\d{2}$

  - id: end_date
    type: STRING
    required: true
    description: The end date to fetch data from Google Ads.
    validator: ^\d{4}-\d{2}-\d{2}$

tasks:
  - id: parallel
    type: io.kestra.core.tasks.flows.Parallel
    tasks:
      - id: fetch_customers_info
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_customers_info.py
          gcs_path_prefix: gs://REDACTED/google_ads/customers_info
          destination_table: REDACTED.landing_google_ads.customers_info
      - id: fetch_campaigns_info
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_campaigns_info.py
          gcs_path_prefix: gs://REDACTED/google_ads/campaigns_info
          destination_table: REDACTED.landing_google_ads.campaigns_info
      - id: fetch_campaigns_metrics
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_campaigns_metrics.py --start_date {{ inputs.start_date }} --end_date {{ inputs.end_date }}
          gcs_path_prefix: gs://REDACTED/google_ads/campaigns_metrics
          destination_table: REDACTED.landing_google_ads.campaigns_metrics
      - id: fetch_campaigns_pmax_video_metrics
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_campaigns_pmax_video_metrics.py --start_date {{ inputs.start_date }} --end_date {{ inputs.end_date }}
          gcs_path_prefix: gs://REDACTED/google_ads/campaigns_pmax_video_metrics
          destination_table: REDACTED.landing_google_ads.campaigns_pmax_video_metrics

Above is the main googleAds YAML, the flow that is triggered.


id: fetchAndLoadBqFromGcs
namespace: prod
description: This is a sub flow that loads data to BigQuery from GCS after fetching data.

inputs:
  - id: fetch_command
    type: STRING
    required: true
    description: The command to fetch data

  - id: gcs_path_prefix
    type: STRING
    required: true
    description: The GCS path prefix to load data from
  
  - id: destination_table
    type: STRING
    required: true
    description: The destination table to load data to in BigQuery

tasks:
  - id: fetch_data
    type: io.kestra.plugin.scripts.python.Commands
    beforeCommands:
      - export PYTHONPATH=src
      - export GCP_PROJECT_ID=REDACTED
    commands:
      - python {{ inputs.fetch_command }}
  - id: load_bq_from_gcs
    type: io.kestra.plugin.gcp.bigquery.LoadFromGcs
    from:
      - "{{ inputs.gcs_path_prefix }}/{{ outputs.fetch_data.vars.uuid_str }}_*.parquet"
    destinationTable: "{{ inputs.destination_table }}"

taskDefaults:
  - type: io.kestra.plugin.scripts.python.Commands
    values:
      containerImage: us-central1-docker.pkg.dev/REDACTED/kestra-test/kestra-test:latest
      warningOnStdErr: false
      taskRunner:
        type: io.kestra.plugin.gcp.runner.GcpBatchTaskRunner
        projectId: REDACTED
        region: us-central1
        bucket: REDACTED

  - type: io.kestra.plugin.gcp.bigquery.LoadFromGcs
    values:
      writeDisposition: WRITE_APPEND
      createDisposition: CREATE_IF_NEEDED
      autodetect: true
      format: PARQUET

Above is the sub flow YAML, and below is the Python script.

from kestra import Kestra

from integrations.google_ads.fetch_data_and_load_to_gcs import (
    fetch_data_and_load_to_gcs,
)


def main():
    gaql_query = """
        SELECT
            customer_client.id
        FROM customer_client
        WHERE customer_client.manager = False
            AND customer_client.status = 'ENABLED'
    """

    mcc_ids = ["REDACTED"]

    uuid_str = fetch_data_and_load_to_gcs(
        customer_ids=mcc_ids,
        gaql_query=gaql_query,
        job_name="customers_info",
    )

    Kestra.outputs({"uuid_str": uuid_str})


if __name__ == "__main__":
    main()

Just a glimpse of where the output is used. I basically fire off 4 of these at once, and 20-40% of the time there is no output.

  • I did not include serviceAccount: | .. here; I redacted that entirely, but it's in there.
  • My apologies, it's sloppy code right now; I'm going to clean it up once I get it all working :D

Let me know if you need anything else, thanks @anna-geller !!

@japerry911
Copy link
Author

japerry911 commented May 9, 2024

If it's helpful, I can also send more examples of executions in Kestra in a few hours, after I finish some work stuff.

--

Also, for now I am going to just generate the output before the parallel step while we investigate this, which seems to work (roughly like the sketch below). So this is not blocking anything for me now :D
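
The idea of that workaround is something like this (a minimal sketch; the generate_uuid task and the uuid_str subflow input are hypothetical, not my exact flow):

tasks:
  - id: generate_uuid
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import uuid
      from kestra import Kestra

      # emit the uuid once, before the parallel fan-out
      Kestra.outputs({"uuid_str": str(uuid.uuid4())})

  - id: parallel
    type: io.kestra.core.tasks.flows.Parallel
    tasks:
      - id: fetch_customers_info
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          # pass the pre-generated uuid down instead of emitting it inside the subflow
          # (the subflow would declare uuid_str as an input)
          uuid_str: "{{ outputs.generate_uuid.vars.uuid_str }}"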

@anna-geller
Copy link
Member

anna-geller commented May 9, 2024

Interesting, I wonder whether the same happens if you add that Python script as a Namespace File instead of baking it into the Artifact Registry container?

Generally speaking, in Kestra you don't need any complex CI/CD to package your code into containers; you can simply add your code as Namespace Files from the Editor, add beforeCommands if you need custom packages, and you're good to go.

(Of course, the way you did it should work too, so it might be a bug in how we capture the output from that Google Batch Python container; you did nothing wrong.)
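
For illustration, a Namespace File setup could look roughly like this (a minimal sketch; the file name and package are hypothetical):

tasks:
  - id: fetch_data
    type: io.kestra.plugin.scripts.python.Commands
    namespaceFiles:
      enabled: true  # mount this namespace's files into the working directory
    beforeCommands:
      - pip install google-ads  # install custom packages at runtime
    commands:
      - python fetch_customers_info.py  # hypothetical Namespace File added from the Editor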

@japerry911
Author

@anna-geller, sorry this turned into a book, ha...

I mainly did custom Docker packaging since we are developing with devcontainer.json, so it was fairly easy to just write this to sync local code to the Kestra container, CI/CD-wise:

function kestra:deploy() {
    : "Deploys Kestra Flows to Compute Engine VM and Custom Image to Artifact Registry"

    # Deploy Kestra Flows
    ./kestra flow namespace update prod _flows --server http://$KESTRA_IP:8080 --user REDACTED:$KESTRA_PASSWORD

    # Deploy Custom Image
    docker build -t kestra-custom-image -f Dockerfile .
    docker tag kestra-custom-image us-central1-docker.pkg.dev/REDACTED/kestra-test/kestra-test:latest
    docker push us-central1-docker.pkg.dev/REDACTED/kestra-test/kestra-test:latest
}

I am sort of seeing what you are saying as I venture into dbt core, I think.

Instead of using Docker for that, I am going to do the recommended git clone of the repo and then run the dbt build task with inputs.


Hmm, I think I might keep going with containers, since there is an automation scraper I will move over to this, and it requires Playwright/Firefox/non-standard dependencies.

But out of curiosity, are Namespace Files cloned from git, or just stored on the server instance? And then beforeCommands would just pip install the requirements.txt, or I could even generate it with Poetry (using Poetry :D).

And lastly, to end my novel on a great note: when I took the logging/print logic out of the sub tasks, everything works 100% of the time, consistently (see the image of all 14 or 15 jobs running in parallel, woohoo Kestra). I am hoping to get a full all-accounts run over the weekend; I still want to add concurrency since these fetches use 2+ CPUs... why not ⚡ 🚀.

[image: screenshot from 2024-05-09 at 5:36 PM, all jobs running in parallel]


TL;DR: I found a workaround, but I still think the syncing of Python logging/stdout from Google Cloud Batch parallel VMs to the Kestra server UI is an issue that should stay raised, or I am doing something wrong with the VM settings, like not enough resources (which I think I checked in GCP monitoring). I know you all have a ton of priorities and this can wait since it's not blocking me now, but when you do get to it, I am more than happy to share any extra details 🙏. Also, I could totally see a situation where having all the logging in GCP Logging instead of Kestra is fine/preferable, since it would otherwise rapidly take up space in Postgres (JDBC) for me.

sorry that was long... just having so much fun :) !

@anna-geller
Member

Great news! Let me answer in bullet points in case you have follow-up questions:

  1. Namespace Files are stored in internal storage. E.g., if you configured GCS as internal storage, Namespace Files created/imported from the UI or from CI/CD are stored in your GCS bucket, and Kestra stores only metadata about them in the DB.
  2. You can use whatever commands you want in beforeCommands, incl. poetry, pip install, etc.
  3. Yup, 100%, we will look into the log/stdout capturing issue with the GcpBatch runner.
  4. Regarding Postgres space, check out the Purge task + blueprint https://kestra.io/blueprints/202-purge-execution-data-including-logs-metrics-and-outputs-on-a-schedule - it's a matter of a single flow to clean up the DB space (see the sketch after this list).
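
For reference, that blueprint boils down to roughly the following (a sketch; the 30-day retention and the daily schedule are assumptions, and the Purge task properties should be checked against the docs):

id: purge
namespace: system

tasks:
  - id: purge_executions
    type: io.kestra.core.tasks.storages.Purge
    # delete execution data (incl. logs, metrics, outputs) older than 30 days -- assumed retention
    endDate: "{{ now() | dateAdd(-30, 'DAYS') }}"

triggers:
  - id: daily
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 9 * * *"  # every day at 09:00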

@tchiotludo
Member

  • we have a log service that must be used, as it is used in the VertexAI integration; no need to reinvent the wheel (currently, this full logic is rewritten in each task runner).
  • maybe the service account does not have the right to read the Cloud Logging service; this needs to be caught, see this screenshot:

[image: screenshot]

@loicmathieu
Member

@japerry911 to summarize the issue: it is not with logging per se, but only occurs when the Python Kestra library sends outputs via Kestra.outputs({"uuid_str": uuid_str}) inside subflows that are launched inside a Parallel task.

In this case, the log of the output is not shown in the logs of the subflow execution, right?

@japerry911
Author

@loicmathieu Hello!

Logging is an issue along with the outputs. It does not consistently sync or show in the Kestra UI.

@loicmathieu
Member

I can reproduce it, so I'll have a look.

loicmathieu added a commit to kestra-io/plugin-gcp that referenced this issue May 17, 2024
Logs can still be delivered to the log stream after the job ends, as everything is asynchronous, so we have no option other than to wait a little for them to arrive. This wait time is configurable and set to a conservative value of 5s.

Note that calling cancel() on the stream makes the stream iterator return false from hasNext(), effectively ending the stream; no need for a boolean for that.

Fixes kestra-io/kestra#3695
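
Once that fix ships, the configurable wait could presumably be tuned on the task runner along these lines (a sketch; logsWaitDuration is a hypothetical property name, check the plugin docs for the actual one):

taskRunner:
  type: io.kestra.plugin.gcp.runner.GcpBatchTaskRunner
  projectId: REDACTED
  region: us-central1
  bucket: REDACTED
  # hypothetical property name: how long to keep consuming the log stream
  # after the Batch job completes (the fix defaults this to 5s)
  logsWaitDuration: PT10S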