Fastest bulk merge via Threads #943

Open
urusd opened this issue May 5, 2022 · 1 comment

Comments


urusd commented May 5, 2022

Hello Team! I've been exploring Neo4j's features for several months. I use the dockerized Neo4j Community edition (Browser version 4.2.6, Server version 4.1.9) with py2neo==2021.2.3.

There is a problem when importing large amounts of data, about 100-500k rows.

I wrote a wrapper around merge_relationships (py2neo bulk operations); the code is below. It breaks the dataset into chunks and sends them to be merged via threads.

Problem: the relationships between the nodes are lost, even though the nodes definitely exist! (I left the node-creation code out to keep the example short.)

Also, if you merge nodes through threads (the same code using py2neo.bulk.merge_nodes instead of merge_relationships, sketched below), duplicate Activity nodes are created, BUT this doesn't generate duplicates in the Organization nodes - very strange behavior.
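For reference, the node variant looks roughly like this (a minimal sketch; the sample rows are made up, and the call assumes the py2neo 2021.x merge_nodes signature):

from py2neo.bulk import merge_nodes

# Hypothetical sample rows; the real dataset has 100-500k of them.
activity_rows = [
    ["Type1"],
    ["Type2"],
]

# Merge on the Activity label and the "name" property;
# keys= maps list positions to property names.
merge_nodes(graph.auto(), activity_rows, ("Activity", "name"), keys=["name"])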

Code example:

from py2neo import Graph
from py2neo.bulk import merge_relationships

from concurrent.futures import ThreadPoolExecutor


MAX_WORKERS = 10

graph = Graph("http://neo4j:password@localhost:7474")


def batcher(iterable, n=1):
    # Yield successive chunks of at most n items from the iterable.
    length = len(iterable)
    for ndx in range(0, length, n):
        yield iterable[ndx:min(ndx + n, length)]


def upload_relations(graph, dataset):
    # Submit each batch to the thread pool; every worker shares the
    # same Graph object through graph.auto().
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for batch in batcher(dataset.relationships, dataset.batch_size):
            executor.submit(
                merge_relationships,
                graph.auto(),
                batch,
                dataset.rel_type,
                (tuple(dataset.start_node_labels), *dataset.fixed_order_start_node_properties),  # start_node_key
                (tuple(dataset.end_node_labels), *dataset.fixed_order_end_node_properties)  # end_node_key
            )


class DataSet:
    batch_size = 1000
    rel_type = "HAS_ACTIVITY"

    start_node_labels = ['Organization']
    fixed_order_start_node_properties = ('index',)

    end_node_labels = ['Activity']
    fixed_order_end_node_properties = ('name',)

    relationships = [
        ('1810003938', {}, 'Type1'),
        ('1710000665', {}, 'Type2'),
        ('1810002242', {}, 'Type3'),
        ('0310006089', {}, 'Type4'),
        ('0310005915', {}, 'Type5'),
        ('1810002325', {}, 'Type6'),
        ('5710001175', {}, 'Type7'),
        ('3610002514', {}, 'Type8'),
        ('3910000839', {}, 'Type9'),
        ...
    ]

dataset = DataSet()

upload_relations(graph, dataset)
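One caveat worth noting about this pattern: executor.submit() returns a Future, and an exception raised inside merge_relationships only surfaces when result() is called on that Future, so failed batches can disappear silently. A variant of the wrapper that collects the futures and re-raises any errors (a sketch, reusing batcher and MAX_WORKERS from above):

from concurrent.futures import ThreadPoolExecutor, as_completed


def upload_relations_checked(graph, dataset):
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(
                merge_relationships,
                graph.auto(),
                batch,
                dataset.rel_type,
                (tuple(dataset.start_node_labels), *dataset.fixed_order_start_node_properties),
                (tuple(dataset.end_node_labels), *dataset.fixed_order_end_node_properties),
            )
            for batch in batcher(dataset.relationships, dataset.batch_size)
        ]
        # result() re-raises any exception that occurred in the worker thread.
        for future in as_completed(futures):
            future.result()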

bf96163 commented Jul 13, 2022

There are two things I want to share about this issue:

  1. The graph.auto() method may not be thread-safe. I tried starting 5 threads to create different types of nodes, and the error log told me that the lock being released didn't belong to the thread. My guess is that the graph object behind auto() is shared and not handled properly when multiple threads are in use (a workaround sketch follows below).
  2. If the nodes were already created in the graph, you may want to check that the types in the relationships variable match the types of the nodes that already exist.
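If point 1 is the cause, one workaround is to give each worker its own Graph instance instead of sharing one. A minimal sketch (it assumes the same connection URI as in the issue, reuses batcher and MAX_WORKERS from the original snippet, and the helper names are hypothetical):

from concurrent.futures import ThreadPoolExecutor

from py2neo import Graph
from py2neo.bulk import merge_relationships

NEO4J_URI = "http://neo4j:password@localhost:7474"  # same URI as in the issue


def merge_batch(batch, rel_type, start_key, end_key):
    # A fresh Graph per call, so no connection state is shared between threads.
    graph = Graph(NEO4J_URI)
    merge_relationships(graph.auto(), batch, rel_type, start_key, end_key)


def upload_relations_threadsafe(dataset):
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for batch in batcher(dataset.relationships, dataset.batch_size):
            executor.submit(
                merge_batch,
                batch,
                dataset.rel_type,
                (tuple(dataset.start_node_labels), *dataset.fixed_order_start_node_properties),
                (tuple(dataset.end_node_labels), *dataset.fixed_order_end_node_properties),
            )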
