Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Neo4j csv publisher using apoc library for performance improvements - CLEAN #1877

Closed
wants to merge 5 commits into from

Conversation

zacr
Copy link

@zacr zacr commented May 27, 2022

Summary of Changes

New Neo4JCsvPublisher implementation using the APOC library for 5x performance improvement. This is a fresh PR because original one wasn't created on a clean branch.

Tests

I didn't see any automated tests for Neo4JCsvPublisher. This PR was tested manually by several community members.

Documentation

CheckList

Make sure you have checked all steps below to ensure a timely review.

  • PR title addresses the issue accurately and concisely. Example: "Updates the version of Flask to v1.0.2"
  • PR includes a summary of changes.
  • PR adds unit tests, updates existing unit tests, OR documents why no test additions or modifications are needed.
  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what it does

@zacr zacr requested a review from a team as a code owner May 27, 2022 21:24
@boring-cyborg boring-cyborg bot added the area:databuilder From databuilder folder label May 27, 2022
@zacr zacr changed the title Fresh commit of apoc based neo4j csv publisher perf: Neo4j csv publisher using apoc library for performance improvements - CLEAN May 27, 2022
Copy link

@VoLKyyyOG VoLKyyyOG left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Once this is merged, I'll deploy into our dev environment and start testing other features out.

AFAIK this doesn't support multiple programmatic descriptions being added, so I'll work on that PR post-merge.

Excellent stuff!

@chonyy
Copy link
Contributor

chonyy commented May 30, 2022

Looking forward to having this PR merged! Would like to use this in the official build 🚀

@VoLKyyyOG
Copy link

Ahh looks like DCO is blocked. Can you randomly add a small commit @zacr with format:

Signed-off-by: your_name_here <your_email_here>

(https://github.com/amundsen-io/amundsen/pull/1877/checks?check_run_id=6632089423)

Zachary Ruiz and others added 5 commits May 31, 2022 09:53
Signed-off-by: Zac Ruiz <zac@salt.io>
Signed-off-by: Zac Ruiz <zac@salt.io>
Signed-off-by: Zac Ruiz <zac@salt.io>
Signed-off-by: Zac Ruiz <zac@salt.io>
Signed-off-by: Zac Ruiz <zac@salt.io>
@zacr zacr force-pushed the neo4j_csv_publisher_apoc_fresh branch from c51cb10 to e62874d Compare May 31, 2022 13:54
@chonyy
Copy link
Contributor

chonyy commented May 31, 2022

I tested this in my environment and found out that the data is actually not ingested into neo4j. Good news is after a discussion and pair debugging with @zacr , we are able to locate the problem! The root cause is related to the neo4j version.

  • Works with neo4j 3.5.26
  • Fails with neo4j 3.3.0

Basically, there are three main problems

Publisher not showing neo4j commit error

We will get this error message while running the query on the neo4j browser
org.neo4j.graphdb.TransactionFailureException: Transaction was marked as successful, but unable to commit transaction so rolled back.
However, this error is not showing on the publisher side. Therefore, it's hard to debug.

apoc.merge.node spec changed 3.3.0 and 3.5.26

  • 3.5.26: take 4 input parameters
  • 3.3.0: take 3 input parameters

Property key case sensitive

This is the query written in the source code

CALL apoc.periodic.iterate('UNWIND $rows AS row RETURN row', '
                        CALL apoc.merge.node([row.label], {key:row.key}, row, {published_tag:$tag,publisher_last_updated_epoch_ms:timestamp()}) YIELD node RETURN COUNT(*);
                    ', {batchSize: $batch_size, iterateList: True, parallel: False, params: { rows: $batch, tag: $publish_tag }})

This is the header of the csv file

"LABEL","KEY","name","is_view"

In 3.5.26 row.key could be mapped to KEY. In 3.3.0, we think it's case sensitive and it's not able to find the uppercase "KEY" when written in lowercase in the query.

@VoLKyyyOG
Copy link

Ahh yep, that would make sense with the .lower() I saw in the previous iteration. Thanks for investigating this as I'm not too familiar with Neo4J!

@zacr
Copy link
Author

zacr commented Jun 9, 2022

Thanks so much @chonyy. I have an update in progress that addresses all three, hope to have it committed in the next day or so.

@chonyy
Copy link
Contributor

chonyy commented Jun 11, 2022

Thanks so much @chonyy. I have an update in progress that addresses all three, hope to have it committed in the next day or so.

@zacr that's great news! I could also help with testing it in my environment once you are done 🚀

@chonyy
Copy link
Contributor

chonyy commented Jun 12, 2022

Hey @zacr ,

FYI, I tested your code in neo4j 3.5.26 and apoc 3.5.0.17, it also doesn't work. The error message is like below.
image

After I changed all the key to uppercase, it works! Just want to let you know that case sensitivity seems to also be an issue in your provided environment.

@zacr
Copy link
Author

zacr commented Jun 13, 2022

Awesome @chonyy . Agreed on the upper case. Can we assume the headers of the CSV files will always be upper case?

@zacr
Copy link
Author

zacr commented Jun 13, 2022

Also, I am out of town this week, but here is the error handling code that goes right after session.run() in _execute_statement()

            ret = [dict(i) for i in result]

            if ret[0]['failedOperations'] > 0:
                raise RuntimeError(f"Failed to executed statement: {stmt} with {ret[0]['errorMessages']}")

Was thinking of adding a flag to turn on\off this checking.

@ozandogrultan
Copy link
Contributor

Any updates on this PR? Would be great to see it merged!

@chonyy
Copy link
Contributor

chonyy commented Aug 12, 2022

I'm thinking that maybe we don't even have to spend time supporting Neo4j 3.3.0 since it's already EOL. @zacr's version works great with some minor fixes related to case sensitivity.

What I can help here is to add the fix and also the extra part that zacr mentioned, in order to surface the error to the caller side. But we still need a commiter's help to look into this and approve the CI pipeline run 🥲

Copy link

@conker84 conker84 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi I just checked your code and generally seems good to me, I have just a couple of nitpicks and one related to the index creation that it's a bug (so the performance improvement could be greater than your initial estimation).

One important thing that seems missing is the support for a database that is not the default one. Neo4j since version 4.X supports multi-tenancy so it would be good to have it in there.

CALL apoc.periodic.iterate(
'UNWIND $rows AS row RETURN row',
'CALL apoc.merge.node([row.label], {key:row.key}, row, row) YIELD node RETURN COUNT(*);',
{batchSize: $batch_size, iterateList: True, parallel: False, params: { rows: $batch, tag: $publish_tag }}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{batchSize: $batch_size, iterateList: True, parallel: False, params: { rows: $batch, tag: $publish_tag }}
{batchSize: $batch_size, parallel: False, params: { rows: $batch, tag: $publish_tag }}

I would like to remove iterateList as it's deprecated and use the default batchMode which is BATCH, that has the same behaviour.

return """
CALL apoc.periodic.iterate('UNWIND $rows AS row RETURN row', '
CALL apoc.merge.node([row.label], {key:row.key}, row, {published_tag:$tag,publisher_last_updated_epoch_ms:timestamp()}) YIELD node RETURN COUNT(*);
', {batchSize: $batch_size, iterateList: True, parallel: False, params: { rows: $batch, tag: $publish_tag }})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
', {batchSize: $batch_size, iterateList: True, parallel: False, params: { rows: $batch, tag: $publish_tag }})
', {batchSize: $batch_size, parallel: False, params: { rows: $batch, tag: $publish_tag }})

like above

return """
CALL apoc.periodic.iterate('UNWIND $rows AS row RETURN row', '
CALL apoc.merge.node([row.label], {key:row.key}, row, row) YIELD node RETURN COUNT(*);
', {batchSize: $batch_size, iterateList: True, parallel: False, params: { rows: $batch }})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
', {batchSize: $batch_size, iterateList: True, parallel: False, params: { rows: $batch }})
', {batchSize: $batch_size, parallel: False, params: { rows: $batch }})

like above

# CALL apoc.schema.assert(null, data, dropExisting: false) YIELD label, key, keys, unique, action
# """
stmt = """
CREATE CONSTRAINT ON (node:label) ASSERT node.key IS UNIQUE

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The label is fixed you cannot pass it as a parameter as you do here you have to format the string upfront otherwise it won't work

@Golodhros
Copy link
Member

/rebase

@Golodhros Golodhros closed this Jan 31, 2023
@Golodhros Golodhros reopened this Jan 31, 2023
@kristenarmes
Copy link
Contributor

Hello, I know it has been a while since there has been activity on this PR, but I wanted to follow up for those who were interested in a faster publisher. A few months ago I worked on one myself in this PR using the unwind clause, which improves performance by allowing Neo4j to compile and cache the statements and reduces the overall amount of transactions (see it here). This publisher doesn't depend on the apoc library. We have been using it and seeing huge improvements in speed compared to the old one. I haven't tested it on all versions of Neo4j, but it may be sufficient for others to try and see if this in progress one using apoc is no longer required?

@Golodhros
Copy link
Member

Closing as done.

@Golodhros Golodhros closed this Feb 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:databuilder From databuilder folder keep fresh Disables stalebot from closing an issue
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants