Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

第三章第7小节管道实现进程通信部分 #90

Open
Bestporter opened this issue Feb 26, 2022 · 0 comments
Open

第三章第7小节管道实现进程通信部分 #90

Bestporter opened this issue Feb 26, 2022 · 0 comments
Labels

Comments

@Bestporter
Copy link
Contributor

函数create_items、multiply_items部分无注释,看了好久才明白。
第一个费解的地方pipe_1 = multiprocessing.Pipe(True),不知道这个Pipe具体干了什么的,查看源码后发现就是弄了两个队列,然后建立连接,至于至于send,就是使用的Queue的put函数,recv就是使用的Queue的get函数,用两个队列实现双工。

# 源代码位于multiprocessing/dummy/connection.py
def Pipe(duplex=True):
    a, b = Queue(), Queue()
    return Connection(a, b), Connection(b, a)

class Connection(object):

    def __init__(self, _in, _out):
        self._out = _out
        self._in = _in
        self.send = self.send_bytes = _out.put
        self.recv = self.recv_bytes = _in.get

    def poll(self, timeout=0.0):
        if self._in.qsize() > 0:
            return True
        if timeout <= 0.0:
            return False
        with self._in.not_empty:
            self._in.not_empty.wait(timeout)
        return self._in.qsize() > 0

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

自己写了一下文中的实现,p1_a与p1_b是一组能够通信的,p2_a与p2_b是一组互相能通信的

import multiprocessing

def create_items(pipe):
    p1_a, p1_b = pipe
    for item in range(10):
        p1_a.send(item)
    p1_a.close()

def multiply_items(pipe_1, pipe_2):
    p1_a, p1_b = pipe_1
    p1_a.close()
    p2_a, p2_b = pipe_2
    try:
        while True:
            item = p1_b.recv()
            p2_a.send(item * item)
    except EOFError:
        p2_a.close()

if __name__== '__main__':
    # 第一个进程管道发出数字
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()
    # 第二个进程管道接收数字并计算
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()
    pipe_1[0].close()
    pipe_2[0].close()
    try:
        while True:
            print(pipe_2[1].recv())
    except EOFError:
        print("End")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants