Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Kv/Obj watch. #1383

Merged
merged 21 commits into from
May 22, 2024
Merged

Implement Kv/Obj watch. #1383

merged 21 commits into from
May 22, 2024

Conversation

sheldygg
Copy link
Contributor

@sheldygg sheldygg commented Apr 18, 2024

Description

Some updates for NatsBroker:

  1. Now you can use just pull_sub=True instead of pull_sub=PullSub() in basic case:

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("test", stream="stream", pull_sub=True)
     async def handler(msg, logger: Logger):
         logger.info(msg)
  2. KeyValue creation and watching API added:

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("some-key", kv_watch="bucket")
     async def handler(msg: int, logger: Logger):
         logger.info(msg)
     
     @app.after_startup
     async def test():
         kv = await broker.key_value("bucket")
         await kv.put("some-key", b"1")

    Or you can use extended API if it is required

     from faststream.nats import KvWatch
    
     @broker.subscriber("some-key", kv_watch=KvWatch("bucket"))
     async def handler(msg: int, logger: Logger):
         logger.info(msg)
  3. ObjectStore API added as well:

    from faststream import FastStream, Logger
    from faststream.nats import NatsBroker
    
    broker = NatsBroker()
    app = FastStream(broker)
    
    @broker.subscriber("file-bucket", obj_watch=True)
    async def handler(filename: str, logger: Logger):
        logger.info(filename)
    
    @app.after_startup
    async def test():
        object_store = await broker.object_storage("file-bucket")
        await object_store.put("some-file.txt", b"1")

    Also, we have built-in ObjectStorage annotation for fastest access to current bucket:

    from faststream.nats.annotations import ObjectStorage
    
    @broker.subscriber("file-bucket", obj_watch=True)
    async def handler(filename: str, logger: Logger, storage: ObjectStorage):
        file = await storage.get(filename)
        logger.info(file.data)

    For sure, we have an extended API as well:

    from faststream.nats import ObjWatch
    
    @broker.subscriber("file-bucket", obj_watch=ObjWatch(include_history=True))
    async def handler(filename: str, logger: Logger):
        logger.info(filename)

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh
  • I have ensured that static analysis tests are passing by running scripts/static-anaylysis.sh
  • I have included code examples to illustrate the modifications

@Lancetnik Lancetnik added the enhancement New feature or request label Apr 18, 2024
@Lancetnik Lancetnik linked an issue Apr 18, 2024 that may be closed by this pull request
Remove `kv_watch`/`obj_watch` from basic class.
@Lancetnik
Copy link
Collaborator

Lancetnik commented Apr 24, 2024

  • add AsyncAPI tests (KV and OS subscribers should not generate schema)
  • add tests for broker.key_value(...) (it must caches buckets)
  • add tests for broker.object_storage(...) (it must caches values as well)
  • add KeyValue subscriber tests
  • add ObjectStorage subscriber tests
  • add KeyValue subscriber Path feature tests
  • update KeyValue and ObjectStorage documentation
  • update FastAPI subscriber annotation
  • update NatsRouter annotation

1. AsyncAPI (KV, OS)
2. broker.key_value and broker.object_storage cache instances
3. KV and OS watcher
tests/asyncapi/nats/test_obj_schema.py Outdated Show resolved Hide resolved
tests/brokers/nats/test_consume.py Outdated Show resolved Hide resolved
tests/brokers/nats/test_consume.py Show resolved Hide resolved
tests/brokers/nats/test_kv_declarer_cache.py Outdated Show resolved Hide resolved
tests/brokers/nats/test_os_declarer_cache.py Outdated Show resolved Hide resolved
@Lancetnik Lancetnik mentioned this pull request May 22, 2024
13 tasks
@Lancetnik Lancetnik enabled auto-merge May 22, 2024 21:08
@Lancetnik Lancetnik added this pull request to the merge queue May 22, 2024
Merged via the queue into airtai:main with commit 6c667ff May 22, 2024
31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Feature: NATS watch by KV and Object storage via decorator
2 participants