Skip to content

Commit

Permalink
Merge pull request #145 from rsocket/cloudevents
Browse files Browse the repository at this point in the history
Cloudevents
  • Loading branch information
jell-o-fishi committed May 30, 2023
2 parents 9dd4eb5 + 0f07584 commit 42d30d8
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 34 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.rst
@@ -1,8 +1,13 @@
Changelog
---------

v0.4.11
=======
- Breaking change: RequestRouter argument 'payload_mapper' was replaced with 'payload_deserializer' and 'payload_serializer'
- Added CloutEvent serialize/deserialize helpers for use in RequestRouter

v0.4.10
======
=======
- Code cleanup
- Breaking change: Removed deprecated rsocket.routing.helpers module
- Added CloudEvents client/server usage example (compatible with java rsocket example from cloudevents/sdk-java)
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Expand Up @@ -24,7 +24,7 @@
author = 'jellofishi@pm.me'

# The full version, including alpha/beta/rc tags
release = '0.4.10'
release = '0.4.11'

# -- General configuration ---------------------------------------------------

Expand Down
7 changes: 3 additions & 4 deletions examples/cloudevents/client_cloudevents.py
Expand Up @@ -25,11 +25,10 @@ async def main(server_port: int):
'source': 'https://spring.io/foos'
}, data=json.dumps({'value': 'Dave'}))

data = to_json(event)
response = await client.request_response(Payload(data=data, metadata=composite(route('event'))))
response = await client.request_response(Payload(data=to_json(event), metadata=composite(route('event'))))

event = from_json(CloudEvent, response.data)
response_data = json.loads(event.data)
response_event = from_json(CloudEvent, response.data)
response_data = json.loads(response_event.data)

assert response_data['value'] == 'Dave'

Expand Down
22 changes: 7 additions & 15 deletions examples/cloudevents/server_cloudevents.py
Expand Up @@ -3,30 +3,24 @@
import logging
import sys

from cloudevents.conversion import to_json, from_json
from cloudevents.pydantic import CloudEvent

from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.cloudevents.serialize import cloud_event_deserialize, cloud_event_serialize
from rsocket.routing.request_router import RequestRouter
from rsocket.routing.routing_request_handler import RoutingRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP

router = RequestRouter()
router = RequestRouter(cloud_event_deserialize,
cloud_event_serialize)


@router.response('event')
async def single_request_response(payload):
received_event = from_json(CloudEvent, payload.data)
received_data = json.loads(received_event.data)

event = CloudEvent.create(attributes={
async def event_response(event: CloudEvent) -> CloudEvent:
return CloudEvent.create(attributes={
'type': 'io.spring.event.Foo',
'source': 'https://spring.io/foos'
}, data=json.dumps(received_data))

return create_future(Payload(to_json(event)))
}, data=json.dumps(json.loads(event.data)))


def handler_factory():
Expand All @@ -39,9 +33,7 @@ async def run_server(server_port):
def session(*connection):
RSocketServer(TransportTCP(*connection), handler_factory=handler_factory)

server = await asyncio.start_server(session, 'localhost', server_port)

async with server:
async with await asyncio.start_server(session, 'localhost', server_port) as server:
await server.serve_forever()


Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/reactivex/chat_server.py
Expand Up @@ -115,7 +115,7 @@ def remove(self):
del chat_data.user_session_by_id[self._session.session_id]

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
router = RequestRouter(payload_deserializer=decode_payload)

@router.response('login')
async def login(username: str) -> Observable:
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step4/chat_server.py
Expand Up @@ -89,7 +89,7 @@ def __init__(self):
self._session: Optional[UserSessionData] = None

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
router = RequestRouter(payload_deserializer=decode_payload)

@router.response('login')
async def login(payload: Payload) -> Awaitable[Payload]:
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step5/chat_server.py
Expand Up @@ -92,7 +92,7 @@ def __init__(self):
self._session: Optional[UserSessionData] = None

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
router = RequestRouter(payload_deserializer=decode_payload)

@router.response('login')
async def login(username: str) -> Awaitable[Payload]:
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step6/chat_server.py
Expand Up @@ -108,7 +108,7 @@ def __init__(self):
self._requested_statistics = ServerStatisticsRequest()

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
router = RequestRouter(payload_deserializer=decode_payload)

@router.response('login')
async def login(username: str) -> Awaitable[Payload]:
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step7/chat_server.py
Expand Up @@ -107,7 +107,7 @@ def __init__(self):
self._requested_statistics = ServerStatisticsRequest()

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
router = RequestRouter(payload_deserializer=decode_payload)

@router.response('login')
async def login(username: str) -> Awaitable[Payload]:
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step8/chat_server.py
Expand Up @@ -110,7 +110,7 @@ def __init__(self):
self._requested_statistics = ServerStatisticsRequest()

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
router = RequestRouter(payload_deserializer=decode_payload)

@router.response('login')
async def login(username: str) -> Awaitable[Payload]:
Expand Down
2 changes: 1 addition & 1 deletion rsocket/__init__.py
@@ -1 +1 @@
__version__ = '0.4.10'
__version__ = '0.4.11'
Empty file added rsocket/cloudevents/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions rsocket/cloudevents/serialize.py
@@ -0,0 +1,20 @@
from typing import Any

from cloudevents.conversion import to_json, from_json
from cloudevents.pydantic import CloudEvent

from rsocket.payload import Payload


def cloud_event_deserialize(cls, payload: Payload) -> Any:
if cls == CloudEvent:
return from_json(CloudEvent, payload.data)

return payload


def cloud_event_serialize(cls, value: Any) -> Payload:
if cls == CloudEvent:
return Payload(to_json(value))

return value
28 changes: 22 additions & 6 deletions rsocket/routing/request_router.py
@@ -1,3 +1,4 @@
from asyncio import Future
from dataclasses import dataclass
from inspect import signature
from typing import Callable, Any, Dict
Expand All @@ -6,6 +7,7 @@
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.frame import FrameType
from rsocket.frame_helpers import safe_len
from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.rsocket import RSocket

Expand Down Expand Up @@ -58,12 +60,16 @@ class RequestRouter:
'_fnf_routes',
'_metadata_push',
'_route_map_by_frame_type',
'_payload_mapper',
'_payload_deserializer',
'_payload_serializer',
'_unknown'
)

def __init__(self, payload_mapper=lambda cls, _: _):
self._payload_mapper = payload_mapper
def __init__(self,
payload_deserializer=lambda cls, _: _,
payload_serializer=lambda cls, _: _):
self._payload_serializer = payload_serializer
self._payload_deserializer = payload_deserializer
self._channel_routes: Dict[str, RouteInfo] = {}
self._stream_routes: Dict[str, RouteInfo] = {}
self._response_routes: Dict[str, RouteInfo] = {}
Expand Down Expand Up @@ -148,7 +154,15 @@ async def route(self,
payload,
composite_metadata)

return await route_info.method(**route_kwargs)
result = await route_info.method(**route_kwargs)

if frame_type == FrameType.REQUEST_RESPONSE and not isinstance(result, Future):
if not isinstance(result, Payload):
result = self._payload_serializer(route_info.signature.return_annotation, result)

return create_future(result)

return result

def _collect_route_arguments(self,
route_info: RouteInfo,
Expand All @@ -163,10 +177,12 @@ def _collect_route_arguments(self,
if 'composite_metadata' == parameter or parameter_type is CompositeMetadata:
route_kwargs['composite_metadata'] = composite_metadata
else:
payload_data = payload

if parameter_type.annotation not in (Payload, parameter_type.empty):
payload = self._payload_mapper(parameter_type.annotation, payload)
payload_data = self._payload_deserializer(parameter_type.annotation, payload)

route_kwargs[parameter] = payload
route_kwargs[parameter] = payload_data

return route_kwargs

Expand Down
Empty file.
41 changes: 41 additions & 0 deletions tests/rsocket/cloudevents/test_route_cloud_events.py
@@ -0,0 +1,41 @@
import json

from cloudevents.conversion import to_json, from_json
from cloudevents.pydantic import CloudEvent

from rsocket.cloudevents.serialize import cloud_event_deserialize, cloud_event_serialize
from rsocket.extensions.helpers import composite, route
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter
from rsocket.routing.routing_request_handler import RoutingRequestHandler


async def test_routed_cloudevents(lazy_pipe):
router = RequestRouter(cloud_event_deserialize,
cloud_event_serialize)

def handler_factory():
return RoutingRequestHandler(router)

@router.response('cloud_event')
async def response_request(value: CloudEvent) -> CloudEvent:
return CloudEvent.create(attributes={
'type': 'io.spring.event.Foo',
'source': 'https://spring.io/foos'
}, data=json.dumps(json.loads(value.data)))

async with lazy_pipe(
client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA},
server_arguments={'handler_factory': handler_factory}) as (server, client):
event = CloudEvent.create(attributes={
'type': 'io.spring.event.Foo',
'source': 'https://spring.io/foos'
}, data=json.dumps({'value': 'Dave'}))

response = await client.request_response(Payload(data=to_json(event), metadata=composite(route('cloud_event'))))

response_event = from_json(CloudEvent, response.data)
response_data = json.loads(response_event.data)

assert response_data['value'] == 'Dave'

0 comments on commit 42d30d8

Please sign in to comment.