Skip to content

Commit

Permalink
Dataset accepts flow table and dimension table
Browse files Browse the repository at this point in the history
Before, `processes` was the only dimension table
  • Loading branch information
ricklupton committed Oct 18, 2016
1 parent af66a07 commit 012e6a2
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 26 deletions.
60 changes: 49 additions & 11 deletions sankeyview/dataset.py
Expand Up @@ -43,13 +43,35 @@ def eval_selection(df, column, sel):


class Dataset:
def __init__(self, processes, flows):
self._processes = processes
self._flows = flows
def __init__(self,
flows,
dim_process=None,
dim_material=None,
dim_time=None):

if dim_process is not None and not dim_process.index.is_unique:
raise ValueError('dim_process index not unique')
if dim_material is not None and not dim_material.index.is_unique:
raise ValueError('dim_material index not unique')
if dim_time is not None and not dim_time.index.is_unique:
raise ValueError('dim_time index not unique')

self._table = flows \
.join(processes.add_prefix('source.'), on='source') \
.join(processes.add_prefix('target.'), on='target')
self._flows = flows
self._dim_process = dim_process
self._dim_material = dim_material
self._dim_time = dim_time

self._table = flows
if dim_process is not None:
self._table = self._table \
.join(dim_process.add_prefix('source.'), on='source') \
.join(dim_process.add_prefix('target.'), on='target')
if dim_material is not None:
self._table = self._table \
.join(dim_material.add_prefix('material.'), on='material')
if dim_time is not None:
self._table = self._table \
.join(dim_time.add_prefix('time.'), on='time')

def partition(self, dimension, processes=None):
"""Partition of all values of `dimension` within `processes`"""
Expand All @@ -66,19 +88,35 @@ def apply_view(self, process_groups, bundles, flow_selection=None):

def save(self, filename):
    """Write the flow table and the dimension tables to an HDF5 store.

    The tables are stored under the keys `flows`, `dim_process`,
    `dim_material` and `dim_time`. A dimension table that is None
    (it was not supplied to the constructor) is not written, because
    an HDFStore cannot store None; any table already stored under that
    key in an existing file is removed so the file matches this dataset.
    """
    tables = (
        ('flows', self._flows),
        ('dim_process', self._dim_process),
        ('dim_material', self._dim_material),
        ('dim_time', self._dim_time),
    )
    with pd.HDFStore(filename) as store:
        for key, table in tables:
            if table is not None:
                store[key] = table
            elif key in store:
                # The store is opened in append mode, so a table left
                # over from an earlier save would otherwise survive.
                del store[key]

@classmethod
def from_hdf(cls, filename):
    """Load a dataset from an HDF5 store.

    The flow table is read from the key `flows` and is required. The
    dimension tables are read from `dim_process`, `dim_material` and
    `dim_time`; any of these keys that is missing from the file is
    passed to the constructor as None instead of raising KeyError.
    """
    with pd.HDFStore(filename) as store:

        def read_optional(key):
            # Dimension tables are optional, so their keys may be absent.
            return store[key] if key in store else None

        return cls(store['flows'],
                   read_optional('dim_process'),
                   read_optional('dim_material'),
                   read_optional('dim_time'))

@classmethod
def from_csv(cls, flows_filename, processes_filename):
def from_csv(cls,
flows_filename,
dim_process_filename=None,
dim_material_filename=None,
dim_time_filename=None):

def read(filename):
if filename is not None:
return pd.read_csv(filename).set_index('id')
else:
return None

flows = pd.read_csv(flows_filename)
processes = pd.read_csv(processes_filename).set_index('id')
return cls(processes, flows)
dim_process = read(dim_process_filename)
dim_material = read(dim_material_filename)
dim_time = read(dim_time_filename)
return cls(flows, dim_process, dim_material, dim_time)


def find_flows(flows,
Expand Down
58 changes: 47 additions & 11 deletions test/test_dataset.py
@@ -1,11 +1,13 @@
import pytest

import pandas as pd

from sankeyview.dataset import Dataset, eval_selection
from sankeyview.sankey_definition import ProcessGroup, Bundle, Elsewhere


def _dataset():
processes = pd.DataFrame.from_records(
dim_process = pd.DataFrame.from_records(
[
('a1', 'a'),
('a2', 'a'),
Expand All @@ -14,16 +16,49 @@ def _dataset():
],
columns=['id', 'function']).set_index('id')

dim_material = pd.DataFrame.from_records([
('m1', 'type1'),
('m2', 'type2'),
], columns=['id', 'type']).set_index('id')

dim_time = pd.DataFrame.from_records([
('t1', 'August'),
('t2', 'March'),
], columns=['id', 'month']).set_index('id')

flows = pd.DataFrame.from_records(
[
('a1', 'b', 'm1', 't1', 3),
('a2', 'b', 'm2', 't1', 4),
('b', 'c', 'm1', 't1', 3),
('b', 'c', 'm2', 't1', 4),
],
columns=['source', 'target', 'material', 'time', 'value'])

return Dataset(flows, dim_process, dim_material, dim_time)


def test_dataset_joins_tables():
    """The joined table keeps all flow rows and gains prefixed dimension columns."""
    expected_columns = {
        'source', 'target', 'material', 'time', 'value',
        'source.function', 'target.function',
        'material.type', 'time.month',
    }
    table = _dataset()._table
    assert len(table.index) == 4
    assert set(table.columns) == expected_columns

def test_dataset_checks_dim_tables_have_unique_index():
    """A dimension table with a repeated id must be rejected with ValueError."""
    duplicated_time = pd.DataFrame.from_records(
        [('same_id', 'August'), ('same_id', 'March')],
        columns=['id', 'month'],
    ).set_index('id')
    single_flow = pd.DataFrame.from_records(
        [('a1', 'b', 'same_id', 3)],
        columns=['source', 'target', 'time', 'value'],
    )

    with pytest.raises(ValueError):
        Dataset(single_flow, dim_time=duplicated_time)


def test_selection_list():
Expand Down Expand Up @@ -86,8 +121,9 @@ def test_unused_flows():
('b', 'c', 'm', 1),
],
columns=('source', 'target', 'material', 'value'))
processes = pd.DataFrame({'id': ['a', 'b', 'c', 'other']}).set_index('id')
dataset = Dataset(processes, flows)
dim_process = pd.DataFrame(
{'id': ['a', 'b', 'c', 'other']}).set_index('id')
dataset = Dataset(flows, dim_process)

bundle_flows, unused = dataset.apply_view(nodes, bundles)

Expand Down Expand Up @@ -132,8 +168,8 @@ def test_internal_flows():
('b', 'other', 'm', 1),
],
columns=('source', 'target', 'material', 'value'))
processes = pd.DataFrame({'id': ['a', 'b', 'other']}).set_index('id')
dataset = Dataset(processes, flows)
dim_process = pd.DataFrame({'id': ['a', 'b', 'other']}).set_index('id')
dataset = Dataset(flows, dim_process)

bundle_flows, unused = dataset.apply_view(nodes, bundles)

Expand Down
8 changes: 4 additions & 4 deletions test/test_sankey_view.py
Expand Up @@ -31,10 +31,10 @@ def test_sankey_view_results():
('b1', 'c2', 'n', 1),
],
columns=('source', 'target', 'material', 'value'))
processes = pd.DataFrame({
dim_process = pd.DataFrame({
'id': list(flows.source.unique()) + list(flows.target.unique())
}).set_index('id')
dataset = Dataset(processes, flows)
dataset = Dataset(flows, dim_process)

GR, groups = sankey_view(vd, dataset)

Expand Down Expand Up @@ -128,8 +128,8 @@ def test_sankey_view_results_time_partition():
('a1', 'b1', 'm', 2, 2),
],
columns=('source', 'target', 'material', 'time', 'value'))
processes = pd.DataFrame({'id': ['a1', 'b1']}).set_index('id')
dataset = Dataset(processes, flows)
dim_process = pd.DataFrame({'id': ['a1', 'b1']}).set_index('id')
dataset = Dataset(flows, dim_process)

GR, groups = sankey_view(vd, dataset)
assert set(GR.nodes()) == {'a^*', 'b^*'}
Expand Down

0 comments on commit 012e6a2

Please sign in to comment.