Trigger DAG will get (_mysql_exceptions.IntegrityError) Duplicate entry 'xxxxx' for key 'dag_id' #285

YouZhengChuan commented Nov 21, 2019

I have a DAG, "pcdn_export_agg_peak_daily", that triggers the DAG "pcdn_export_agg_peak" 3 times, each time with a different run_id and dag_run.conf.
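
For context, the controller DAG builds its triggers roughly like the sketch below. This is a minimal reconstruction, not my real code: the task ids, the callable, the schedule and the conf values are placeholders, and it assumes the Airflow 1.10-style TriggerDagRunOperator interface where the python_callable receives (context, dag_run_obj) and sets run_id/payload on it.

# Minimal sketch of the controller DAG (hypothetical names and conf values).
from datetime import datetime
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

def make_trigger(stat_name, stat_cfg):
    # Airflow 1.10.x style: the callable gets (context, dag_run_obj),
    # fills in run_id/payload and returns the object to trigger the run.
    def _set_run(context, dag_run_obj):
        dag_run_obj.run_id = 'tri_peak_agg-{}-for:{}-on:{}'.format(
            stat_name, context['ds'], context['ts_nodash'])
        dag_run_obj.payload = stat_cfg
        return dag_run_obj
    return _set_run

dag = DAG('pcdn_export_agg_peak_daily',
          start_date=datetime(2019, 11, 1),
          schedule_interval='30 1 * * *')  # placeholder schedule

for stat_name in ('daily_device_app_tx', 'stat_two', 'stat_three'):  # placeholders
    TriggerDagRunOperator(
        task_id='peak_agg.{}'.format(stat_name),
        trigger_dag_id='pcdn_export_agg_peak',
        python_callable=make_trigger(stat_name, {'stat_name': stat_name}),
        dag=dag,
    )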

But when "pcdn_export_agg_peak_daily" runs, the trigger task fails and exits with code 1, and the log shows this error:

[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx (Background on this error at: http://sqlalche.me/e/gkpj)

The error message reports a duplicate key, "pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00", because the row I am about to insert contains the fields ("dag_id", "execution_date") with the values ("pcdn_export_agg_peak.split_to_agg_9.pcdn_agg", <Pendulum [2019-11-21T09:47:00+00:00]>).

My Airflow version is v1.10.5 and the metadata database backend is MySQL.

Running SHOW CREATE TABLE dag_run; in the airflow MySQL database returns:

CREATE TABLE `dag_run` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `dag_id` varchar(250) DEFAULT NULL,
  `execution_date` timestamp(6) NULL DEFAULT NULL,
  `state` varchar(50) DEFAULT NULL,
  `run_id` varchar(250) DEFAULT NULL,
  `external_trigger` tinyint(1) DEFAULT NULL,
  `conf` blob,
  `end_date` timestamp(6) NULL DEFAULT NULL,
  `start_date` timestamp(6) NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `dag_id` (`dag_id`,`execution_date`),
  UNIQUE KEY `dag_id_2` (`dag_id`,`run_id`),
  KEY `dag_id_state` (`dag_id`,`state`)
) ENGINE=InnoDB AUTO_INCREMENT=27 DEFAULT CHARSET=latin1

This table has UNIQUE KEY `dag_id` (`dag_id`,`execution_date`), so that pair of values cannot be repeated.

However, when I query the "execution_date" column for the value "2019-11-21 09:47:00", no rows are returned.

The "execution_date" of the row I was inserting is "2019-11-21T09:47:00+00:00" (UTC). Converted to my local time, that value is "2019-11-21 17:47:00". Querying for "2019-11-21 17:47:00" returns 4 rows:

mysql> select `id`,`dag_id`,`execution_date`,`state`,`run_id`,`external_trigger`,`end_date`,`start_date` from `dag_run` where `execution_date`='2019-11-21 17:47:00';
+----+--------------------------------------------------+----------------------------+---------+--------------------------------------------------------------------------+------------------+----------+----------------------------+
| id | dag_id                                           | execution_date             | state   | run_id                                                                   | external_trigger | end_date | start_date                 |
+----+--------------------------------------------------+----------------------------+---------+--------------------------------------------------------------------------+------------------+----------+----------------------------+
|  8 | pcdn_export_agg_peak                             | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:53.924978 |
| 13 | pcdn_export_agg_peak.split_to_agg_9.pcdn_agg     | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:54.530647 |
| 18 | pcdn_export_agg_peak.split_to_agg_9.partial_sort | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:55.154685 |
| 23 | pcdn_export_agg_peak.split_to_agg_9              | 2019-11-21 17:47:00.000000 | running | tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000 |                1 | NULL     | 2019-11-21 17:47:55.788638 |
+----+--------------------------------------------------+----------------------------+---------+--------------------------------------------------------------------------+------------------+----------+----------------------------+
4 rows in set (0.05 sec)
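
The mismatch between 09:47 and 17:47 is just a time zone rendering difference: my MySQL session time zone is apparently local time (UTC+8), and MySQL interprets and displays TIMESTAMP values in the session time zone, so the UTC execution_date 2019-11-21T09:47:00+00:00 shows up (and must be queried) as 2019-11-21 17:47:00 local. A quick standard-library check of the conversion (nothing Airflow-specific):

# The UTC execution_date and the value MySQL displays are the same instant,
# rendered in different time zones (UTC vs my local UTC+8).
from datetime import datetime, timezone, timedelta

utc_exec_date = datetime(2019, 11, 21, 9, 47, tzinfo=timezone.utc)
local = utc_exec_date.astimezone(timezone(timedelta(hours=8)))
print(utc_exec_date.isoformat())  # 2019-11-21T09:47:00+00:00
print(local.isoformat())          # 2019-11-21T17:47:00+08:00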

So, based on the error log, my guess at how the failure happens is:

  • Airflow looks up dag_run by the unique key pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00 and finds no existing row.

  • Airflow then inserts a new row into dag_run, which collides with the existing row and triggers this bug (see the sketch after this list).
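
In other words, this looks like a check-then-insert pattern racing against (or being confused by) the UNIQUE KEY (dag_id, execution_date). The sketch below illustrates the pattern and how the IntegrityError could be absorbed instead of failing the task; it is a simplified illustration against the dag_run columns shown above, not Airflow's actual trigger_dag implementation.

# Simplified sketch of a check-then-insert on dag_run that tolerates the
# (dag_id, execution_date) unique key; not Airflow's real code.
from sqlalchemy.exc import IntegrityError

def get_or_create_dag_run(session, DagRun, dag_id, execution_date, run_id, conf):
    existing = (session.query(DagRun)
                .filter(DagRun.dag_id == dag_id,
                        DagRun.execution_date == execution_date)
                .one_or_none())
    if existing is not None:
        return existing  # a run for this (dag_id, execution_date) already exists

    # If the lookup above misses (e.g. another writer got in first, or the
    # dates do not compare the way we expect), the INSERT hits the unique key.
    dr = DagRun(dag_id=dag_id, execution_date=execution_date, run_id=run_id,
                external_trigger=True, conf=conf, state='running')
    session.add(dr)
    try:
        session.commit()
    except IntegrityError:
        # Roll back and reuse the row that already holds this unique key.
        session.rollback()
        return (session.query(DagRun)
                .filter(DagRun.dag_id == dag_id,
                        DagRun.execution_date == execution_date)
                .one())
    return dr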

For reference, the complete error log:

*** Reading local file: /home/work/airflow/logs/pcdn_export_agg_peak_daily/peak_agg.daily_device_app_tx/2019-11-20T01:30:00+00:00/1.log
[2019-11-21 17:47:42,165] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [queued]>
[2019-11-21 17:47:42,251] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [queued]>
[2019-11-21 17:47:42,251] {taskinstance.py:838} INFO - 
--------------------------------------------------------------------------------
[2019-11-21 17:47:42,251] {taskinstance.py:839} INFO - Starting attempt 1 of 1
[2019-11-21 17:47:42,251] {taskinstance.py:840} INFO - 
--------------------------------------------------------------------------------
[2019-11-21 17:47:42,475] {taskinstance.py:859} INFO - Executing <Task(TriggerDagRunOperator): peak_agg.daily_device_app_tx> on 2019-11-20T01:30:00+00:00
[2019-11-21 17:47:42,475] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'pcdn_export_agg_peak_daily', 'peak_agg.daily_device_app_tx', '2019-11-20T01:30:00+00:00', '--job_id', '2', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/pcdn_export_agg_peak_daily.py', '--cfg_path', '/tmp/tmpvw0s95my']
[2019-11-21 17:47:43,148] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:43,148] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=10, max_overflow=20, pool_recycle=1800, pid=7815
[2019-11-21 17:47:43,460] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:43,458] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-11-21 17:47:44,505] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:44,504] {dagbag.py:90} INFO - Filling up the DagBag from /home/work/pcdn/offline_agg/dags/pcdn_export_agg_peak_daily.py
[2019-11-21 17:47:45,367] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx /home/work/pcdn/src/lib/conf_manager.py:21: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
[2019-11-21 17:47:45,367] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   self._main_cf = yaml.load(fstream)
[2019-11-21 17:47:45,389] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx /home/work/pcdn/src/lib/conf_manager.py:24: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
[2019-11-21 17:47:45,389] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   self._base_cf = yaml.load(bstream) or {}
[2019-11-21 17:47:46,885] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [2019-11-21 17:47:46,884] {cli.py:516} INFO - Running <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [running]> on host jh02-db2.jh02
[2019-11-21 17:47:47,293] {logging_mixin.py:95} INFO - context: {'dag': <DAG: pcdn_export_agg_peak_daily>, 'ds': '2019-11-20', 'next_ds': '2019-11-21', 'next_ds_nodash': '20191121', 'prev_ds': '2019-11-19', 'prev_ds_nodash': '20191119', 'ds_nodash': '20191120', 'ts': '2019-11-20T01:30:00+00:00', 'ts_nodash': '20191120T013000', 'ts_nodash_with_tz': '20191120T013000+0000', 'yesterday_ds': '2019-11-19', 'yesterday_ds_nodash': '20191119', 'tomorrow_ds': '2019-11-21', 'tomorrow_ds_nodash': '20191121', 'END_DATE': '2019-11-20', 'end_date': '2019-11-20', 'dag_run': <DagRun pcdn_export_agg_peak_daily @ 2019-11-20 01:30:00+00:00: scheduled__2019-11-20T01:30:00+00:00, externally triggered: False>, 'run_id': 'scheduled__2019-11-20T01:30:00+00:00', 'execution_date': <Pendulum [2019-11-20T01:30:00+00:00]>, 'prev_execution_date': <Pendulum [2019-11-19T01:30:00+00:00]>, 'prev_execution_date_success': <Proxy at 0x7fda2a4d7548 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fda3f7d02f0>>, 'prev_start_date_success': <Proxy at 0x7fda2a4d7508 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fda2a434d90>>, 'next_execution_date': <Pendulum [2019-11-21T01:30:00+00:00]>, 'latest_date': '2019-11-20', 'macros': <module 'airflow.macros' from '/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'params': {'stat_cfg': {'env': 'dev', 'start_ts': 1574256000, 'end_ts': 1574259620, 'stat_name': 'daily_device_app_tx', 'exporter': {'database': 'aegis', 'retention': 'one_week', 'measurements': 'docker_container', 'fields': 'tx,rx'}, 'order_by': {'tags': ['device']}, 'log_level': 'INFO', 'series_chunks': 100, 'sp_chunks': 1000000, 'sp_schunks': 100000}}, 'tables': None, 'task': <Task(TriggerDagRunOperator): peak_agg.daily_device_app_tx>, 'task_instance': <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [running]>, 'ti': <TaskInstance: pcdn_export_agg_peak_daily.peak_agg.daily_device_app_tx 2019-11-20T01:30:00+00:00 [running]>, 'task_instance_key_str': 'pcdn_export_agg_peak_daily__peak_agg.daily_device_app_tx__20191120', 'conf': <module 'airflow.configuration' from '/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/configuration.py'>, 'test_mode': False, 'var': {'value': None, 'json': None}, 'inlets': [], 'outlets': []}
[2019-11-21 17:47:47,293] {logging_mixin.py:95} INFO - dag config: {'env': 'dev', 'start_ts': 1574256000, 'end_ts': 1574259620, 'stat_name': 'daily_device_app_tx', 'exporter': {'database': 'aegis', 'retention': 'one_week', 'measurements': 'docker_container', 'fields': 'tx,rx'}, 'order_by': {'tags': ['device']}, 'log_level': 'INFO', 'series_chunks': 100, 'sp_chunks': 1000000, 'sp_schunks': 100000}
[2019-11-21 17:47:47,293] {logging_mixin.py:95} INFO - run_dag_id: tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000
[2019-11-21 17:47:47,498] {logging_mixin.py:95} INFO - [2019-11-21 17:47:47,498] {dagbag.py:90} INFO - Filling up the DagBag from /home/work/pcdn/offline_agg/dags/pcdn_export_agg_peak_once.py
[2019-11-21 17:47:56,478] {taskinstance.py:1051} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
(Background on this error at: http://sqlalche.me/e/gkpj)
Traceback (most recent call last):
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
    cursor, statement, parameters, context
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.IntegrityError: (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 95, in execute
    replace_microseconds=False)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 126, in trigger_dag
    replace_microseconds=replace_microseconds,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 90, in _trigger_dag
    external_trigger=True,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1286, in create_dagrun
    session.commit()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
    self.transaction.commit()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 494, in commit
    self._prepare_impl()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
    self.session.flush()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2459, in flush
    self._flush(objects)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2557, in _flush
    flush_context.execute()
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
    statement, params
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
    distilled_params,
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
    cursor, statement, parameters, context
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
(Background on this error at: http://sqlalche.me/e/gkpj)
[2019-11-21 17:47:56,485] {taskinstance.py:1082} INFO - Marking task as FAILED.
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx Traceback (most recent call last):
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor, statement, parameters, context
[2019-11-21 17:47:56,816] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor.execute(statement, parameters)
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.errorhandler(self, exc, value)
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise errorvalue
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     res = self._query(query)
[2019-11-21 17:47:56,817] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     db.query(q)
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     _mysql.connection.query(self, query)
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx _mysql_exceptions.IntegrityError: (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx 
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx The above exception was the direct cause of the following exception:
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx 
[2019-11-21 17:47:56,818] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx Traceback (most recent call last):
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/bin/airflow", line 32, in <module>
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     args.func(args)
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return f(*args, **kwargs)
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 522, in run
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     _run(args, dag, ti)
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 440, in _run
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     pool=args.pool,
[2019-11-21 17:47:56,819] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return func(*args, **kwargs)
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     result = task_copy.execute(context=context)
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 95, in execute
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     replace_microseconds=False)
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 126, in trigger_dag
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     replace_microseconds=replace_microseconds,
[2019-11-21 17:47:56,820] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 90, in _trigger_dag
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     external_trigger=True,
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return func(*args, **kwargs)
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1286, in create_dagrun
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     session.commit()
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.transaction.commit()
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 494, in commit
[2019-11-21 17:47:56,821] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self._prepare_impl()
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.session.flush()
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2459, in flush
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self._flush(objects)
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     transaction.rollback(_capture_exception=True)
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     compat.reraise(exc_type, exc_value, exc_tb)
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
[2019-11-21 17:47:56,822] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise value
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2557, in _flush
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     flush_context.execute()
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     rec.execute(self)
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     uow,
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     insert,
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     statement, params
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 988, in execute
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return meth(self, multiparams, params)
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     return connection._execute_clauseelement(self, multiparams, params)
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
[2019-11-21 17:47:56,823] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     distilled_params,
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     e, statement, parameters, cursor, context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     util.raise_from_cause(sqlalchemy_exception, exc_info)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     reraise(type(exception), exception, tb=exc_tb, cause=cause)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise value.with_traceback(tb)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor, statement, parameters, context
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     cursor.execute(statement, parameters)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     self.errorhandler(self, exc, value)
[2019-11-21 17:47:56,824] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     raise errorvalue
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     res = self._query(query)
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     db.query(q)
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx   File "/home/work/pcdn/venv/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx     _mysql.connection.query(self, query)
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx (Background on this error at: http://sqlalche.me/e/gkpj)
[2019-11-21 17:47:57,393] {logging_mixin.py:95} INFO - [2019-11-21 17:47:57,392] {local_task_job.py:105} INFO - Task exited with return code 1
@MatteoMart1994 commented

Hi @YouZhengChuan, any news on this issue? I have the same problem in composer 1.10.15-airflow1.10.12.
