Resume support #87

Open
whyoleg opened this issue Aug 19, 2020 · 2 comments

Comments

@whyoleg
Member

whyoleg commented Aug 19, 2020

No description provided.

@whyoleg
Member Author

whyoleg commented Oct 18, 2020

Postponed until the resume changes to the RSocket protocol are available.

@olme04
Contributor

olme04 commented Aug 7, 2021

Proof-of-concept (POC) public API to support resume 2.0:

public interface ResumeResolver : Closeable {
    public suspend fun shouldResumeRequestResponse(payload: Payload): Boolean = false
    public suspend fun shouldResumeRequestStream(payload: Payload): Boolean = false
    public suspend fun shouldResumeRequestChannel(payload: Payload): Boolean = false
    override fun close(): Unit = Unit
}

public interface ResumeFrameStorage : Closeable {
    public fun state(): ResumeState                                // on keep alive send
    public fun onFrameSend(streamId: Int, frame: ByteReadPacket)   // on send to connection
    public fun onFrameReceive(streamId: Int)                       // on receive from connection
    public fun resumeFrames(streamId: Int): List<ByteReadPacket>   // on reconnect
    public fun releaseFrames(streamId: Int, impliedPosition: Long) // on keep alive receive
    public fun remove(streamId: Int)                               // on full remove
}

public class ResumeStrategy(
    public val resumeResolver: ResumeResolver,
    public val resumeFrameStorage: ResumeFrameStorage
) : Closeable
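
To make the storage callbacks above more concrete, here is a minimal in-memory sketch of the retain-and-release bookkeeping. It is not the proposed `InMemoryResumeFrameStorage`. Several things are assumptions made only for illustration: `ByteArray` stands in for `ByteReadPacket` so the code runs without extra dependencies, the shape of `ResumeState` is invented (the proposal does not show it), `impliedPosition` is treated as a per-stream count of sent bytes, and the class name `SketchResumeFrameStorage` is hypothetical.

```kotlin
// Hypothetical stand-in: the proposal does not show what ResumeState contains.
data class ResumeState(val sentBytes: Long, val receivedFrames: Long)

// Assumption: ByteArray replaces ByteReadPacket so the sketch needs no ktor dependency.
class SketchResumeFrameStorage {
    // A retained frame plus the per-stream byte position reached once it was sent.
    private class Stored(val endPosition: Long, val frame: ByteArray)

    private val framesByStream = mutableMapOf<Int, ArrayDeque<Stored>>()
    private val sentByStream = mutableMapOf<Int, Long>()
    private var sentBytes = 0L
    private var receivedFrames = 0L

    // Snapshot that would be reported when a keep-alive is sent.
    fun state(): ResumeState = ResumeState(sentBytes, receivedFrames)

    // Retain every sent frame until the peer confirms it.
    fun onFrameSend(streamId: Int, frame: ByteArray) {
        val end = (sentByStream[streamId] ?: 0L) + frame.size
        sentByStream[streamId] = end
        sentBytes += frame.size
        framesByStream.getOrPut(streamId) { ArrayDeque() }.addLast(Stored(end, frame))
    }

    // Only counts received frames in this sketch.
    fun onFrameReceive(streamId: Int) {
        receivedFrames++
    }

    // Frames that are still unconfirmed and would be replayed on reconnect.
    fun resumeFrames(streamId: Int): List<ByteArray> =
        framesByStream[streamId]?.map { it.frame } ?: emptyList()

    // Drop frames that end at or before the position the peer reported.
    fun releaseFrames(streamId: Int, impliedPosition: Long) {
        val queue = framesByStream[streamId] ?: return
        while (queue.isNotEmpty() && queue.first().endPosition <= impliedPosition) {
            queue.removeFirst()
        }
    }

    // Forget the stream entirely.
    fun remove(streamId: Int) {
        framesByStream.remove(streamId)
        sentByStream.remove(streamId)
    }
}

fun main() {
    val storage = SketchResumeFrameStorage()
    storage.onFrameSend(streamId = 1, frame = ByteArray(10))
    storage.onFrameSend(streamId = 1, frame = ByteArray(20))
    // The peer confirms the first 10 bytes, so only the second frame stays retained.
    storage.releaseFrames(streamId = 1, impliedPosition = 10)
    println(storage.resumeFrames(1).map { it.size })
}
```

A real implementation would also need to be safe under concurrent access and would have to follow whatever position semantics the resume 2.0 protocol changes end up defining.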

Server side API:

RSocketServer {
  resume {
    strategy { config: ConnectionConfig ->
      // Example resume strategy configuration: it shows how the decision to resume a request can be based on
      // the request payload (which contains, e.g., the route), the storage capacity, or any other parameter.
      val storage = InMemoryResumeFrameStorage("SERVER")
      //some check based on setup payload
      val resumeStream = config.setupPayload.data.readText().contains("RESUME:STREAM") 
      ResumeStrategy(
        resumeFrameStorage = storage,
        resumeResolver = object: ResumeResolver {
          override suspend fun shouldResumeRequestStream(payload: Payload): Boolean {
            val route: String? = payload.metadata?.read(RoutingMetadata)?.tags?.firstOrNull()
            return resumeStream && route == "resumable_route" && storage.hasEnoughSpaceForResumingStream()
          }
        }
      )
    }
  }
}
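
The server example calls `storage.hasEnoughSpaceForResumingStream()` without showing what it might do. One possible reading, sketched below, is a byte budget with a reserve per resumable stream. The class `CapacityTracker`, its parameters `maxBytes` and `reservePerStream`, and the `onStored`/`onReleased` hooks are all hypothetical names introduced for this illustration and are not part of the proposed API.

```kotlin
// Hypothetical helper: tracks how many bytes of retained frames a storage currently holds.
class CapacityTracker(
    private val maxBytes: Long,         // assumed total budget for retained frames
    private val reservePerStream: Long  // assumed headroom required before accepting a new resumable stream
) {
    private var usedBytes = 0L

    // Would be called when a frame is retained (e.g. from onFrameSend).
    fun onStored(bytes: Int) {
        usedBytes += bytes
    }

    // Would be called when frames are dropped (e.g. from releaseFrames or remove).
    fun onReleased(bytes: Int) {
        usedBytes = maxOf(0L, usedBytes - bytes)
    }

    // A new stream is resumable only if the remaining budget covers the reserve.
    fun hasEnoughSpaceForResumingStream(): Boolean =
        maxBytes - usedBytes >= reservePerStream
}

fun main() {
    val tracker = CapacityTracker(maxBytes = 100, reservePerStream = 40)
    tracker.onStored(70)
    println(tracker.hasEnoughSpaceForResumingStream()) // 30 bytes left, below the 40-byte reserve
    tracker.onReleased(20)
    println(tracker.hasEnoughSpaceForResumingStream()) // 50 bytes left, reserve is covered
}
```

With a check like this, the resolver can refuse to make new streams resumable while the storage is close to full, instead of failing later when frames can no longer be retained.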

Connector side API change:

RSocketConnector {
  reconnectable(10) //reconnect config for resume
  resume {
    token { 
      buildPacket { 
        writeText(generateStringToken()) //some token
      } 
    }
    //resumeResolver omitted, so client side Responder will not resume any stream by default
    strategy { config: ConnectionConfig ->
      ResumeStrategy(
        resumeFrameStorage = InMemoryResumeFrameStorage("CLIENT")
      )
    }
  }
}
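
`generateStringToken()` in the connector example is left undefined. A minimal sketch of one way to produce such a token is shown below, assuming a JVM target (it uses `java.security.SecureRandom`, which is not available in common multiplatform code) and assuming that a random hex string is an acceptable token format. The `byteLength` parameter is an addition made for this illustration.

```kotlin
import java.security.SecureRandom

private val secureRandom = SecureRandom()

// Assumption: a resume token only needs to be unique and hard to guess,
// so random bytes rendered as lowercase hex are used here.
fun generateStringToken(byteLength: Int = 16): String {
    val bytes = ByteArray(byteLength)
    secureRandom.nextBytes(bytes)
    // Each byte becomes exactly two hex characters.
    return bytes.joinToString("") { "%02x".format(it) }
}

fun main() {
    // Prints a 32-character hex string that differs on every run.
    println(generateStringToken())
}
```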

@whyoleg whyoleg added this to the 0.16.0 milestone Oct 5, 2022
@whyoleg whyoleg modified the milestone: 0.17.0 - Lease and Resume support, configuration API rework Nov 24, 2022
@whyoleg whyoleg removed this from the 0.17.0 - Lease and Resume support, configuration API rework milestone May 3, 2023
2 participants