Commit

Merge pull request #61 from prio-data/time_subset

Time subset

jimdale committed Apr 23, 2024
2 parents a4da2ac + a8c2e4f commit d290329
Showing 5 changed files with 145 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "viewser"
version = "6.3.2"
version = "6.4.0"
description = "The Views 3 CLI tool"
authors = ["peder2911 <pglandsverk@gmail.com>"]
readme = "README.md"
32 changes: 24 additions & 8 deletions viewser/commands/documentation/cli.py
@@ -30,13 +30,23 @@ def list_features(obj: Dict[str, Any], loa: str):
===========
Show all available features at specified loa.
The response currently comes as a string; sending a DataFrame fails (empty parquet file) for an unknown reason.
"""

obj["operations"] = operations.DocumentationCrudOperations(settings.config_get("REMOTE_URL"),
f"features/{loa}/")
response_dict = json.loads(obj["operations"].list())['entries']
response_df = (pd.DataFrame.from_dict(response_dict).drop(columns=['entries', 'data']).reset_index(drop=True).
to_string())
response = requests.get(url=f'{settings.REMOTE_URL}/features/{loa}')

lines = str(response.content.decode()).strip('"').split("\\n")

features = []
loas = []

for line in lines[:-1]:
feature, loa = line.split(',')[0], line.split(',')[1]
features.append(feature)
loas.append(loa)

response_df = pd.DataFrame({'feature': features, 'loa hint': loas})

click.echo(response_df)

@@ -62,7 +72,10 @@ def list_transforms(obj: Dict[str, Any]):

response = requests.get(url=f'{settings.REMOTE_URL}/transforms')

-    response_df = pd.read_parquet(io.BytesIO(response.content))
+    try:
+        response_df = pd.read_parquet(io.BytesIO(response.content))
+    except:
+        response_df = pd.DataFrame()

click.echo(response_df)

@@ -78,9 +91,12 @@ def show_transform(
which level of analysis it is applicable to.
"""

-    response = requests.get(f'{settings.REMOTE_URL}/transforms/{loa}')
+    response = requests.get(url=f'{settings.REMOTE_URL}/transforms/{loa}')

-    response_df = pd.read_parquet(io.BytesIO(response.content))
+    try:
+        response_df = pd.read_parquet(io.BytesIO(response.content))
+    except:
+        response_df = pd.DataFrame()

click.echo(response_df)

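For reference, a minimal, self-contained sketch of what the new list_features parsing does. The payload shape (a quoted string of comma-separated feature/loa pairs joined by literal "\n" sequences) is inferred from the code above, and the feature names are purely illustrative:

import pandas as pd

# Stand-in for response.content.decode(): a quoted string of 'feature,loa' pairs
# separated by literal backslash-n sequences, with a trailing separator.
raw = '"ged_sb,priogrid_month\\nwdi_sp_pop_totl,country_year\\n"'

lines = raw.strip('"').split("\\n")

features = []
loas = []

for line in lines[:-1]:          # the trailing separator leaves an empty last element
    feature, loa = line.split(',')[0], line.split(',')[1]
    features.append(feature)
    loas.append(loa)

print(pd.DataFrame({'feature': features, 'loa hint': loas}))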
15 changes: 12 additions & 3 deletions viewser/commands/queryset/cli.py
@@ -1,13 +1,15 @@

import datetime
import io
import pandas as pd
from typing import Optional, Dict, Any

import click
from viewser import settings
from viewser.settings import defaults
from . import operations, formatting


@click.group(name="queryset", short_help="queryset_operations related to querysets")
@click.pass_obj
def cli(ctx_obj: Dict[str, Any]):
@@ -19,6 +21,7 @@ def cli(ctx_obj: Dict[str, Any]):
ctx_obj["table_formatter"] = formatting.QuerysetTableFormatter()
ctx_obj["detail_formatter"] = formatting.QuerysetDetailFormatter()


@cli.command(name="fetch", short_help="fetch data for a queryset")
@click.argument("name")
@click.argument("out-file", type=click.File("wb"))
@@ -36,22 +39,28 @@ def queryset_fetch(
"""
ctx_obj["operations"].fetch(name, out_file)


@cli.command(name="list", short_help="show a list of available querysets")
@click.pass_obj
def queryset_list(ctx_obj: Dict[str, Any]):
"""
Show a list of available querysets.
"""
ctx_obj["operations"].list().then(ctx_obj["table_formatter"].formatted).then(click.echo)
result_df = pd.DataFrame(ctx_obj["operations"].list(), columns=['querysets', ])

click.echo(result_df)

@cli.command(name="show", short_help="show details about a queryset")

@cli.command(name="show", short_help="show code for a queryset")
@click.argument("name", type=str)
@click.pass_obj
def queryset_show(ctx_obj: Dict[str, Any], name: str):
"""
Show detailed information about a queryset
"""
ctx_obj["operations"].show(name).then(ctx_obj["detail_formatter"].formatted).then(click.echo).then(click.echo)

click.echo(ctx_obj["operations"].show(name))


@cli.command(name="delete", short_help="delete a queryset")
@click.confirmation_option(prompt="Delete queryset?")
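As a small illustration of the reworked list command's output, here is the same DataFrame construction run on made-up queryset names standing in for whatever ctx_obj["operations"].list() returns:

import pandas as pd

# Hypothetical return value of ctx_obj["operations"].list()
queryset_names = ["hypothetical_qs_a", "hypothetical_qs_b"]

result_df = pd.DataFrame(queryset_names, columns=['querysets', ])
print(result_df)   # one column named 'querysets', one row per queryset name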
2 changes: 1 addition & 1 deletion viewser/commands/queryset/models/queryset.py
@@ -185,5 +185,5 @@ def fetch(self, *args, **kwargs):
Requires a self.push first.
"""
logger.info(f"Fetching queryset {self.name}")
-        dataset = queryset_operations.fetch(self.name)
+        dataset = queryset_operations.fetch(self.name, *args, **kwargs)
return dataset
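Taken together with the operations change below, this lets callers pass time bounds straight through Queryset.fetch. A rough usage sketch; the import path, column names and month-id values are assumptions, and push() is used because the docstring above says fetch requires a prior push:

from viewser import Queryset, Column   # assumed public exports

qs = (Queryset("hypothetical_example", "country_month")
      .with_column(Column("pop", from_loa="country_year", from_column="wdi_sp_pop_totl")
                   .aggregate("avg")
                   )
      )

qs.push()                                               # fetch requires a prior push, per the docstring above
df_full = qs.fetch()                                    # whole series, as before
df_slice = qs.fetch(start_date="490", end_date="516")   # forwarded to QuerysetOperations.fetch as time bounds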
122 changes: 107 additions & 15 deletions viewser/commands/queryset/operations.py
@@ -34,7 +34,7 @@ def __init__(self,
self._max_retries = max_retries
self._error_handler = error_handler if error_handler else error_handling.ErrorDumper([])

-    def fetch(self, queryset_name: str) -> pd.DataFrame:
+    def fetch(self, queryset_name: str, start_date: str = None, end_date: str = None) -> pd.DataFrame:
"""
fetch
=====
@@ -48,31 +48,121 @@ def fetch(self, queryset_name: str) -> pd.DataFrame:
"""

        if bool(start_date) != bool(end_date):
            raise RuntimeError(f'You must specify either both or neither of start_date and end_date')

        if start_date is not None:
            try:
                start_date = int(start_date)
                end_date = int(end_date)
            except:
                raise RuntimeError(f'Unable to cast start and/or end date values {start_date, end_date} to integer')

            if start_date < 1 or end_date < 1:
                raise RuntimeError(f'Start and/or end date values {start_date, end_date} less than 1')

            if start_date > end_date:
                raise RuntimeError(f'Start date {start_date} bigger than end date {end_date}')

        f = self._fetch(
            self._max_retries,
            self._remote_url,
            queryset_name,
            start_date,
            end_date
        )

        return f

-    def list(self) -> queryset_list.QuerysetList:
+    def list(self):
        """
        list
        ====
        returns:
-            Optional[List[str]]: Returns a list of queryset name if operation succeeds.
+            Returns a list of queryset names if operation succeeds.
        """

        response = requests.request(method="GET", url=f'{self._remote_url}/querysets')

        return response.json()['querysets']

    def qs_json_to_code(self, json_):

        allowed_fields = ['name', 'loa', 'description', 'themes', 'operations']
        allowed_namespaces = ['base', 'trf']

        for key in json_.keys():
            if key not in allowed_fields:
                raise RuntimeError(f'Queryset json contains unrecognised field: {key}')

        lines = []

        tab = ' '

        line = f"(Queryset('{json_['name']}','{json_['loa']}')"

        lines.append(line)

        ops = json_['operations']

        for column in ops:
            for op in column[::-1]:
                if op['namespace'] not in allowed_namespaces:
                    raise RuntimeError(f"Queryset operation contains unrecognised namespace: {op['namespace']}")
                if op['namespace'] == 'base':
                    for op2 in column:
                        if op2['namespace'] == 'trf' and op2['name'] == 'util.rename': rename = op2['arguments'][0]
                    loa, name = op['name'].split('.')

                    line = f"{tab}.with_column(Column('{rename}', from_loa='{loa}', from_column='{name}')"
                    lines.append(line)
                    if op['arguments'][0] != 'values':
                        arg = op['arguments'][0]
                        line = f"{tab}{tab}.aggregate('{arg}')"
                        lines.append(line)

                if op['namespace'] == 'trf' and op['name'] != 'util.rename':
                    args = ','.join(op['arguments'])
                    line = f"{tab}{tab}.transform.{op['name']}({args})"
                    lines.append(line)

            line = f"{tab}{tab})"
            lines.append(line)
            line = f""
            lines.append(line)

        if len(json_['themes']) > 0:
            line = f"{tab}.with_theme('{json_['themes'][0]}')"
            lines.append(line)

        if json_['description'] is not None:
            line = f'{tab}.describe("""{json_["description"]}""")'
            lines.append(line)

        line = f"{tab})"
        lines.append(line)

        qs_code = '\n'.join(lines)

        return qs_code
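For orientation, the string assembled here comes out roughly like the following for a one-column queryset (a hand-written illustration with made-up names and approximate whitespace, not actual server output):

(Queryset('hypothetical_qs','country_month')
    .with_column(Column('pop', from_loa='country_year', from_column='wdi_sp_pop_totl')
        .aggregate('avg')
        .transform.missing.fill()
        )

    .with_theme('illustration')
    .describe("""A made-up queryset, shown only to illustrate the generated shape.""")
    )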

    def show(self, queryset: str):
        """
        show
        ====
-        response = requests.request(method="GET", url=f'{self._remote_url}')
        returns:
        Returns code representing a queryset.
        """

-        qs_list = queryset_list.QuerysetList()
+        response = requests.request(method="GET", url=f'{self._remote_url}/querysets/{queryset}')

-        qs_list.querysets = response.content
+        json_ = response.json()

-        return qs_list
+        return self.qs_json_to_code(json_)

def publish(self, queryset: queryset_schema.Queryset, overwrite: bool = True) -> requests.Response:

@@ -94,13 +184,13 @@ def delete(self, name: str) -> requests.Response:

method = "DELETE"

-        url = self._remote_url + f"/{name}"
+        url = self._remote_url + f"/querysets/{name}"

response = requests.request(method=method, url=url)

return response

-    def _fetch(self, max_retries: int, base_url: str, name: str) -> pd.DataFrame:
+    def _fetch(self, max_retries: int, base_url: str, name: str, start_date: int, end_date: int) -> pd.DataFrame:
"""
_fetch
======
@@ -120,9 +210,13 @@ def overprint(message_string, last_line_length, end):

return new_line_length

path = f"data/{name}"
if start_date is not None:
path = f"data/{name}?" + parse.urlencode({"start_date": start_date, "end_date": end_date})
url = self._remote_url + '/' + path
else:
path = f"data/{name}"
url = self._remote_url + '/' + path
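As a concrete illustration of the URL this builds (host, queryset name and bounds are made up):

from urllib import parse

path = "data/hypothetical_qs?" + parse.urlencode({"start_date": 490, "end_date": 516})
print("https://views-remote.example.org/" + path)
# -> https://views-remote.example.org/data/hypothetical_qs?start_date=490&end_date=516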

-        empty_df = pd.DataFrame()
retries = 0
delay = 5

@@ -132,8 +226,6 @@ def overprint(message_string, last_line_length, end):

last_line_length = 0

-        url = base_url + '/' + path + '/'

while not (succeeded or failed):

data = io.BytesIO()
@@ -166,15 +258,15 @@ def overprint(message_string, last_line_length, end):

if 'failed' in message:
failed = True
-                data = empty_df
+                data = pd.DataFrame()

if retries > max_retries:

clear_output(wait=True)
print(f'Max attempts ({max_retries}) to retrieve {name} exceeded: aborting retrieval', end="\r")

failed = True
-                data = empty_df
+                data = pd.DataFrame()

retries += 1
time.sleep(delay)