Logs/Outputs are not consistently showing or being added to Outputs #3695
Hi Jack, can you share a redacted version of the main flow and all subflows so we can try to reproduce? Thanks in advance!
Main GoogleAds YAML:

```yaml
id: googleAds
namespace: prod
description: This flow fetches and lands data from Google Ads to BigQuery.

inputs:
  - id: start_date
    type: STRING
    required: true
    description: The start date to fetch data from Google Ads.
    validator: ^\d{4}-\d{2}-\d{2}$
  - id: end_date
    type: STRING
    required: true
    description: The end date to fetch data from Google Ads.
    validator: ^\d{4}-\d{2}-\d{2}$

tasks:
  - id: parallel
    type: io.kestra.core.tasks.flows.Parallel
    tasks:
      - id: fetch_customers_info
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_customers_info.py
          gcs_path_prefix: gs://REDACTED/google_ads/customers_info
          destination_table: REDACTED.landing_google_ads.customers_info
      - id: fetch_campaigns_info
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_campaigns_info.py
          gcs_path_prefix: gs://REDACTED/google_ads/campaigns_info
          destination_table: REDACTED.landing_google_ads.campaigns_info
      - id: fetch_campaigns_metrics
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_campaigns_metrics.py --start_date {{ inputs.start_date }} --end_date {{ inputs.end_date }}
          gcs_path_prefix: gs://REDACTED/google_ads/campaigns_metrics
          destination_table: REDACTED.landing_google_ads.campaigns_metrics
      - id: fetch_campaigns_pmax_video_metrics
        type: io.kestra.core.tasks.flows.Subflow
        namespace: prod
        flowId: fetchAndLoadBqFromGcs
        wait: true
        inputs:
          fetch_command: src/integrations/google_ads/jobs/fetch_campaigns_pmax_video_metrics.py --start_date {{ inputs.start_date }} --end_date {{ inputs.end_date }}
          gcs_path_prefix: gs://REDACTED/google_ads/campaigns_pmax_video_metrics
          destination_table: REDACTED.landing_google_ads.campaigns_pmax_video_metrics
```

Sub flow YAML (the flow that is triggered):

```yaml
id: fetchAndLoadBqFromGcs
namespace: prod
description: This is a sub flow that loads data to BigQuery from GCS after fetching data.

inputs:
  - id: fetch_command
    type: STRING
    required: true
    description: The command to fetch data
  - id: gcs_path_prefix
    type: STRING
    required: true
    description: The GCS path prefix to load data from
  - id: destination_table
    type: STRING
    required: true
    description: The destination table to load data to in BigQuery

tasks:
  - id: fetch_data
    type: io.kestra.plugin.scripts.python.Commands
    beforeCommands:
      - export PYTHONPATH=src
      - export GCP_PROJECT_ID=REDACTED
    commands:
      - python {{ inputs.fetch_command }}
  - id: load_bq_from_gcs
    type: io.kestra.plugin.gcp.bigquery.LoadFromGcs
    from:
      - "{{ inputs.gcs_path_prefix }}/{{ outputs.fetch_data.vars.uuid_str }}_*.parquet"
    destinationTable: "{{ inputs.destination_table }}"

taskDefaults:
  - type: io.kestra.plugin.scripts.python.Commands
    values:
      containerImage: us-central1-docker.pkg.dev/REDACTED/kestra-test/kestra-test:latest
      warningOnStdErr: false
      taskRunner:
        type: io.kestra.plugin.gcp.runner.GcpBatchTaskRunner
        projectId: REDACTED
        region: us-central1
        bucket: REDACTED
  - type: io.kestra.plugin.gcp.bigquery.LoadFromGcs
    values:
      writeDisposition: WRITE_APPEND
      createDisposition: CREATE_IF_NEEDED
      autodetect: true
      format: PARQUET
```

And here is just a glimpse of where the output is used:

```python
from kestra import Kestra
from integrations.google_ads.fetch_data_and_load_to_gcs import (
    fetch_data_and_load_to_gcs,
)


def main():
    gaql_query = """
        SELECT
            customer_client.id
        FROM customer_client
        WHERE customer_client.manager = False
            AND customer_client.status = 'ENABLED'
    """
    mcc_ids = ["REDACTED"]
    uuid_str = fetch_data_and_load_to_gcs(
        customer_ids=mcc_ids,
        gaql_query=gaql_query,
        job_name="customers_info",
    )
    Kestra.outputs({"uuid_str": uuid_str})


if __name__ == "__main__":
    main()
```

I basically fire off 4 of these at once, and 20-40% of the time there is no output.
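For background on why missing stdout lines and missing outputs go together: the Kestra Python client sends outputs back to the server by printing a delimited JSON line to stdout, which the task runner then parses out of the captured logs. A minimal sketch of that convention (a simplification, and the exact marker format is an assumption; the real `kestra` library handles more cases):

```python
import json


def emit_outputs(outputs: dict) -> str:
    # Kestra-style output marker: a JSON payload wrapped in "::" delimiters,
    # printed to stdout so the task runner can recover it from captured logs.
    # If the stdout line never reaches the server, the output is lost too.
    line = "::" + json.dumps({"outputs": outputs}) + "::"
    print(line)
    return line


marker = emit_outputs({"uuid_str": "3f2a"})
```

This is why losing log lines from the Batch VM and losing the `uuid_str` output are likely the same symptom, not two bugs.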
Let me know if you need anything else, thanks @anna-geller!!
I can also try to send more examples of executions in Kestra in a few hours, after I finish some work stuff, if that's helpful. Also, for now I am going to just generate the output before the parallel step while we investigate this, which seems to work. So this is not blocking anything for me now :D
Interesting, I wonder whether the same happens if you add that Python script as a Namespace File instead of baking it into the Artifact Registry container? Generally speaking, in Kestra you don't need any complex CI/CD to package your code into containers; you can simply add your code as Namespace Files from the Editor (ofc the way you did it should work too, so it might be a bug in how we capture the output from that Google Batch Python container; you did nothing wrong).
@anna-geller, sorry this turned into a book ha... I mainly did Docker custom packaging since we are developing in devcontainer.json, so it was sorta easy to just write this for syncing local to the Kestra container, CI/CD-wise:

```shell
function kestra:deploy() {
  : "Deploys Kestra Flows to Compute Engine VM and Custom Image to Artifact Registry"
  # Deploy Kestra Flows
  ./kestra flow namespace update prod _flows --server http://$KESTRA_IP:8080 --user REDACTED:$KESTRA_PASSWORD
  # Deploy Custom Image
  docker build -t kestra-custom-image -f Dockerfile .
  docker tag kestra-custom-image us-central1-docker.pkg.dev/REDACTED/kestra-test/kestra-test:latest
  docker push us-central1-docker.pkg.dev/REDACTED/kestra-test/kestra-test:latest
}
```

I am sorta seeing what you are saying as I adventure into dbt Core, I think. Instead of using Docker for that, I am going to do the recommended Git Clone of the repo and then run a dbt build task with inputs. Hmm, I think I might keep going with containers since there is an automation scraper thing I will move over to this, and that requires Playwright/Firefox/not-normal dependencies. But out of curiosity: are Namespace Files cloned from git, or just stored on the server instance? And then the beforeCommands would just pip install the requirements.txt, or I could even generate it with Poetry (using Poetry :D).

And lastly, to end my novel on a great note: when I took out the logging/print logic from the sub tasks, everything is working 100% of the time, consistently (see image of all 14 or 15 jobs running in parallel, woohoo Kestra). I am going to hopefully get a full all-accounts run over the weekend, and I still want to add concurrent features since it's using 2+ CPUs on these fetches... why not ⚡ 🚀.

TLDR: So I found a solution, but I still think Python logging/stdout syncing from Google Cloud Batch parallel VMs to the Kestra server UI is an issue that should stay raised, or I am doing something wrong with the VM settings, like not enough resources or something (which I did check, I think, in GCP monitoring). I know you all have a ton of priorities and this can wait since it's not blocking me now, but when you do get to it, I am more than happy to share any extra details 🙏. Also, I could totally see a situation where having all the logging in GCP Logging rather than in Kestra is fine/preferable, since it would just rapidly take up space on Postgres JDBC for me. Sorry that was long... just having so much fun :)!
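To make the Namespace Files question concrete: as far as I understand, Namespace Files are stored in the Kestra instance's internal storage (uploaded via the Editor or CLI), not cloned from git at runtime, and deps can then be installed in `beforeCommands`. The task below is a hypothetical sketch of that setup, not the author's actual flow; the `namespaceFiles` property and paths are assumptions based on the Kestra docs:

```yaml
tasks:
  - id: fetch_data
    type: io.kestra.plugin.scripts.python.Commands
    namespaceFiles:
      enabled: true  # mount files uploaded to this namespace via the Editor
    beforeCommands:
      - pip install -r requirements.txt  # deps installed at run time instead of baked into an image
    commands:
      - python src/integrations/google_ads/jobs/fetch_customers_info.py
```

This avoids the custom-image build/push cycle for pure-Python jobs, while a custom container remains the better fit for heavyweight dependencies like Playwright/Firefox.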
Great news! Let me answer in bullet points in case you have follow-up questions:
@japerry911 to summarize the issue: this is not about logging per se, but only about when the Python Kestra library sends outputs via `Kestra.outputs()`. In this case, the log of the output is not shown in the log of the subflow execution, right?
@loicmathieu Hello! The logging is an issue along with the outputs too; it does not consistently sync or show in the Kestra UI.
I can reproduce it so I'll have a look |
Logs can be available in the log stream after the job ends, as everything is asynchronous, so we have no option other than to wait a little for them to arrive. This wait time is configurable and set to a conservative value of 5s. Note that calling cancel() on the stream will make the stream iterator return false from hasNext(), effectively ending the stream, so no boolean is needed for that. Fixes kestra-io/kestra#3695
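The fix described above amounts to a grace period: since log entries arrive asynchronously, keep polling the stream for a configurable window after the job ends instead of stopping immediately. A rough Python illustration of the idea (hypothetical names; the actual fix lives in the GCP task runner's Java code, and `fetch_batch` stands in for the real Cloud Logging stream):

```python
import time


def drain_late_logs(fetch_batch, grace_seconds=5.0, poll_interval=0.1):
    # The job has already ended, but log entries may still be in flight,
    # so keep polling the stream for a grace period before giving up.
    collected = []
    deadline = time.monotonic() + grace_seconds
    while time.monotonic() < deadline:
        collected.extend(fetch_batch())
        time.sleep(poll_interval)
    return collected


# Simulated asynchronous stream: the entry only becomes visible on the
# third poll, mimicking logs that arrive after the Batch job finishes.
calls = {"n": 0}


def fake_batch():
    calls["n"] += 1
    return ["late log line"] if calls["n"] == 3 else []


logs = drain_late_logs(fake_batch, grace_seconds=0.5, poll_interval=0.05)
```

Without the grace period (i.e. returning as soon as the job ends), the late entry would be dropped, which matches the intermittent 20-40% loss reported above.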
Describe the issue
I run a YAML flow with 4 tasks in parallel on the GCP Cloud Batch task runner. These 4 tasks are subflows. It prints the logs/adds the single output about 70-80% of the time in Kestra, and 100% of the time in GCP Logs, which makes me think it's something in Kestra core.
For output I run:
For an example of logs showing partially but skipping the output: in this image, my initial print statements show, but when it gets to `Kestra.outputs({"uuid_str": "..."})`, it does not show in this job, though it showed in other jobs. I checked the Docker Compose logs and saw nothing, and GCP logs don't show a failure or anything.
Let me know if you need more context/details, happy to provide.
Thank you!
Environment