
add support for custom exporter class #6273

Open
wants to merge 2 commits into master

Conversation

guillermo-bondonno
Contributor

Opening this PR to discuss the implementation of adding support for custom exporter classes. This comes from the idea of adding stream exporters that don't rely on local storage. The usage I had in mind would be something like:

custom_settings = {
    "FEEDS": {
        "custom:": {
            "custom_exporter_class": "project.exporters.StreamExporter",
            "format": None,
            "item_export_kwargs": {"myarg1": "myvalue1"},  # needs dont_fail=True
        }
    }
}
import requests

from scrapy.exporters import BaseItemExporter


class CustomExporter(BaseItemExporter):
    def __init__(self, *, dont_fail=True, **kwargs):
        self._configure(kwargs, dont_fail=dont_fail)
        self.this_slot_items = []

    def export_item(self, item):
        # for example, send each item straight to an external API...
        requests.post("https://some-api.com", json=item)
        # ...or queue it to be flushed in finish_exporting
        self.this_slot_items.append(item)

    def finish_exporting(self):
        # flush queued items / clean up here
        pass

The batching would work the same, since a new instance would be created for every new slot and the custom exporter can handle the queuing, etc.
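For illustration, batching could lean on Scrapy's existing batch_item_count feed option, which already creates a new slot (and therefore a new exporter instance) per batch; a hypothetical configuration combining it with the proposed option might look like this:

custom_settings = {
    "FEEDS": {
        "custom:": {
            "custom_exporter_class": "project.exporters.StreamExporter",
            "format": None,
            "batch_item_count": 100,  # a fresh exporter instance every 100 items
        }
    }
}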

After this, reusable exporters for widely used services can be implemented.
Missing tests and docs.


codecov bot commented Mar 6, 2024

Codecov Report

Merging #6273 (bb8ff74) into master (e208f82) will increase coverage by 0.20%.
Report is 50 commits behind head on master.
The diff coverage is n/a.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6273      +/-   ##
==========================================
+ Coverage   88.67%   88.88%   +0.20%     
==========================================
  Files         161      161              
  Lines       11788    11971     +183     
  Branches     1912     1929      +17     
==========================================
+ Hits        10453    10640     +187     
+ Misses       1007      980      -27     
- Partials      328      351      +23     

see 81 files with indirect coverage changes

@VMRuiz
Contributor

VMRuiz commented Mar 7, 2024

Hello, I like the idea of using the Feed Exporters rather than Feed Storages to do the actual exporting. You don't actually need to write any additional code to support this right now; something similar to your solution should already work:

FEEDS = {
    "stdout://": {
        "format": "StreamingAPI"
    }
}
FEED_EXPORTERS = {
    "StreamingAPI": "StreamingAPIExporter"
}

The methods start_exporting and finish_exporting could be used for initialization, authentication, batching, etc.
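
As a minimal sketch of that idea (the class name, endpoint, and auth call are hypothetical, not part of this PR), an exporter could authenticate once in start_exporting, queue items in export_item, and flush them in finish_exporting:

import requests

from scrapy.exporters import BaseItemExporter


class StreamingAPIExporter(BaseItemExporter):
    def __init__(self, file, *, dont_fail=False, **kwargs):
        self._configure(kwargs, dont_fail=dont_fail)
        self.file = file  # passed in by the feed export machinery, unused here
        self._buffer = []

    def start_exporting(self):
        # authenticate once per slot/batch (placeholder value)
        self._token = "dummy-token"

    def export_item(self, item):
        # queue the item instead of writing it to self.file
        self._buffer.append(dict(item))

    def finish_exporting(self):
        # flush the whole batch in one request (blocking; see downside 1 below)
        requests.post(
            "https://example.com/bulk",
            json=self._buffer,
            headers={"Authorization": f"Bearer {self._token}"},
        )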

However, there are several downsides that we would need to figure out:

  1. export_item is a synchronous method, meaning that any blocking call like requests.post would block the entire Scrapy process.
  2. We would need to figure out how to properly manage networking / API errors and allow for retries.

@ftadao

ftadao commented Mar 8, 2024

  1. export_item is a synchronous method, meaning that any blocking call like requests.post would block the entire Scrapy process.

Wouldn't this be something the developer needs to take care of in their customized exporter code? If we're allowing export_item to be customized, any developer can also adapt it to run asynchronously and remove that barrier on their own terms. Of course, that might call for changes to the underlying exporter code (e.g., allowing export_item to be an async function), but it could make the code a little more robust once we work through it.

  2. We would need to figure out how to properly manage networking / API errors and allow for retries.

Couldn't all of this be managed by the custom exporter class itself? If we assume there's only a limited set of exceptions the engine needs to cover, everything else could be handled by the exporter's own code.
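
For instance, here is a sketch of exporter-side retry handling using requests' standard Retry machinery (the endpoint and retry numbers are made up for illustration):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Retries live entirely inside the exporter: the session transparently
# retries transient HTTP errors before the engine ever sees an exception.
session = requests.Session()
session.mount(
    "https://",
    HTTPAdapter(
        max_retries=Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["POST"],  # POST is not retried by default
        )
    ),
)

def send_item(item):
    response = session.post("https://example.com/endpoint", json=item)
    response.raise_for_status()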

I think those questions can be answered better if we have an example of a customized exporter that matches those requirements. @VMRuiz what about the Airtable exporter we've previously discussed?

@guillermo-bondonno
Contributor Author

Reopening this, since it closed when I reset the branch (I reset it because the changes were not necessary). I tried @VMRuiz's proposal for custom exporters and it runs just fine.
Using defer.maybeDeferred to export the item, you can run async requests inside export_item, like below.

custom_settings = {
    "FEEDS": {
        "stdout://": {
            "format": "StreamingAPI"
        }
    },
    "FEED_EXPORTERS": {
        "StreamingAPI": "stream_export_project.exporters.StreamExporter"
    }
}
import requests
from twisted.internet import threads

from scrapy.exporters import BaseItemExporter


class StreamExporter(BaseItemExporter):
    """
    A new instance will be created for each slot/batch.
    """

    def __init__(self, file, *args, dont_fail=False, **kwargs):
        self._kwargs = kwargs
        self.file = file  # or another name if a different format is used
        self._configure(kwargs, dont_fail=dont_fail)

    def start_exporting(self):
        pass

    def send_request(self, item):
        requests.post("https://example.com/endpoint", json=item | self._kwargs)

    def export_item(self, item):
        dfd = threads.deferToThread(self.send_request, item)
        # maybe collect these to ensure they complete in finish_exporting
        return dfd

    def finish_exporting(self):
        pass
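
A sketch of the "collect these" idea from the comment above, assuming (and this is an assumption, not current Scrapy behavior) that the caller waited on a Deferred returned by finish_exporting:

from twisted.internet import defer, threads


class CollectingStreamExporter(StreamExporter):
    def __init__(self, file, *args, dont_fail=False, **kwargs):
        super().__init__(file, *args, dont_fail=dont_fail, **kwargs)
        self._pending = []

    def export_item(self, item):
        # keep every per-item Deferred so they can be aggregated later
        dfd = threads.deferToThread(self.send_request, item)
        self._pending.append(dfd)
        return dfd

    def finish_exporting(self):
        # fires once all queued requests have finished (or failed)
        return defer.DeferredList(self._pending, consumeErrors=True)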

The failing tests confuse me: from the error they seem to be related to ExceptionJsonItemExporter, but the actual tests don't use it.

@guillermo-bondonno
Contributor Author

It turns out that, without changing any Scrapy code, I get the same behavior by calling deferToThread from export_item. I'll look deeper into it to see whether doing that is safe and reliable.

@VMRuiz
Contributor

VMRuiz commented Mar 25, 2024

@Gallaecio @wRAR, do you think using deferToThread is appropriate here, or could there be performance issues if there are too many calls?

@wRAR
Member

wRAR commented Mar 27, 2024

I suggest using deferToThread from time to time, but I've never used it myself :) It won't create a new thread for every item, as it uses a thread pool, so I think the performance shouldn't be worse.
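
For what it's worth, deferToThread runs on the reactor's shared thread pool, and Scrapy already exposes its size through the REACTOR_THREADPOOL_MAXSIZE setting (default 10), so a project making many concurrent calls could raise it:

# settings.py
# REACTOR_THREADPOOL_MAXSIZE caps the reactor thread pool that
# deferToThread draws from; 20 here is only an example value.
REACTOR_THREADPOOL_MAXSIZE = 20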
