Skip to content

Commit

Permalink
Merge pull request #1736 from CartoDB/release/1.2.2
Browse files Browse the repository at this point in the history
Release/1.2.2
  • Loading branch information
Mmoncadaisla committed Jun 16, 2021
2 parents 98e0dae + b686160 commit 448a7c7
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 42 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.2.2] - 2021-06-16

### Changed

- Perform cartodbfication step after COPY from for performance improvement (#1735)
- Use UDF strategy on drop-add columns to avoid Batch API payload limit (#1734)

## [1.2.1] - 2021-04-28

### Changed
Expand Down
8 changes: 4 additions & 4 deletions README.rst
Expand Up @@ -4,8 +4,8 @@ CARTOframes

.. image:: https://travis-ci.org/CartoDB/cartoframes.svg?branch=develop
:target: https://travis-ci.org/CartoDB/cartoframes
.. image:: https://img.shields.io/badge/pypi-v1.2.1-orange
:target: https://pypi.org/project/cartoframes/1.2.1
.. image:: https://img.shields.io/badge/pypi-v1.2.2-orange
:target: https://pypi.org/project/cartoframes/1.2.2

A Python package for integrating `CARTO <https://carto.com/>`__ maps, analysis, and data services into data science workflows.

Expand All @@ -14,11 +14,11 @@ Python data analysis workflows often rely on the de facto standards `pandas <htt
Try it Out
==========

* Stable (1.2.1): |stable|
* Stable (1.2.2): |stable|
* Latest (develop branch): |develop|

.. |stable| image:: https://mybinder.org/badge_logo.svg
:target: https://mybinder.org/v2/gh/cartodb/cartoframes/v1.2.1?filepath=examples
:target: https://mybinder.org/v2/gh/cartodb/cartoframes/v1.2.2?filepath=examples

.. |develop| image:: https://mybinder.org/badge_logo.svg
:target: https://mybinder.org/v2/gh/cartodb/cartoframes/develop?filepath=examples
Expand Down
2 changes: 1 addition & 1 deletion cartoframes/_version.py
@@ -1 +1 @@
__version__ = '1.2.1'
__version__ = '1.2.2'
4 changes: 2 additions & 2 deletions cartoframes/data/services/service.py
@@ -1,7 +1,7 @@
import uuid
from collections import namedtuple

from ...io.managers.context_manager import ContextManager
from ...utils.utils import create_tmp_name

SERVICE_KEYS = ('hires_geocoder', 'isolines')
QUOTA_INFO_KEYS = ('monthly_quota', 'used_quota', 'soft_limit', 'provider')
Expand Down Expand Up @@ -48,7 +48,7 @@ def _schema(self):
return self._context_manager.get_schema()

def _new_temporary_table_name(self, base=None):
return (base or 'table') + '_' + uuid.uuid4().hex[:10]
return create_tmp_name(base=base or 'table')

def _execute_query(self, query):
return self._context_manager.execute_query(query)
Expand Down
144 changes: 112 additions & 32 deletions cartoframes/io/managers/context_manager.py
Expand Up @@ -15,11 +15,13 @@
from ...auth.defaults import get_default_credentials
from ...utils.logger import log
from ...utils.geom_utils import encode_geometry_ewkb
from ...utils.utils import is_sql_query, check_credentials, encode_row, map_geom_type, PG_NULL, double_quote
from ...utils.utils import (is_sql_query, check_credentials, encode_row, map_geom_type, PG_NULL, double_quote,
create_tmp_name)
from ...utils.columns import (get_dataframe_columns_info, get_query_columns_info, obtain_converters, date_columns_names,
normalize_name)

DEFAULT_RETRY_TIMES = 3
BATCH_API_PAYLOAD_THRESHOLD = 12000


def retry_copy(func):
Expand Down Expand Up @@ -96,33 +98,38 @@ def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True,

if self._compare_columns(df_columns, table_columns):
# Equal columns: truncate table
self._truncate_table(table_name, schema, cartodbfy)
self._truncate_table(table_name, schema)
else:
# Diff columns: truncate table and drop + add columns
self._truncate_and_drop_add_columns(
table_name, schema, df_columns, table_columns, cartodbfy)
table_name, schema, df_columns, table_columns)

elif if_exists == 'fail':
raise Exception('Table "{schema}.{table_name}" already exists in your CARTO account. '
'Please choose a different `table_name` or use '
'if_exists="replace" to overwrite it.'.format(
table_name=table_name, schema=schema))
else: # 'append'
pass
cartodbfy = False
else:
self._create_table_from_columns(table_name, schema, df_columns, cartodbfy)
self._create_table_from_columns(table_name, schema, df_columns)

self._copy_from(gdf, table_name, df_columns, retry_times)

if cartodbfy is True:
cartodbfy_query = _cartodbfy_query(table_name, schema)
self.execute_long_running_query(cartodbfy_query)

return table_name

def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
def create_table_from_query(self, query, table_name, if_exists):
schema = self.get_schema()
table_name = self.normalize_table_name(table_name)

if self.has_table(table_name, schema):
if if_exists == 'replace':
# TODO: review logic copy_from
self._drop_create_table_from_query(table_name, schema, query, cartodbfy)
self._drop_create_table_from_query(table_name, schema, query)
elif if_exists == 'fail':
raise Exception('Table "{schema}.{table_name}" already exists in your CARTO account. '
'Please choose a different `table_name` or use '
Expand All @@ -131,7 +138,7 @@ def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
else: # 'append'
pass
else:
self._drop_create_table_from_query(table_name, schema, query, cartodbfy)
self._drop_create_table_from_query(table_name, schema, query)

return table_name

Expand Down Expand Up @@ -159,6 +166,25 @@ def delete_table(self, table_name):
output = self.execute_query(query)
return not('notices' in output and 'does not exist' in output['notices'][0])

def _delete_function(self, function_name):
query = _drop_function_query(function_name)
self.execute_query(query)
return function_name

def _create_function(self, schema, statement,
function_name=None, columns_types=None, return_value='VOID', language='plpgsql'):
function_name = function_name or create_tmp_name(base='tmp_func')
safe_schema = double_quote(schema)
query, qualified_func_name = _create_function_query(
schema=safe_schema,
function_name=function_name,
statement=statement,
columns_types=columns_types or '',
return_value=return_value,
language=language)
self.execute_query(query)
return qualified_func_name

def rename_table(self, table_name, new_table_name, if_exists='fail'):
new_table_name = self.normalize_table_name(new_table_name)

Expand Down Expand Up @@ -270,37 +296,59 @@ def _compare_columns(self, a, b):

return a_copy == b_copy

def _drop_create_table_from_query(self, table_name, schema, query, cartodbfy):
def _drop_create_table_from_query(self, table_name, schema, query):
log.debug('DROP + CREATE table "{}"'.format(table_name))
query = 'BEGIN; {drop}; {create}; {cartodbfy}; COMMIT;'.format(
query = 'BEGIN; {drop}; {create}; COMMIT;'.format(
drop=_drop_table_query(table_name),
create=_create_table_from_query_query(table_name, query),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
create=_create_table_from_query_query(table_name, query))
self.execute_long_running_query(query)

def _create_table_from_columns(self, table_name, schema, columns, cartodbfy):
def _create_table_from_columns(self, table_name, schema, columns):
log.debug('CREATE table "{}"'.format(table_name))
query = 'BEGIN; {create}; {cartodbfy}; COMMIT;'.format(
create=_create_table_from_columns_query(table_name, columns),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
query = 'BEGIN; {create}; COMMIT;'.format(
create=_create_table_from_columns_query(table_name, columns))
self.execute_query(query)

def _truncate_table(self, table_name, schema, cartodbfy):
def _truncate_table(self, table_name, schema):
log.debug('TRUNCATE table "{}"'.format(table_name))
query = 'BEGIN; {truncate}; {cartodbfy}; COMMIT;'.format(
truncate=_truncate_table_query(table_name),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
query = 'BEGIN; {truncate}; COMMIT;'.format(
truncate=_truncate_table_query(table_name))
self.execute_query(query)

def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_columns, cartodbfy):
def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_columns):
log.debug('TRUNCATE AND DROP + ADD columns table "{}"'.format(table_name))
query = '{regenerate}; BEGIN; {truncate}; {drop_columns}; {add_columns}; {cartodbfy}; COMMIT;'.format(
drop_columns = _drop_columns_query(table_name, table_columns)
add_columns = _add_columns_query(table_name, df_columns)

drop_add_columns = 'ALTER TABLE {table_name} {drop_columns},{add_columns};'.format(
table_name=table_name, drop_columns=drop_columns, add_columns=add_columns)

query = '{regenerate}; BEGIN; {truncate}; {drop_add_columns}; COMMIT;'.format(
regenerate=_regenerate_table_query(table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_columns=_drop_columns_query(table_name, table_columns),
add_columns=_add_columns_query(table_name, df_columns),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
self.execute_long_running_query(query)
drop_add_columns=drop_add_columns)

query_length_over_threshold = len(query) > BATCH_API_PAYLOAD_THRESHOLD

if query_length_over_threshold:
qualified_func_name = self._create_function(
schema=schema, statement=drop_add_columns)
drop_add_func_sql = 'SELECT {}'.format(qualified_func_name)
query = '''
{regenerate};
BEGIN;
{truncate};
{drop_add_func_sql};
COMMIT;'''.format(
regenerate=_regenerate_table_query(
table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_add_func_sql=drop_add_func_sql)
try:
self.execute_long_running_query(query)
finally:
if query_length_over_threshold:
self._delete_function(qualified_func_name)

def compute_query(self, source, schema=None):
if is_sql_query(source):
Expand Down Expand Up @@ -401,25 +449,57 @@ def _drop_table_query(table_name, if_exists=True):
if_exists='IF EXISTS' if if_exists else '')


def _drop_function_query(function_name, columns_types=None, if_exists=True):
if columns_types and not isinstance(columns_types, dict):
raise ValueError('The columns_types parameter should be a dictionary of column names and types.')
columns_types = columns_types or {}
columns = ['{0} {1}'.format(cname, ctype) for cname, ctype in columns_types.items()]
columns_str = ','.join(columns)
return 'DROP FUNCTION {if_exists} {function_name}{columns_str_call}'.format(
function_name=function_name,
if_exists='IF EXISTS' if if_exists else '',
columns_str_call='({columns_str})'.format(columns_str=columns_str) if columns else '')


def _truncate_table_query(table_name):
return 'TRUNCATE TABLE {table_name}'.format(
table_name=table_name)


def _create_function_query(schema, function_name, statement, columns_types, return_value, language):
if columns_types and not isinstance(columns_types, dict):
raise ValueError('The columns_types parameter should be a dictionary of column names and types.')
columns_types = columns_types or {}
columns = ['{0} {1}'.format(cname, ctype) for cname, ctype in columns_types.items()]
columns_str = ','.join(columns) if columns else ''
function_query = '''
CREATE FUNCTION {schema}.{function_name}({columns_str})
RETURNS {return_value} AS $$
BEGIN
{statement}
END;
$$ LANGUAGE {language}
'''.format(schema=schema,
function_name=function_name,
statement=statement,
columns_str=columns_str,
return_value=return_value,
language=language)
qualified_func_name = '{schema}.{function_name}({columns_str})'.format(
schema=schema, function_name=function_name, columns_str=columns_str)
return function_query, qualified_func_name


def _drop_columns_query(table_name, columns):
columns = ['DROP COLUMN {name}'.format(name=double_quote(c.dbname))
for c in columns if _not_reserved(c.dbname)]
return 'ALTER TABLE {table_name} {drop_columns}'.format(
table_name=table_name,
drop_columns=','.join(columns))
return ','.join(columns)


def _add_columns_query(table_name, columns):
columns = ['ADD COLUMN {name} {type}'.format(name=double_quote(c.dbname), type=c.dbtype)
for c in columns if _not_reserved(c.dbname)]
return 'ALTER TABLE {table_name} {add_columns}'.format(
table_name=table_name,
add_columns=','.join(columns))
return ','.join(columns)


def _not_reserved(column):
Expand Down
6 changes: 6 additions & 0 deletions cartoframes/utils/utils.py
Expand Up @@ -401,6 +401,12 @@ def create_hash(value):
return hashlib.md5(str(value).encode()).hexdigest()


def create_tmp_name(base=None):
"""Create temporary name using uuid"""
from uuid import uuid4
return (base + '_' if base else '') + uuid4().hex[:10]


def extract_viz_columns(viz):
"""Extract columns prop('name') in viz"""
columns = []
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/io/managers/test_context_manager.py
Expand Up @@ -73,7 +73,7 @@ def test_copy_from(self, mocker):

# Then
mock_create_table.assert_called_once_with('''
BEGIN; CREATE TABLE table_name ("a" bigint); SELECT CDB_CartodbfyTable(\'schema\', \'table_name\'); COMMIT;
BEGIN; CREATE TABLE table_name ("a" bigint); COMMIT;
'''.strip())
mock.assert_called_once_with(df, 'table_name', columns, DEFAULT_RETRY_TIMES)

Expand Down Expand Up @@ -108,7 +108,7 @@ def test_copy_from_exists_replace_truncate_and_drop_add_columns(self, mocker):
cm.copy_from(df, 'TABLE NAME', 'replace')

# Then
mock.assert_called_once_with('table_name', 'schema', columns, [], True)
mock.assert_called_once_with('table_name', 'schema', columns, [])

def test_copy_from_exists_replace_truncate(self, mocker):
# Given
Expand All @@ -124,7 +124,7 @@ def test_copy_from_exists_replace_truncate(self, mocker):
cm.copy_from(df, 'TABLE NAME', 'replace')

# Then
mock.assert_called_once_with('table_name', 'schema', True)
mock.assert_called_once_with('table_name', 'schema')

def test_internal_copy_from(self, mocker):
# Given
Expand Down

0 comments on commit 448a7c7

Please sign in to comment.