[SPARK-5946][Streaming] Add Python API for direct Kafka stream #4723
Conversation
Test build #27848 has finished for PR 4723 at commit
@tdas What's the difference between this direct API and the existing receiver-based one?
@davies It is the new experimental API for Kafka, which does not use Receivers at all; rather, it treats Kafka like a file system and topics like files. It finds new data in a topic much like your patch that supports appends in the file stream.
@tdas Cool, thanks. Does it mean that it does NOT need the WAL for high durability? This patch looks good to me; we just need more docs (somewhere) to explain the difference between these two APIs.
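For context, a rough sketch of how the two Python entry points are called (parameter values are illustrative; the lazy imports keep the sketch loadable without a running cluster):

```python
def receiver_based_stream(ssc, zkQuorum, topic):
    # Old API: a Receiver consumes messages and tracks offsets via ZooKeeper.
    from pyspark.streaming.kafka import KafkaUtils
    return KafkaUtils.createStream(ssc, zkQuorum, "consumer-group", {topic: 1})

def direct_stream(ssc, brokers, topic):
    # New API: no Receiver; Spark asks the brokers for offset ranges itself,
    # much like discovering appended data in a file stream.
    from pyspark.streaming.kafka import KafkaUtils
    return KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers})
```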
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
If the broker list is a must-have argument, I'd like to put it into the argument list (before topics).
Hi @davies, thanks for your comment. I will add this as an argument.
Test build #27849 has finished for PR 4723 at commit
Hi @tdas, do we need to add a Python version of createRDD?
Test build #27855 has finished for PR 4723 at commit
Yes, a Python version of createRDD would be great. BTW, is it possible to mark these as experimental in Python, @davies?
We can mark it as experimental in the docstring.
@jerryshao please follow the examples in MLlib, thanks.
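Following the MLlib convention mentioned above, the experimental status goes into the docstring. A minimal illustrative sketch (not the actual PySpark source; the signature is hypothetical):

```python
def createDirectStream(ssc, topics, kafkaParams):
    """
    .. note:: Experimental

    Create an input stream that pulls messages directly from Kafka brokers
    (placeholder body; this sketch only shows the docstring convention).
    """
    raise NotImplementedError("sketch only")
```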
Test build #27918 has finished for PR 4723 at commit
self._untilOffset = untilOffset

def _joffsetRange(self, sc):
    java_import(sc._jvm, "org.apache.spark.streaming.kafka.OffsetRange")
It's already imported by createRDD(); it's no longer needed here.
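For reference, the Python-side OffsetRange wrapper being reviewed can be sketched as a plain data holder (a simplified sketch; the conversion to the JVM OffsetRange via java_import would happen inside createRDD rather than in this class):

```python
class OffsetRange(object):
    """Represents a range of offsets from a single Kafka TopicAndPartition."""

    def __init__(self, topic, partition, fromOffset, untilOffset):
        self.topic = topic
        self.partition = partition
        self.fromOffset = fromOffset
        self.untilOffset = untilOffset

    def __eq__(self, other):
        return (isinstance(other, OffsetRange)
                and self.topic == other.topic
                and self.partition == other.partition
                and self.fromOffset == other.fromOffset
                and self.untilOffset == other.untilOffset)

    def __repr__(self):
        return "OffsetRange(topic: %s, partition: %d, range: [%d -> %d])" % (
            self.topic, self.partition, self.fromOffset, self.untilOffset)
```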
Test build #27919 has finished for PR 4723 at commit
Thanks @davies for your review, I will fix these comments :).
Test build #27939 has finished for PR 4723 at commit
Test build #27941 has finished for PR 4723 at commit
Hi @davies, mind taking a look at this again? I've addressed your comments, though some duplication is hard to remove; any suggestions?
LGTM, thanks!
Force-pushed d68aad2 to 1b6e873
Test build #28111 has finished for PR 4723 at commit
Test build #28254 has finished for PR 4723 at commit
return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))

@staticmethod
def createDirectStream(ssc, brokerList, topics, kafkaParams={},
We should also allow creating the direct stream with offsets and optionally specified leaders; that is, all the stuff the advanced version of createDirectStream supports. The only thing we cannot easily support in Python is the custom function MessageAndMetadata => R. Other than that we should be able to do the rest.
Hi @tdas, these days I've been trying to enable the custom function MessageAndMetadata => R in the Python API. I've tried several solutions, but unfortunately without success; do you have any suggestions? Can we just ignore this parameter for the Python APIs?
That is quite tricky. And it's not even clear to me what the demand for this will be from the Python API. What I wanted to have in the Python API is the ability to specify starting offsets. I would say let's not block on handling this, and just find a way to specify offsets; no need for a MessageAndMetadata handler function in Python.
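What specifying starting offsets might look like from Python. The `TopicAndPartition` class below is a stand-in written for this sketch, not necessarily the shipped API:

```python
class TopicAndPartition(object):
    """Hypothetical stand-in identifying one partition of a Kafka topic."""

    def __init__(self, topic, partition):
        self.topic = topic
        self.partition = partition

    def __hash__(self):
        return hash((self.topic, self.partition))

    def __eq__(self, other):
        return (self.topic, self.partition) == (other.topic, other.partition)

# Start reading partition 0 of "logs" at offset 1000, and partition 1 at 0.
fromOffsets = {
    TopicAndPartition("logs", 0): 1000,
    TopicAndPartition("logs", 1): 0,
}

# A direct stream would then be created roughly like (not runnable here):
# kvs = KafkaUtils.createDirectStream(ssc, ["logs"], kafkaParams, fromOffsets)
```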
Currently I have a solution: serialize the content of MessageAndMetadata into a byte array in the Scala code, then unpack and reconstruct a Python MessageAndMetadata object in the Python code. That way the handler can be supported. What do you think of this solution?
Let's leave that for another PR. That sounds complicated and needs careful thought.
OK, I will leave this to another PR.
Basically the solution is not so complicated: just write (topic, partition, key, message, offset) into a byte array in a controlled format; the Python code then unpacks the byte array with the same format back into the original data and constructs the object used by the message handler function.
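The byte-array scheme described above can be sketched in plain Python (a hypothetical length-prefixed layout, not the PR's actual wire format; the Scala side would write the same fields in the same order):

```python
import struct

def pack_message_and_metadata(topic, partition, offset, key, message):
    """Pack the fields with length prefixes (big-endian), mirroring what
    the Scala side would write into the byte array."""
    topic_b = topic.encode("utf-8")
    return (struct.pack(">i", len(topic_b)) + topic_b
            + struct.pack(">i", partition)
            + struct.pack(">q", offset)
            + struct.pack(">i", len(key)) + key
            + struct.pack(">i", len(message)) + message)

def unpack_message_and_metadata(data):
    """Reconstruct (topic, partition, offset, key, message) on the Python
    side, reading the same layout back."""
    pos = 0
    (tlen,) = struct.unpack_from(">i", data, pos); pos += 4
    topic = data[pos:pos + tlen].decode("utf-8"); pos += tlen
    (partition,) = struct.unpack_from(">i", data, pos); pos += 4
    (offset,) = struct.unpack_from(">q", data, pos); pos += 8
    (klen,) = struct.unpack_from(">i", data, pos); pos += 4
    key = data[pos:pos + klen]; pos += klen
    (mlen,) = struct.unpack_from(">i", data, pos); pos += 4
    message = data[pos:pos + mlen]
    return (topic, partition, offset, key, message)
```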
I agree that as such it is not a complicated thing, just that this way of doing things is non-standard practice (unless you can show me examples of this being done in other Python Kafka libraries), and therefore it should be added only if there is a good use case / demand for it.
OK, that makes sense. I will skip handler support in the Python API unless someone has a specific requirement for it.
This is looking good, but unfortunately it's hard to say for sure due to the lack of unit tests. We have to test the Kafka Python API through Python unit tests. I can open a separate JIRA for that, and it will be good to have that framework set up first.
Thanks @tdas for your review; maybe we should figure out a way to test the Kafka Python API first.
Hi @davies and @tdas, I met a problem converting a Python type across the Java-Python interface. A simple solution is to modify all the Java-Python interfaces to change the type.
@JoshRosen, any thoughts on this?
Offline conversation with @JoshRosen
Thanks a lot @tdas for your comments, but if users want to use this API in Python 3 they will still hit this problem; by skipping the unit test we only sidestep the problem rather than actually solving it.
Aaah, I forgot to mention: please open up a JIRA regarding this PySpark issue.
OK, thanks a lot for your suggestion. I will open a JIRA about this issue.
Force-pushed defbad7 to a1fe97c
LGTM. Will merge when tests pass.
Test build #31115 has finished for PR 4723 at commit
Alright! Merging it. Thanks! :)
Hi, I seem to be having a problem with this. The default py4j dict and set conversion does not seem to be working on my machine (Windows 8.1, Anaconda Python 2.7 64-bit, Java 1.7). I had to hack kafka.py a bit and include the old conversion code for this to work. Any idea what the problem might be?
Hi @Arttii, maybe it is a problem with your local environment. My machine is Ubuntu 14.04 with Python 2.7.6 and it looks OK on my side; there's also no problem in the Jenkins environment, where the Python version is 2.6.
Would you please paste your stack trace?
This is the normal WordCount example running with local[*]. If I pass in jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client), it seems to work fine.
I don't think this is a bug, but rather a problem on my side; I'm just wondering what parts could impact this. There's nothing in KafkaUtils or the PythonHelper class that handles any of the conversion logic (I think), so this is a py4j thing. Well, I think so at least.
So can you try the receiver-based Kafka stream using createStream?
I did try it; it also fails. The py4j docs say that dict and set are converted to HashMap and HashSet by default. That does not seem to be happening, which is really weird. So I just put in the direct conversion and everything works. I might try the py4j forums or something. This is fairly weird.
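The workaround being described, as a hedged sketch: convert the Python dict to a `java.util.HashMap` explicitly with py4j's `MapConverter` instead of relying on automatic conversion. The helper name is made up for this example, and py4j must be available at call time:

```python
def to_java_map(py_dict, gateway_client):
    # Explicit conversion to a java.util.HashMap, bypassing py4j's
    # automatic dict conversion (the behavior that fails on some setups).
    from py4j.java_collections import MapConverter
    return MapConverter().convert(py_dict, gateway_client)

# Inside kafka.py, the call site would then look roughly like:
# jparam = to_java_map(kafkaParams, ssc.sparkContext._gateway._gateway_client)
```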
Might be a problem with py4j; I'm not an expert on py4j. What is your insight, @davies? Maybe you could report a bug in JIRA so others can take a crack at this problem.
I believe @davies will be slow in responding as he is on vacation.
Currently only added `createDirectStream` API, I'm not sure if `createRDD` is also needed, since some Java object needs to be wrapped in Python. Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes apache#4723 from jerryshao/direct-kafka-python-api and squashes the following commits:

a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 Syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kakfa direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add python API and example for direct kafka stream