# Exercise 7: Message Passing Concurrency

The goal of this exercise is to implement a _port_ using the python ```multiprocessing``` module.

The first part of the exercise present the ```process``` and ```pipe``` object.


In [1]:
from multiprocessing import Process, Pipe
import threading, queue
import os, time, random
import concurrent.futures

## Example of use of processes

A new process can be created by:

```python
p = Process(target=f, args=(arg0, arg1,...))
```

Where `f` is a python function which will be executed in the new process while `arg0`, `arg1`... are arguments given to that function. On a side note, _threads_ can be created with ```Thread``` using a similar API.

Bellow is an example of use:

In [2]:
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

In [3]:
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
    
    

main line
module name: __main__
parent process: 175424
process id: 187515
function f
module name: __main__
parent process: 187515
process id: 187525
hello bob


# Message passing

Communication between two processes in python can be achieved transparently using `Pipe`, an example is shown bellow:

In [4]:
def f(conn):
    time.sleep(1)
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    # Create the connections
    parent_conn, child_conn = Pipe(duplex = True)
    # Start the process
    p = Process(target=f, args=(child_conn,))
    p.start()
    # Wait for the message
    while not parent_conn.poll(0.1):
        print("Waiting....")
    # Receive the message
    print(parent_conn.recv())
    p.join()

Waiting....
Waiting....
Waiting....
Waiting....
Waiting....
Waiting....
Waiting....
Waiting....
Waiting....
[42, None, 'hello']


*WARNING* the connection objects returned by ```Pipe``` can only be used by a single thread at a given time.

# Queue

A python ```queue.Queue``` implements the datastructure _Queue_ with a thread-safe implementation, so that multiple threads can add element to the queue, and a single thread get elements from the queue.

The following show an example of use of the queue:

In [5]:
# Create a queue
q = queue.Queue()

# Add two elements in the queue
q.put(10)
q.put(12)

# q.get() is blocking if no elements are in the queue, we can use
# q.empty() to check if the queue is empty or not.
while not q.empty():
    print(q.get())

10
12


# Future

A python ```concurrent.futures.Future``` is an object that can be returned from a thread and the result is available later.

In [6]:
def future_test(future_result):
    time.sleep(5)
    future_result.set_result("world")
future_value = concurrent.futures.Future()
t = threading.Thread(target=future_test, args=(future_value,))
t.start()
print("Thread is running...")
print("Waiting for the result of the future: hello {}".format(future_value.result()))

Thread is running...
Waiting for the result of the future: hello world


# Port

The goal of this exercise is to study an implementation of a ```port``` object, which take a function that handle messages. This exercise differs from the lecture since the _port_ is merged with the _server_ function.

The ```server_f``` function is an example of port.

In [7]:
def server_f(msg):
    time.sleep(1)
    return msg['a'] + msg['b']

The following ```port``` presents a skeleton implementation, the server side is implemented for you, you only need to implement the ```__handle_client_messages``` function. 

In [12]:
class port:
    def __init__(self, f):
        """Create a port using ```f``` to handle messages, messages are handled by ```f``` sequentially."""
        client_conn, server_conn = Pipe(duplex = True)
        self.client_conn = client_conn
        self.process = Process(target=self.__handle_server_messages, args=(f, server_conn))
        self.process.start()
        self.message_queue = queue.Queue()
        self.client_thread = threading.Thread(target=self.__handle_client_messages, args=(client_conn, self.message_queue))
        self.client_thread.start()
        self.sent_messages = 0
        
    def __handle_server_messages(self, f, server_conn):
        """Private function used to receive message"""
        while(True):
            msg = server_conn.recv()
            msg["data"] = f(msg["data"])
            server_conn.send(msg)
    def __handle_client_messages(self, client_conn, queue):
        """This function receive the messages from the clients,
        it needs to send requests from the queue to the server and receives message from the server."""
        
        """This function receives message from the queue and from the client_conn
        and run indefenitly in a thread.
        It should contains two parts:
            * check if there are message in the queue, if so send them on the client_conn to the server
              And increment ```self.sent_messages``` after sending the message to the server
            * check if messages are received from the server on the client connection, and if so, send
              the answer to the correct client
        """
        msgs = {}
        while True:
            if client_conn.poll(0.1):
                recv = self.client_conn.recv()
                msgs[recv['id']].set_result(recv['data'])
                del msgs[recv['id']]
            while not queue.empty():
                req = queue.get()
                id = random.randint(0, 1000000)
                msgs[id] = req
                self.sent_messages += 1
                self.client_conn.send({'id': id, 'data': req.msg})

    class __request(concurrent.futures.Future):
        """Represent a request. It is implemented using a future and contains the message from the request."""
        def __init__(self, msg):
            super().__init__()
            self.msg = msg
            
    def send(self, msg):
        """Send msg to the server. And return a future that can be waited for the answer."""
        req = port.__request(msg)
        self.message_queue.put(req)
        return req
        


In [13]:
p = port(server_f)

The following function is a convenience function to call the server:

In [14]:

def call_server(p, a, b):
    v = p.send({'a': a, 'b': b})
    return v


Use the following to test your server with a single call:

In [15]:
previous_sent_messages = p.sent_messages
c = call_server(p, 1, 2)
print("{} == 3".format(c.result()))
print("{} == 1".format(p.sent_messages - previous_sent_messages))

3 == 3
1 == 1


Use the following to test your server with two calls:

In [16]:
previous_sent_messages = p.sent_messages
c1 = call_server(p, 1, 2)
c2 = call_server(p, 3, 4)
print("{} == 3".format(c1.result()))
print("{} == 7".format(c2.result()))
print("{} == 2".format(p.sent_messages - previous_sent_messages))

3 == 3
7 == 7
2 == 2


Final test

In [19]:
previous_sent_messages = p.sent_messages

calls = [ [call_server(p, 1, 2), 3],
          [call_server(p, 3, 4), 7],
          [call_server(p, -1, 5), 4],
          [call_server(p, 2, 7), 9],
          [call_server(p, -1, -4), -5] ]

print("Waiting for results...")

time.sleep(0.1)

assert(p.sent_messages == previous_sent_messages + 5)

for c in calls:
    print("{} == {} ?".format(c[0].result(), c[1]))
    assert(c[0].result() == c[1])


Waiting for results...
3 == 3 ?
7 == 7 ?
4 == 4 ?
9 == 9 ?
-5 == -5 ?


Explain (in detail) how the port class works:

**ANSWER HERE**