Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Length mismatch at #202

Open
TongmengXie opened this issue Dec 6, 2023 · 0 comments
Open

Length mismatch at #202

TongmengXie opened this issue Dec 6, 2023 · 0 comments

Comments

@TongmengXie
Copy link

TongmengXie commented Dec 6, 2023

When trying to slice candidates (pd.MultiIndex) and perform comparer.compute(), sometimes among my partitions there will be

"ValueError: Length mismatch: Expected axis has 31262 elements, new values have 31250 elements".

I have checked that the shapes are all correct, so the problem should be in the computing process. I assume the additional 12 elements (31262 − 31250) correspond to misplaced columns (10 comparison columns + 2 index columns).

`import os
import pandas as pd
import recordlinkage
from recordlinkage import Compare

Perform the actual comparisons and store the result

print("Performing comparisons...")

def compute_and_save_partition(comparer, candidates, df1, df2, start, end, partition_path):
    """Compute one slice of record-linkage comparisons, caching the result as Parquet.

    If ``partition_path`` already exists the cached partition is loaded and
    returned; otherwise ``comparer.compute`` is run on ``candidates[start:end]``
    and the result is written to ``partition_path`` before being returned.

    Parameters
    ----------
    comparer : object
        Object exposing ``compute(pairs, df1, df2)`` (e.g. a recordlinkage
        ``Compare`` instance — presumably; verify against the caller).
    candidates : sliceable pair index (e.g. pd.MultiIndex)
        Full set of candidate pairs; only ``[start:end]`` is used here.
    df1, df2 : pd.DataFrame
        Left / right frames to compare.
    start, end : int
        Half-open slice bounds into ``candidates``.
    partition_path : str
        Parquet cache path for this partition.

    Returns
    -------
    pd.DataFrame
        The comparison result for this partition.

    Raises
    ------
    ValueError
        If the slice length does not match the requested span.
    """
    # Check if the partition is already computed and saved
    if os.path.exists(partition_path):
        # Load the partition from the Parquet file
        print(f"reading in {start}:{end}")
        return pd.read_parquet(partition_path)
    else:
        # Compute the partition
        candidate_slice = candidates[start:end]
        print(f"processing partition {partition_path.split('_')[1:-1]}: start={start}, end={end}, candidate_slice: {len(candidate_slice)}")
        # BUG FIX: the original compared against a hard-coded 1_000_000, which
        # wrongly rejects the final (shorter) partition. Validate against the
        # span actually requested instead.
        if len(candidate_slice) != end - start:
            raise ValueError(f"candidate_slice has incorrect number of rows: {len(candidate_slice)}")
        print(f"df1 shape: {df1.shape}, df2 shape: {df2.shape}")
        print(f"Candidate slice length: {len(candidate_slice)}")
        partition_result = comparer.compute(candidate_slice, df1, df2)
        print(f"Partition result shape: {partition_result.shape}")
        # Save the computed partition to a Parquet file
        partition_result.to_parquet(partition_path)
        return partition_result

def parallel_compute_and_save(comparer, candidates, df1, df2, output_dir, partition=1000000):
    """Chunk ``candidates`` into fixed-size partitions and compute each one.

    Each chunk of at most ``partition`` pairs is delegated to
    ``compute_and_save_partition`` (which caches per-chunk Parquet files in
    ``output_dir``). Returns the list of per-partition result DataFrames,
    in order.
    """
    partitions = []
    n_total = len(candidates)
    for chunk_start in range(0, n_total, partition):
        chunk_stop = min(chunk_start + partition, n_total)
        cache_path = os.path.join(output_dir, f'partition_{chunk_start}_{chunk_stop}.parquet')
        partitions.append(
            compute_and_save_partition(comparer, candidates, df1, df2, chunk_start, chunk_stop, cache_path)
        )
    return partitions

Setup the output directory

# --- Driver: run the partitioned comparison and assemble the final frame ---
# NOTE(review): the lines below were pasted without their leading '#' comment
# markers, which made them syntax errors; restored as comments.

name = "your_dataset_name"  # Replace with your dataset name
output_dir = f"../Output/temp/{name}_compare"
os.makedirs(output_dir, exist_ok=True)

# Replace _1861[cols_to_compare] and _1851[cols_to_compare] with your
# dataframes and columns to compare.
final_result = parallel_compute_and_save(comparer, candidates, _1861[cols_to_compare], _1851[cols_to_compare], output_dir)

# Update the columns of the result dataframe.
final_result = pd.concat(final_result)
final_result.columns = ['pname', 'oname', 'sname', 'pname_soundex', 'sname_soundex', 'pname_metaphone', 'sname_metaphone', 'address', 'sname_pop_metaphone', 'dateofbirth']
# reset_index(drop=True) discards the candidate-pair MultiIndex — presumably
# intentional here; confirm if the pair labels are needed downstream.
final_result = final_result.reset_index(drop=True)
`

Performing comparisons...
reading in 0:1000000
processing partition ['dataset', 'name', 'compare/partition', '1000000']: start=1000000, end=2000000, candidate_slice: 1000000
df1 shape: (19828561, 10), df2 shape: (17711058, 10)
Candidate slice length: 1000000


_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/externals/loky/process_executor.py", line 463, in _process_worker
r = call_item()
^^^^^^^^^^^
File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/externals/loky/process_executor.py", line 291, in call
return self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/parallel.py", line 589, in call
return [func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^
File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/parallel.py", line 589, in
return [func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/users/xiet13/.local/lib/python3.11/site-packages/recordlinkage/base.py", line 33, in _parallel_compare_helper
return class_obj._compute(pairs, x, x_link)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/users/xiet13/.local/lib/python3.11/site-packages/recordlinkage/base.py", line 668, in _compute
df_b_indexed = frame_indexing(x_link[sublabels_right], pairs, 1)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/users/xiet13/.local/lib/python3.11/site-packages/recordlinkage/utils.py", line 198, in frame_indexing
data.index = multi_index
^^^^^^^^^^
File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/generic.py", line 6218, in setattr
return object.setattr(self, name, value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "properties.pyx", line 69, in pandas._libs.properties.AxisProperty.set
File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/generic.py", line 767, in _set_axis
self._mgr.set_axis(axis, labels)
File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/internals/managers.py", line 227, in set_axis
self._validate_set_axis(axis, new_labels)
File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/internals/base.py", line 85, in _validate_set_axis
raise ValueError(
ValueError: Length mismatch: Expected axis has 31262 elements, new values have 31250 elements
"""

The above exception was the direct cause of the following exception:

ValueError Traceback (most recent call last)
Cell In[31], line 45
42 os.makedirs(output_dir, exist_ok=True)
44 # Replace _1861[cols_to_compare] and _1851[cols_to_compare] with your dataframes and columns to compare
---> 45 final_result = parallel_compute_and_save(comparer, candidates, _1861[cols_to_compare], _1851[cols_to_compare], output_dir)
47 # Update the columns of the result dataframe
48 final_result = pd.concat(final_result)

Cell In[31], line 36, in parallel_compute_and_save(comparer, candidates, df1, df2, output_dir, partition)
34 end = min(i + partition, total_candidates)
35 partition_path = os.path.join(output_dir, f'partition_{i}_{end}.parquet')
---> 36 result.append(compute_and_save_partition(comparer, candidates, df1, df2, i, end, partition_path))
37 return result

Cell In[31], line 24, in compute_and_save_partition(comparer, candidates, df1, df2, start, end, partition_path)
22 print(f"df1 shape: {df1.shape}, df2 shape: {df2.shape}")
23 print(f"Candidate slice length: {len(candidate_slice)}")
---> 24 partition_result = comparer.compute(candidate_slice, df1, df2)
25 print(f"Partition result shape: {partition_result.shape}")
26 # Save the computed partition to a Parquet file

File ~/.local/lib/python3.11/site-packages/recordlinkage/base.py:836, in BaseCompare.compute(self, pairs, x, x_link)
834 results = self._compute(pairs, x, x_link)
835 elif self.n_jobs > 1:
--> 836 results = self._compute_parallel(pairs, x, x_link, n_jobs=self.n_jobs)
837 else:
838 raise ValueError("number of jobs should be positive integer")

File ~/.local/lib/python3.11/site-packages/recordlinkage/base.py:648, in BaseCompare._compute_parallel(self, pairs, x, x_link, n_jobs)
646 def _compute_parallel(self, pairs, x, x_link=None, n_jobs=1):
647 df_chunks = index_split(pairs, n_jobs)
--> 648 result_chunks = Parallel(n_jobs=n_jobs)(
649 delayed(_parallel_compare_helper)(self, chunk, x, x_link)
650 for chunk in df_chunks
651 )
653 result = pandas.concat(result_chunks)
654 return result

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1952, in Parallel.call(self, iterable)
1946 # The first item from the output is blank, but it makes the interpreter
1947 # progress until it enters the Try/Except block of the generator and
1948 # reach the first yield statement. This starts the aynchronous
1949 # dispatch of the tasks to the workers.
1950 next(output)
-> 1952 return output if self.return_generator else list(output)

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1595, in Parallel._get_outputs(self, iterator, pre_dispatch)
1592 yield
1594 with self._backend.retrieval_context():
-> 1595 yield from self._retrieve()
1597 except GeneratorExit:
1598 # The generator has been garbage collected before being fully
1599 # consumed. This aborts the remaining tasks if possible and warn
1600 # the user if necessary.
1601 self._exception = True

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1699, in Parallel._retrieve(self)
1692 while self._wait_retrieval():
1693
1694 # If the callback thread of a worker has signaled that its task
1695 # triggered an exception, or if the retrieval loop has raised an
1696 # exception (e.g. GeneratorExit), exit the loop and surface the
1697 # worker traceback.
1698 if self._aborting:
-> 1699 self._raise_error_fast()
1700 break
1702 # If the next job is not ready for retrieval yet, we just wait for
1703 # async callbacks to progress.

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1734, in Parallel._raise_error_fast(self)
1730 # If this error job exists, immediatly raise the error by
1731 # calling get_result. This job might not exists if abort has been
1732 # called directly or if the generator is gc'ed.
1733 if error_job is not None:
-> 1734 error_job.get_result(self.timeout)

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:736, in BatchCompletionCallBack.get_result(self, timeout)
730 backend = self.parallel._backend
732 if backend.supports_retrieve_callback:
733 # We assume that the result has already been retrieved by the
734 # callback thread, and is stored internally. It's just waiting to
735 # be returned.
--> 736 return self._return_or_raise()
738 # For other backends, the main thread needs to run the retrieval step.
739 try:

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:754, in BatchCompletionCallBack._return_or_raise(self)
752 try:
753 if self.status == TASK_ERROR:
--> 754 raise self._result
755 return self._result
756 finally:

ValueError: Length mismatch: Expected axis has 31262 elements, new values have 31250 elements

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant