[SPARK-5946][Streaming] Add Python API for direct Kafka stream #4723

Closed
wants to merge 20 commits

Conversation

jerryshao
Contributor

Currently this only adds the createDirectStream API; I'm not sure whether createRDD is also needed, since some Java objects need to be wrapped in Python. Please help review, thanks a lot.

@SparkQA

SparkQA commented Feb 23, 2015

Test build #27848 has finished for PR 4723 at commit f80a6be.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Feb 23, 2015

@tdas What's the difference between createStream() and createDirectStream()? Is it possible to combine them?

@tdas
Contributor

tdas commented Feb 23, 2015

@davies It is the new experimental API for Kafka, which does not use receivers at all and instead treats Kafka like a file system and topics like files. It finds new data in a topic much like your patch that supports appends in the file stream.

@davies
Contributor

davies commented Feb 23, 2015

@tdas Cool, thanks. Does it mean that it does NOT need the WAL for high durability?

This patch looks good to me; it just needs more docs (somewhere) explaining the difference between these two APIs.

ssc = StreamingContext(sc, 2)

brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
Contributor

If broker is a MUST-HAVE argument, I'd like to put it into the argument list (before topics).

Contributor Author

Hi @davies, thanks for your comment; I will add this as an argument.

@SparkQA

SparkQA commented Feb 23, 2015

Test build #27849 has finished for PR 4723 at commit adbc332.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor Author

Hi @tdas, do we need to add a Python version of createRDD for the direct Kafka stream? It seems this API requires a Python wrapper for Java objects like OffsetRange.
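
For reference, a minimal sketch of what such a wrapper could look like, mirroring the fields of the Scala OffsetRange (the exact field handling is illustrative):

    class OffsetRange(object):
        """
        Represents a range of offsets from a single Kafka TopicAndPartition.
        """

        def __init__(self, topic, partition, fromOffset, untilOffset):
            # plain Python holders; a Java OffsetRange would be built from
            # these via Py4J only when the RDD is actually created
            self._topic = topic
            self._partition = partition
            self._fromOffset = fromOffset
            self._untilOffset = untilOffset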

@SparkQA

SparkQA commented Feb 23, 2015

Test build #27855 has finished for PR 4723 at commit 5381db1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Feb 24, 2015

Yes, a Python version of createRDD would be great.

BTW, is it possible to mark these as experimental in Python, @davies? The Scala and Java APIs are experimental as of now.

TD

@davies
Contributor

davies commented Feb 24, 2015

We can mark it as experimental by

  ::note: experimental

@jerryshao
Contributor Author

Hi @davies, should I use `::note: experimental` or `.. note:: Experimental`? It seems the MLlib code just uses the latter (#3951).

@davies
Contributor

davies commented Feb 24, 2015

@jerryshao please follow the examples in MLlib, thanks.
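
For reference, a minimal sketch of the MLlib docstring convention being discussed (class and signature abbreviated for illustration):

    class KafkaUtils(object):

        @staticmethod
        def createDirectStream(ssc, topics, kafkaParams):
            """
            .. note:: Experimental

            Create an input stream that directly pulls messages from a Kafka broker.
            """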

@jerryshao
Contributor Author

Added a new createRDD API. @davies & @tdas, please help review. Thanks a lot.

@SparkQA

SparkQA commented Feb 24, 2015

Test build #27918 has finished for PR 4723 at commit cd56575.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OffsetRange(object):

        self._untilOffset = untilOffset

    def _joffsetRange(self, sc):
        java_import(sc._jvm, "org.apache.spark.streaming.kafka.OffsetRange")
Contributor

It's imported by createRDD(), so it's no longer needed here.

@SparkQA

SparkQA commented Feb 25, 2015

Test build #27919 has finished for PR 4723 at commit 90ed034.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OffsetRange(object):

@jerryshao
Contributor Author

Thanks @davies for your review, I will address these comments :).

@SparkQA

SparkQA commented Feb 25, 2015

Test build #27939 has finished for PR 4723 at commit 377b73f.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OffsetRange(object):

@SparkQA

SparkQA commented Feb 25, 2015

Test build #27941 has finished for PR 4723 at commit d68aad2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OffsetRange(object):

@jerryshao
Contributor Author

Hi @davies, mind taking a look at this again? I've addressed your comments, though some duplication is hard to remove; any suggestions?

@davies
Contributor

davies commented Feb 25, 2015

LGTM, thanks!

@SparkQA

SparkQA commented Feb 28, 2015

Test build #28111 has finished for PR 4723 at commit 1b6e873.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OffsetRange(object):

@SparkQA

SparkQA commented Mar 4, 2015

Test build #28254 has finished for PR 4723 at commit c301951.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OffsetRange(object):

        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))

    @staticmethod
    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
Contributor

We should also allow creating a direct stream with offsets and optionally specified leaders; that is, all the stuff the advanced version of createDirectStream supports. The only thing we cannot easily support in Python is the custom function MessageAndMetadata => R. Other than that, we should be able to support the rest.

Contributor Author

Hi @tdas, these days I've been trying to enable the custom function MessageAndMetadata => R in the Python API. I have tried several solutions but unfortunately had no success; do you have any suggestions? Can we just leave this parameter out of the Python APIs?

Contributor

That is quite tricky. And it's not even clear to me what the demand for this will be from the Python API. What I wanted to have in the Python API is the ability to specify starting offsets. I would say let's not block on handling this and just find a way to specify offsets; there's no need for a MessageAndMetadata handler Python function.
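
A sketch of what specifying starting offsets could look like from Python, assuming the wrapper mirrors the Scala API (TopicAndPartition and the fromOffsets parameter follow the Scala names; ssc and brokers are assumed to be defined as in the earlier example):

    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    # start reading topic "test_topic", partition 0, at offset 12345
    fromOffsets = {TopicAndPartition("test_topic", 0): 12345}
    kvs = KafkaUtils.createDirectStream(
        ssc, ["test_topic"], {"metadata.broker.list": brokers},
        fromOffsets=fromOffsets)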

Contributor Author

Currently I have a solution: serialize the content of MessageAndMetadata into a byte array in the Scala code, then unpack it and reconstruct a Python MessageAndMetadata object in the Python code. That way the handler can be supported. What do you think of this solution?

Contributor

Let's leave that for another PR. That sounds complicated and needs careful thought.

Contributor Author

OK, I will leave this to another PR.

Basically this solution is not so complicated: just write (topic, partition, key, message, offset) into a byte array with a controlled format. The Python code then unpacks this byte array with the same format into the original data and constructs an object to pass to the message handler function.
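
A minimal sketch of the packing scheme described above, using Python's struct module (the field layout is illustrative, not a format the PR commits to; key and message are assumed to already be byte strings):

    import struct

    def pack_message_and_metadata(topic, partition, key, message, offset):
        # length-prefix the variable-size fields so the Python side can
        # unpack them unambiguously using the same format
        t = topic.encode('utf-8')
        return (struct.pack('!i', len(t)) + t +
                struct.pack('!iq', partition, offset) +
                struct.pack('!i', len(key)) + key +
                struct.pack('!i', len(message)) + message)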

Contributor

I agree that as such it is not a complicated thing; it's just that this way of doing things is non-standard practice (unless you can show me examples of this being done in other Python Kafka libraries), and therefore it should be added only if there is a good use case / demand for it.

Contributor Author

OK, that makes sense. I will leave out this handler support in the Python API unless someone has a specific requirement for it.

@tdas
Contributor

tdas commented Mar 6, 2015

This is looking good, but unfortunately it's hard to say for sure due to the lack of unit tests. We have to test the Kafka Python API through Python unit tests. I can open a separate JIRA for that, and it will be good to have that framework set up first.

@jerryshao
Contributor Author

Thanks @tdas for your review; maybe we should figure out a way to test the Kafka Python API first.

@jerryshao
Contributor Author

Hi @davies and @tdas, I've hit a problem converting a Python int into a Java Long: the Java API in KafkaUtils requires the offset as a Long. This is simple in Python 2, since Python 2 has a built-in long type which py4j maps to a Java Long automatically, but Python 3 only has int, and py4j maps a Python int to a Java Integer. I'm not sure how to support Long in Python 3.

A simple solution is to change all the Java-Python interfaces to the Integer type, but then very large offsets would not be supported. I'm not sure whether there is any other solution. Sorry for the dumb question, and thanks a lot in advance.
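
A sketch of the mismatch being described (illustrative; the mapping behavior is as stated in the discussion above, not verified against every py4j version):

    import sys

    offset = 9223372036854775807  # a value that must become a Java Long

    if sys.version_info[0] == 2:
        # Python 2: the built-in long is mapped by py4j to java.lang.Long
        joffset = long(offset)  # noqa: F821 -- long only exists on Python 2
    else:
        # Python 3: there is no 'long'; py4j maps a plain int to
        # java.lang.Integer, so a Java method expecting a Long is not matched
        joffset = offset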

@tdas
Contributor

tdas commented Apr 28, 2015

@JoshRosen Any thoughts on this?

@tdas
Contributor

tdas commented Apr 28, 2015

Offline conversation with @JoshRosen: if it works for Python 2, then write the corresponding unit test such that it will be skipped for Python 3. Take a look at https://github.com/apache/spark/blob/master/python/pyspark/tests.py#L944
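
A minimal sketch of the skip pattern referenced above (the test name and skip reason are hypothetical):

    import sys
    import unittest

    class KafkaStreamTests(unittest.TestCase):

        @unittest.skipIf(sys.version >= "3", "long offsets not supported on Python 3 yet")
        def test_direct_stream(self):
            pass  # test body omitted in this sketch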

@jerryshao
Contributor Author

Thanks a lot @tdas for your comments, but what if users want to use this API in Python 3? They will still hit this problem; by skipping the unit test we only sidestep the problem rather than actually solve it.

@tdas
Contributor

tdas commented Apr 28, 2015

Aaah, I forgot to mention. Please open a JIRA regarding this PySpark problem. It's most likely a Py4J problem, and hence needs to be followed up with Py4J. And Python 3 support is still experimental, so it's okay for now. There are other unit tests in Spark that are skipped for Python 3.

@jerryshao
Contributor Author

OK, thanks a lot for your suggestion, I will open a JIRA about this issue.

@tdas
Contributor

tdas commented Apr 28, 2015

LGTM. Will merge when tests pass.

@SparkQA

SparkQA commented Apr 28, 2015

Test build #31115 has finished for PR 4723 at commit a1fe97c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OffsetRange(object):
    • class TopicAndPartition(object):
    • class Broker(object):
  • This patch does not change any dependencies.

@tdas
Contributor

tdas commented Apr 28, 2015

Alright! Merging it. Thanks! :)

@asfgit asfgit closed this in 9e4e82b Apr 28, 2015
@Arttii

Arttii commented Apr 29, 2015

Hi

I seem to be having a problem with this. The default py4j dict and set conversion does not seem to be working on my machine (Windows 8.1, Anaconda Python 2.7 64-bit, Java 1.7). I had to hack kafka.py a bit and include the old conversion code in there for this to work. Any idea what the problem might be?

@jerryshao
Contributor Author

Hi @Arttii, maybe it is a problem with your local environment. My machine is Ubuntu 14.04 with Python 2.7.6 and it looks OK on my side; there's also no problem in the Jenkins environment, where the Python version is 2.6.

@jerryshao
Contributor Author

Would you please paste your stack trace?

@Arttii

Arttii commented Apr 29, 2015

This is the normal WordCount example running with local[*]

15/04/29 10:53:48 INFO SparkContext: Running Spark version 1.3.1
15/04/29 10:53:49 INFO SecurityManager: Changing view acls to: a.topchyan
15/04/29 10:53:49 INFO SecurityManager: Changing modify acls to: a.topchyan
15/04/29 10:53:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(a.topchyan); users with modify permissions: Set(a.topchyan)
15/04/29 10:53:49 INFO Slf4jLogger: Slf4jLogger started
15/04/29 10:53:49 INFO Remoting: Starting remoting
15/04/29 10:53:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@DELLHR57L12.replynet.prv:52706]
15/04/29 10:53:49 INFO Utils: Successfully started service 'sparkDriver' on port 52706.
15/04/29 10:53:49 INFO SparkEnv: Registering MapOutputTracker
15/04/29 10:53:49 INFO SparkEnv: Registering BlockManagerMaster
15/04/29 10:53:49 INFO DiskBlockManager: Created local directory at C:\Users\ATOPCH~1.REP\AppData\Local\Temp\spark-af5177e7-d148-4013-a097-113c8f92ed63\blockmgr-c45e19f6-a825-4763-a94e-eea19dbb1f26
15/04/29 10:53:49 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/04/29 10:53:49 INFO HttpFileServer: HTTP File server directory is C:\Users\ATOPCH~1.REP\AppData\Local\Temp\spark-8474fc1d-7c66-4b25-b304-2fd743061967\httpd-eba80b64-1a5a-49c7-9bd2-6f7a4be543d7
15/04/29 10:53:49 INFO HttpServer: Starting HTTP Server
15/04/29 10:53:49 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/29 10:53:49 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52707
15/04/29 10:53:49 INFO Utils: Successfully started service 'HTTP file server' on port 52707.
15/04/29 10:53:49 INFO SparkEnv: Registering OutputCommitCoordinator
15/04/29 10:53:50 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/29 10:53:50 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/29 10:53:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/04/29 10:53:50 INFO SparkUI: Started SparkUI at http://DELLHR57L12.replynet.prv:4040
15/04/29 10:53:50 INFO Executor: Starting executor ID <driver> on host localhost
15/04/29 10:53:50 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@DELLHR57L12.replynet.prv:52706/user/HeartbeatReceiver
15/04/29 10:53:50 INFO NettyBlockTransferService: Server created on 52745
15/04/29 10:53:50 INFO BlockManagerMaster: Trying to register BlockManager
15/04/29 10:53:50 INFO BlockManagerMasterActor: Registering block manager localhost:52745 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 52745)
15/04/29 10:53:50 INFO BlockManagerMaster: Registered BlockManager
Traceback (most recent call last):
  File "D:\workingdir\app\driver.py", line 33, in <module>
    kvs = KafkaUtils.createDirectStream(ssc, ["testopic"], {"metadata.broker.list": "obiwan:9092,r2d2:9092,vader:9092"})
  File "C:/spark\python\pyspark\streaming\kafka.py", line 126, in createDirectStream
    jstream = helper.createDirectStream(ssc._jssc, kafkaParams, jtopics, jfromOffsets)
  File "C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 529, in __call__
    [get_command_part(arg, self.pool) for arg in new_args])
  File "C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 265, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'dict' object has no attribute '_get_object_id'
Press any key to continue . . .

If I pass in

    jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)

it seems to work fine.
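
For anyone hitting the same issue, a sketch of that workaround in context (helper, jtopics, and jfromOffsets are the local names from kafka.py visible in the traceback above):

    from py4j.java_collections import MapConverter

    gateway_client = ssc.sparkContext._gateway._gateway_client
    # convert the Python dict explicitly instead of relying on py4j's
    # implicit dict-to-java.util.Map conversion
    jparam = MapConverter().convert(kafkaParams, gateway_client)
    jstream = helper.createDirectStream(ssc._jssc, jparam, jtopics, jfromOffsets)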

@Arttii

Arttii commented Apr 29, 2015

I don't think this is a bug, but rather a problem on my side. I am just wondering what parts could affect this. There's nothing in KafkaUtils or the PythonHelper class that handles any of the conversion logic (I think), so this is a py4j thing. Well, I think so at least.

@jerryshao
Contributor Author

So can you try the receiver-based Kafka stream using createStream? If this is a py4j problem, I assume that will fail as well. Originally we actually used MapConverter to convert a Python dict into a Java Map, but now this conversion seems to be done implicitly.
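
For reference, a minimal receiver-based call to compare against, following the createStream signature shown earlier (zkQuorum, groupId, then a dict of topic to consumer-thread count; the host names are placeholders):

    from pyspark.streaming.kafka import KafkaUtils

    kvs = KafkaUtils.createStream(ssc, "zkhost:2181", "test-consumer-group",
                                  {"testopic": 1})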

@Arttii

Arttii commented Apr 29, 2015

I did try it; it also fails. The Py4J docs say that dict and set are converted to HashMap and HashSet by default. That does not seem to be happening, which is really weird. So I just put in the direct conversion and everything works. I might try the py4j forums or something. This is fairly weird.

@jerryshao
Contributor Author

This might be a py4j problem; I'm not a py4j expert. What is your insight, @davies?

Maybe you could report a bug in JIRA, so others can take a crack at this problem.

@tdas
Contributor

tdas commented Apr 29, 2015

I believe @davies will be slow to respond as he is on vacation. Pinging @JoshRosen instead.

jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 14, 2015
Currently only added `createDirectStream` API, I'm not sure if `createRDD` is also needed, since some Java object needs to be wrapped in Python. Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes apache#4723 from jerryshao/direct-kafka-python-api and squashes the following commits:

a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 Syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kakfa direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add python API and example for direct kafka stream
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015