Asynchronous select #416

Open
SanderMertens opened this issue Nov 28, 2015 · 5 comments

Comments

@SanderMertens
Member

The corto_select call can potentially return data from one or more replicators, which most likely will require accessing remote resources through databases, REST APIs, filesystems, messaging protocols etc. Retrieving these resources takes time and could block an application.

Blocking behavior is generally undesirable, so an asynchronous alternative is needed. Two complementary approaches have been suggested:

Through a corto_selectAsync call:

corto_selectAsync(
    corto_object scope, 
    corto_string expr, 
    int (*callback)(corto_selectItem *item, void *data), 
    void *data);

// Example callback
int onItem(corto_selectItem *item, void *cldata)
{
    if (item == NULL) {
        // No more items to deliver, wrap it up.
        return 0;
    }

    // Do stuff with the item.

    return 1;  // continue to send items (return 0 to stop)
}
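To make the callback contract concrete (a NULL item signals the end of the data, return 1 to continue and 0 to stop), here is a minimal self-contained sketch of the delivery loop that could sit behind corto_selectAsync. The names select_item, select_cb, select_deliver and count_items are hypothetical stand-ins, not corto API. The loop runs inline for clarity, and it assumes that a callback which stops early does not receive a final NULL item.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for corto_selectItem (only an id here). */
typedef struct {
    const char *id;
} select_item;

/* Callback contract from the proposal: item == NULL means "no more items";
   return 1 to keep receiving items, 0 to stop. */
typedef int (*select_cb)(select_item *item, void *data);

/* Hypothetical delivery loop behind corto_selectAsync. It runs inline here;
   a real implementation would run it from a worker thread or event loop so
   the caller is not blocked. Assumption: if the callback stops early, no
   final NULL item is delivered. */
static void select_deliver(select_item *items, size_t count,
                           select_cb cb, void *data)
{
    assert(cb != NULL);
    for (size_t i = 0; i < count; i++) {
        if (!cb(&items[i], data)) {
            return;
        }
    }
    cb(NULL, data);
}

/* Example callback: counts items, stopping once `limit` has been reached. */
typedef struct {
    int seen, limit, done;
} count_state;

static int count_items(select_item *item, void *data)
{
    count_state *s = data;
    if (item == NULL) {
        s->done = 1;            /* no more items: wrap it up */
        return 0;
    }
    s->seen++;                  /* do stuff with the item */
    return s->seen < s->limit;  /* 1 = continue, 0 = stop */
}
```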

The above approach could be extended with a corto_selectDispatch that dispatches events to an object implementing the corto_dispatcher interface. This would allow users to control which thread handles events, which could, for example, enable custom thread pool implementations.
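A dispatcher decouples *receiving* a result from *processing* it. The sketch below assumes a dispatcher is nothing more than a post function pointer (mirroring the single post method of corto_dispatcher) and shows one that queues events so the application can drain them later from a thread of its choosing. The names sel_event, sel_dispatcher, queue_dispatcher, select_dispatch and queue_pop are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical event type: a stand-in for corto/lang/event. */
typedef struct {
    const char *id;
} sel_event;

/* Hypothetical dispatcher "interface": a single post method. */
typedef struct sel_dispatcher {
    void (*post)(struct sel_dispatcher *self, sel_event *e);
} sel_dispatcher;

/* A dispatcher that only queues events; processing happens later in the
   user's own context. Fixed capacity: events beyond QUEUE_MAX are dropped. */
#define QUEUE_MAX 16
typedef struct {
    sel_dispatcher base;        /* first member, so the cast below is valid */
    sel_event *queue[QUEUE_MAX];
    size_t head, tail;
} queue_dispatcher;

static void queue_post(sel_dispatcher *self, sel_event *e)
{
    queue_dispatcher *q = (queue_dispatcher *)self;
    if (q->tail < QUEUE_MAX) {
        q->queue[q->tail++] = e;   /* defer: no processing here */
    }
}

/* Called later by the application; returns NULL when the queue is empty. */
static sel_event *queue_pop(queue_dispatcher *q)
{
    return q->head < q->tail ? q->queue[q->head++] : NULL;
}

/* Hypothetical selectDispatch: hands each result to the dispatcher
   instead of to a plain callback. */
static void select_dispatch(sel_event *events, size_t n, sel_dispatcher *d)
{
    assert(d != NULL && d->post != NULL);
    for (size_t i = 0; i < n; i++) {
        d->post(d, &events[i]);
    }
}
```

A thread pool dispatcher would follow the same shape, with post handing the event to a worker thread instead of appending it to a local queue.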

The other approach would involve a poll method in an event loop:

int corto_poll(corto_iter *handles, int *handle_status, const struct timespec *timeout_ts);

corto_iter h[2] = {CORTO_ITER_INITIALIZER, CORTO_ITER_INITIALIZER};
int estatus[2];
corto_select(root_o, "//", &h[0]);
while(1) {
   int r = corto_poll(h, estatus, 0);
   if (r == -1 || estatus[0] & DATA_COMPLETE)
     break;
   ...
}
corto_iterRelease(&h[0]);

When only a single iterator is required, the above could be simplified to:

corto_iter h;
int estatus;
corto_select(root_o, "//", &h);
while(1) {
   int r = corto_iterPoll(&h, &estatus, 0);
   if (r == -1 || estatus & DATA_COMPLETE)
     break;
   ...
}
corto_iterRelease(&h);
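The poll-based loop can be sketched with a simulated iterator whose results arrive in batches. The names sel_iter, iter_poll, iter_next and poll_loop, and the DATA_AVAILABLE flag, are assumptions (only DATA_COMPLETE appears in the proposal). One design point the sketch makes explicit is that the loop drains whatever has arrived *before* checking DATA_COMPLETE, so the final batch is not skipped.

```c
#include <assert.h>
#include <stddef.h>

/* DATA_COMPLETE is from the proposal; DATA_AVAILABLE is an assumption. */
enum { DATA_AVAILABLE = 1 << 0, DATA_COMPLETE = 1 << 1 };

/* Hypothetical iterator over results delivered by a replicator in batches. */
typedef struct {
    const int *results;
    size_t total;     /* results the query will eventually produce */
    size_t arrived;   /* results received from the remote store so far */
    size_t consumed;  /* results already handed to the application */
    size_t batch;     /* results arriving per poll (simulation only) */
} sel_iter;

/* Non-blocking poll (timeout 0): reports status and never waits.
   Returns 0 on success, -1 on invalid arguments. */
static int iter_poll(sel_iter *it, int *status)
{
    if (it == NULL || status == NULL) return -1;
    it->arrived += it->batch;              /* simulate a batch arriving */
    if (it->arrived > it->total) it->arrived = it->total;
    *status = 0;
    if (it->consumed < it->arrived) *status |= DATA_AVAILABLE;
    if (it->arrived == it->total) *status |= DATA_COMPLETE;
    return 0;
}

static int iter_next(sel_iter *it)
{
    return it->results[it->consumed++];
}

/* Poll, drain what is available, stop on completion. Returns the sum. */
static int poll_loop(sel_iter *it)
{
    int sum = 0, status = 0;
    for (;;) {
        if (iter_poll(it, &status) == -1) break;
        while (it->consumed < it->arrived) sum += iter_next(it);
        if (status & DATA_COMPLETE) break;
        /* the application is free to do other work here */
    }
    return sum;
}
```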
@dkantowitz

  1. corto_wait() is a better name than corto_poll().
  2. An advantage of corto_wait() is it offers a lot of different usage styles:
    • blocking by specifying infinite wait
    • polling by specifying zero wait
    • timeouts
  3. While blocking is undesirable "in general", it can be great in specific situations. So support both a blocking and a non-blocking API.
  4. corto_selectAsync() is easy to understand, and I expect it is fairly straightforward to implement.
  5. Instead of corto_iterPoll(), have corto_wait() take an argument for the length of the handle & status array. This removes an API call:
     corto_iter h;
     int estatus;
     corto_select(root_o, "//", &h);
     corto_wait(1, &h, &estatus, 0);
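One function can cover blocking, polling and timeouts through its timeout argument. The sketch below assumes a corto_wait-like signature (count, handles, status, timeout) where a NULL timeout blocks indefinitely, a zero timeout checks once, and any other value bounds the wait. wait_handle, handle_ready and wait_handles are hypothetical, and the handles are simulated. The busy-wait with a 1 ms sleep is a simplification; a real implementation would block on a condition variable or OS event.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical handle: becomes ready after `ready_after` readiness checks
   (a stand-in for data arriving from a replicator). */
typedef struct {
    int checks;
    int ready_after;
} wait_handle;

static int handle_ready(wait_handle *h)
{
    return ++h->checks > h->ready_after;
}

static double now_sec(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec + ts.tv_nsec / 1e9;
}

/* timeout == NULL  -> block until some handle is ready
   timeout == {0,0} -> check once and return (polling)
   otherwise        -> wait at most `timeout`
   Returns the number of ready handles (0 on timeout); status[i] is 1 if
   handles[i] is ready. */
static int wait_handles(size_t n, wait_handle *h, int *status,
                        const struct timespec *timeout)
{
    assert(h != NULL && status != NULL);
    double deadline = 0;
    if (timeout) {
        deadline = now_sec() + timeout->tv_sec + timeout->tv_nsec / 1e9;
    }
    const struct timespec tick = {0, 1000000};   /* 1 ms between checks */
    for (;;) {
        int ready = 0;
        for (size_t i = 0; i < n; i++) {
            status[i] = handle_ready(&h[i]);
            ready += status[i];
        }
        if (ready > 0) return ready;
        if (timeout && now_sec() >= deadline) return 0;
        nanosleep(&tick, NULL);
    }
}
```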

Overall, I see two issues:
a. What makes a good user api?
b. How does the interface between corto & a replicator support that user api?

What makes a good user API depends greatly on the usage context. It's not clear to me that one of the 3 choices (original blocking iteration, async callbacks, and event waiting) is always better than the others. So, eventually, I'd like to see all three styles supported. Initially, however, I would only support blocking iteration and async callbacks.

Waiting on event handles (the second code example) really shines when there are a number of different events that corto exposes for a user app. Until then, it is unnecessarily complicated. So wait to implement this call until there are more events.

Now the more complicated bit is designing a replicator API that can easily support all 3 styles.

@SanderMertens
Member Author

The unifying element in the three approaches is the iterator. A replicator interface that can provide a corto_iter object could address all three use cases. Since an iterator is essentially just a collection of callbacks, a replicator could implement the next and hasNext callbacks however it wants.

In addition to next and hasNext, a waitNext would be required too. The waitNext would be similar to the hasNext, but will wait until new data becomes available. Any iterator that exposes these three methods could be used by the corto_wait function.
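Because an iterator is just a set of callbacks, generic code (a synchronous loop or a corto_wait-style helper) can work against any replicator's implementation. The sketch below assumes a hypothetical sel_iter struct with next, hasNext and waitNext function pointers plus an opaque context, and provides one array-backed implementation. first_ready polls each iterator's waitNext with a zero timeout; a real wait would multiplex rather than check handles one by one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical iterator: callbacks plus private state. */
typedef struct sel_iter {
    void *(*next)(struct sel_iter *it);
    bool (*hasNext)(struct sel_iter *it);
    bool (*waitNext)(struct sel_iter *it, const struct timespec *timeout);
    void *ctx;  /* implementation-specific state */
} sel_iter;

/* One possible implementation: an array already held in memory. */
typedef struct { int *items; size_t len, pos; } array_ctx;

static bool array_hasNext(sel_iter *it)
{
    array_ctx *c = it->ctx;
    return c->pos < c->len;
}

static void *array_next(sel_iter *it)
{
    array_ctx *c = it->ctx;
    return &c->items[c->pos++];
}

/* All data is local, so waiting never blocks. */
static bool array_waitNext(sel_iter *it, const struct timespec *timeout)
{
    (void)timeout;
    return array_hasNext(it);
}

static sel_iter array_iter(array_ctx *c)
{
    sel_iter it = {array_next, array_hasNext, array_waitNext, c};
    return it;
}

/* Generic consumer: sees only the callbacks, never the implementation. */
static int iter_sum(sel_iter *it)
{
    int sum = 0;
    while (it->hasNext(it)) sum += *(int *)it->next(it);
    return sum;
}

/* Index of the first iterator with data available now, or -1. */
static int first_ready(sel_iter *its, size_t n)
{
    const struct timespec zero = {0, 0};
    for (size_t i = 0; i < n; i++) {
        assert(its[i].waitNext != NULL);
        if (its[i].waitNext(&its[i], &zero)) return (int)i;
    }
    return -1;
}
```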

I would like to consider unifying the corto_wait approach with two other mechanisms related to event processing: the corto_dispatcher, and the ability to wait for notifications from one or more objects.

The corto_dispatcher interface enables a user to intercept Corto events, and handle them in a user's own context/thread/process. The interface is quite simple:

interface dispatcher::
    void post(corto/lang/event event)

Instances of a class that implement this interface can be registered with observers, which will then forward all notifications to the post method in the form of an observableEvent event.

Perhaps a dispatcher could be part of a handle that is provided to corto_wait, which if set, will automatically invoke the post method of the dispatcher when the handle has data available.
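A minimal sketch of a handle with an optional dispatcher. It assumes a hypothetical policy: when a ready handle has a dispatcher, the wait call posts to it and consumes the event, and when it has none, the handle is flagged in the status array for the caller. dispatcher, wait_handle, count_post and wait_once are illustrative names, and wait_once performs a single non-blocking pass (timeout 0).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical dispatcher with a single post method.
   `posted` is bookkeeping for this sketch only. */
typedef struct dispatcher {
    void (*post)(struct dispatcher *self, size_t handle_index);
    int posted;
} dispatcher;

/* Hypothetical waitable handle: `pending` simulates data that has arrived;
   `disp` is optional (NULL means "report through status"). */
typedef struct {
    int pending;
    dispatcher *disp;
} wait_handle;

static void count_post(dispatcher *self, size_t handle_index)
{
    (void)handle_index;
    self->posted++;
}

/* One non-blocking pass. A ready handle with a dispatcher gets an event
   posted and is consumed; a ready handle without one is flagged in status.
   Returns the number of flagged handles. */
static int wait_once(size_t n, wait_handle *h, int *status)
{
    assert(h != NULL && status != NULL);
    int flagged = 0;
    for (size_t i = 0; i < n; i++) {
        status[i] = 0;
        if (!h[i].pending) continue;
        if (h[i].disp != NULL) {
            h[i].disp->post(h[i].disp, i);
            h[i].pending = 0;      /* the dispatcher now owns the event */
        } else {
            status[i] = 1;         /* caller services it after the wait */
            flagged++;
        }
    }
    return flagged;
}
```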

@dkantowitz

This seems a lot like the IObservable<>/IEnumerable<> split in .NET (C#). IEnumerable<> provides an iterator that is used to sequentially pull from the collection. IObservable<> registers an observer to which the collection pushes data.

So you're thinking that if a replicator implements both iteration and observation, a dispatcher can be used to glue that to the user code with whatever blocking/async behaviour the user wants?

PS. I've copied out the interface definitions from the MS docs & .NET reference source.

public interface IObservable<out T> 
{ 
   IDisposable Subscribe(IObserver<T> observer);  // Call dispose() on returned object to unsubscribe.
} 
public interface IObserver<in T> 
{ 
   void OnCompleted();            // Notifies the observer that the source has finished sending messages.
   void OnError(Exception error); // Notifies the observer about any exception or error.
   void OnNext(T value);          // Pushes the next data value from the source to the observer.
} 

public interface IEnumerable<out T> 
{
    // Returns an IEnumerator for this enumerable Object.  The enumerator provides
    // a simple way to access all the contents of a collection.
    IEnumerator<T> GetEnumerator();
}

public interface IEnumerator<out T> : IDisposable, IEnumerator
{    
    // Returns the current element of the enumeration. The returned value is
    // undefined before the first call to MoveNext and following a
    // call to MoveNext that returned false. Multiple calls to
    // GetCurrent with no intervening calls to MoveNext 
    // will return the same object.
    T Current {
        get; 
    }

    // Advances the enumerator to the next element of the enumeration and
    // returns a boolean indicating whether an element is available. Upon
    // creation, an enumerator is conceptually positioned before the first
    // element of the enumeration, and the first call to MoveNext 
    // brings the first element of the enumeration into view.
    // 
    bool MoveNext();

    // Resets the enumerator to the beginning of the enumeration, starting over.
    // The preferred behavior for Reset is to return the exact same enumeration.
    // This means if you modify the underlying collection then call Reset, your
    // IEnumerator will be invalid, just as it would have been if you had called
    // MoveNext or Current.
    //
    void Reset();   

    void Dispose();
}
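For readers who think in C rather than C#, here is a rough translation of the pull/push split: the same source can be consumed by pulling through an enumerator (move_next/current, positioned before the first element) or by subscribing an observer that the source pushes to (on_next/on_completed/on_error). All names are hypothetical. on_error is part of the observer contract but is not exercised by this in-memory source.

```c
#include <assert.h>
#include <stddef.h>

typedef struct { const int *data; size_t len; } source;

/* Pull side, modeled on IEnumerator<T>. */
typedef struct { const int *data; size_t len, pos; int started; } enumerator;

static enumerator get_enumerator(const source *s)
{
    enumerator e = {s->data, s->len, 0, 0};  /* before the first element */
    return e;
}

static int move_next(enumerator *e)
{
    if (!e->started) e->started = 1;
    else if (e->pos < e->len) e->pos++;
    return e->pos < e->len;
}

static int current(const enumerator *e)
{
    assert(e->started && e->pos < e->len);
    return e->data[e->pos];
}

/* Push side, modeled on IObserver<T>; the trailing fields are sketch state. */
typedef struct observer {
    void (*on_next)(struct observer *self, int value);
    void (*on_completed)(struct observer *self);
    void (*on_error)(struct observer *self, int code);
    int sum, completed, error;
} observer;

/* The source drives delivery and then signals completion. */
static void subscribe(const source *s, observer *o)
{
    for (size_t i = 0; i < s->len; i++) o->on_next(o, s->data[i]);
    o->on_completed(o);
}

static void sum_next(observer *o, int v) { o->sum += v; }
static void sum_done(observer *o) { o->completed = 1; }
static void sum_error(observer *o, int code) { o->error = code; }
```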

@SanderMertens
Member Author

@dkantowitz Sort of. Let me clarify.

This is what I'm thinking the base replicator class could look like:

void notifyAction(object observable) delegate

class replicator::
    // callbacks that allow pushing notifications to other stores
    onDeclare, onDefine, onUpdate, onDelete: notifyAction

    // selecting objects from a remote store
    iterator{selectItem} select(object scope, string expr)
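The notification half of the proposed replicator class can be approximated in C as a struct of optional callbacks that receive the object that changed. The sketch assumes hypothetical names (object, notify_action, replicator, notify_all, mirror) and omits select. It shows a store fanning local events out to registered replicators, with one replicator mirroring updates into a simulated remote copy.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical object; in corto this would be a corto_object. */
typedef struct { const char *id; int value; } object;

/* Counterpart of notifyAction: called with the object that changed. */
typedef void (*notify_action)(void *self, object *o);

/* Hypothetical replicator base: callbacks that push changes to another store. */
typedef struct {
    notify_action onDeclare, onDefine, onUpdate, onDelete;
} replicator;

typedef enum { EV_DECLARE, EV_DEFINE, EV_UPDATE, EV_DELETE } event_kind;

/* Fan an event out to every registered replicator; callbacks are optional. */
static void notify_all(replicator **reps, size_t n, event_kind k, object *o)
{
    assert(reps != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        replicator *r = reps[i];
        notify_action a = k == EV_DECLARE ? r->onDeclare
                        : k == EV_DEFINE  ? r->onDefine
                        : k == EV_UPDATE  ? r->onUpdate
                        :                   r->onDelete;
        if (a) a(r, o);
    }
}

/* Example replicator mirroring updates into a simulated remote copy. */
typedef struct {
    replicator base;   /* first member, so replicator* converts back */
    int remote_value;
    int deletes;
} mirror;

static void mirror_update(void *self, object *o)
{
    ((mirror *)self)->remote_value = o->value;
}

static void mirror_delete(void *self, object *o)
{
    (void)o;
    ((mirror *)self)->deletes++;
}
```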

An iterator would have the following callbacks, which can be implemented by a replicator:

// Obtain next value. Should not block.
void *next();

// Iterate over a static set.
// Blocking undefined. Up to the replicator when/how it requests data from a remote source.
corto_bool hasNext();

// Will setup a push-based connection with remote store. Can block for max. timeout_ts. 
// Will unblock when data is received.
// When 0 is provided as timeout, only cached data may be returned.
corto_bool waitNext(const struct timespec *timeout_ts);
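The following sketch shows one way a replicator could honour these contracts over a local cache. next never blocks, hasNext fetches on demand (a choice left to the replicator), and waitNext with a zero timeout returns only cached data, while a NULL or nonzero timeout may contact the remote store. The remote store is simulated by an array and the fetch is instant, so the timeout value itself is not enforced. repl_iter and the repl_* functions are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

#define CACHE_MAX 8

/* Hypothetical replicator iterator: a local cache plus a simulated remote. */
typedef struct {
    int cache[CACHE_MAX];
    size_t cached, pos;        /* items in cache, items already returned */
    const int *remote;         /* stands in for a database, REST API, etc. */
    size_t remote_len, fetched;
} repl_iter;

/* next: never blocks; only valid after hasNext/waitNext returned true. */
static int repl_next(repl_iter *it)
{
    assert(it->pos < it->cached);
    return it->cache[it->pos++];
}

/* Move one item from the remote into the cache (would block for real). */
static bool repl_fetch(repl_iter *it)
{
    if (it->pos == it->cached) it->pos = it->cached = 0;  /* reuse drained cache */
    if (it->fetched == it->remote_len || it->cached == CACHE_MAX) return false;
    it->cache[it->cached++] = it->remote[it->fetched++];
    return true;
}

/* hasNext: this replicator chooses to fetch on demand. */
static bool repl_hasNext(repl_iter *it)
{
    return it->pos < it->cached || repl_fetch(it);
}

/* waitNext: zero timeout -> cached data only; otherwise may fetch. */
static bool repl_waitNext(repl_iter *it, const struct timespec *timeout)
{
    bool zero = timeout && timeout->tv_sec == 0 && timeout->tv_nsec == 0;
    if (it->pos < it->cached) return true;
    return zero ? false : repl_fetch(it);
}
```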

The next and hasNext of a regular (synchronous) select would use the replicator's hasNext, which can potentially block. The asynchronous select would also use hasNext, and invoke the callback provided to select whenever hasNext unblocks.

The select / wait combination would use the replicator's waitNext.

Dispatchers can be used for either mechanism.

Instead of an asynchronous callback, a user could provide a dispatcher to select, which would then invoke the dispatcher's post method whenever data becomes available. The dispatcher can then do whatever it wants with the data:

void ThreadPool_post(corto_event e)
{
    // Do something nice with the event
}

ThreadPool myDispatcher = ThreadPoolCreate(4);

corto_select(root_o, "//", myDispatcher);

Similarly, the dispatcher could be used for the wait. It would be nice if each handle that can be waited for can define its own dispatcher:

corto_handle h[2];
int estatus[2];

memset(&h, 0, sizeof(h));

corto_select(root_o, "a//", &h[0].iter);
corto_select(root_o, "b//", &h[1].iter);

h[0].dispatcher = myDispatcher; // Optional

while(1) {
   int r = corto_wait(h, estatus, 0); // When h[0] unblocks, dispatch event
   ...
}

SanderMertens added a commit that referenced this issue Dec 1, 2015
Reworked iterators. Added onRequest callback and request method to
replicators. Added logic to selectScope that sends out requests to
replicators and stores the iterators.
SanderMertens added a commit that referenced this issue Dec 2, 2015
Also switched out delegates for virtual methods.
@SanderMertens
Member Author

With the introduction of corto_selectContentType, the corto_selectAsync approach suggested above no longer works: it gives the user no opportunity to set the content type before select starts dispatching results to the callback.

Instead, the following approach could be used, and would also allow for future additions to set more options for select:

corto_select(root_o, "//*", &iter);
corto_selectContentType(&iter, "text/json");
corto_selectAsyncDispatch(&iter, callback, udata);

A regular dispatcher could be attached in the same way:

corto_select(root_o, "//*", &iter);
corto_selectContentType(&iter, "text/json");
corto_selectDispatch(&iter, myDispatcher);
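The point of this ordering is that select only records the request, option calls adjust it, and delivery starts only when a dispatch call is made, so every result sees the final options. A minimal sketch under that assumption follows. select_req, select_init, select_content_type, select_async_dispatch and count_json are hypothetical, delivery happens synchronously over a two-item stand-in result set, and rejecting option changes after delivery has started is a design choice of the sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical select handle: options are recorded before delivery starts. */
typedef struct {
    const char *scope, *expr;
    const char *content_type;   /* NULL = deliver objects, not serialized text */
    int started;
} select_req;

typedef void (*item_cb)(const char *id, const char *content_type, void *udata);

static void select_init(select_req *r, const char *scope, const char *expr)
{
    r->scope = scope;
    r->expr = expr;
    r->content_type = NULL;
    r->started = 0;
}

/* Options may only be changed before delivery starts. 0 = ok, -1 = too late. */
static int select_content_type(select_req *r, const char *type)
{
    if (r->started) return -1;
    r->content_type = type;
    return 0;
}

/* Starts delivery (synchronously in this sketch). */
static void select_async_dispatch(select_req *r, item_cb cb, void *udata)
{
    static const char *results[] = {"a", "b"};   /* stand-in result set */
    assert(cb != NULL);
    r->started = 1;
    for (size_t i = 0; i < sizeof results / sizeof *results; i++) {
        cb(results[i], r->content_type, udata);
    }
}

/* Example callback: counts items delivered as text/json. */
static void count_json(const char *id, const char *type, void *udata)
{
    (void)id;
    if (type && strcmp(type, "text/json") == 0) ++*(int *)udata;
}
```

Further options (for example paging or filters) could be added as more setter calls between select and the dispatch call without changing the callback signature.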

@SanderMertens SanderMertens removed this from the alpha 0.3.0 milestone Feb 11, 2017