analitics


Showing posts with label multiprocessing. Show all posts

Sunday, March 31, 2024

Python 3.12.1 : About multiprocessing performance.

Today I tested a simple Python script that uses the Pool and ThreadPool classes from the multiprocessing module.
The main goal was to test Python’s multiprocessing performance on my computer.
NumPy releases the GIL for many of its operations, which means you can use multiple CPU cores even with threads.
Processing large amounts of data with Pandas can be difficult, and the Polars dataframe library is a potential solution.
Sciagraph gives you both performance profiling and peak memory profiling information.
Let's test only these classes:
The multiprocessing.pool.Pool class provides a pool of worker processes in Python.
The multiprocessing.pool.ThreadPool class in Python provides a pool of reusable threads for executing ad hoc tasks.
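Both classes expose the same pool interface, so one helper can drive either of them. The following is only a minimal sketch and not part of the test script: the names square and run_pool are made up for illustration, and only the thread pool is exercised here.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    """Hypothetical task used only for this illustration."""
    return x * x


def run_pool(pool_cls, values, workers=2):
    """Run square() over values with the given pool class.

    pool_cls is assumed to be ThreadPool or multiprocessing.Pool;
    both provide map() and work as context managers.
    """
    with pool_cls(workers) as pool:
        # map() blocks until every task has finished
        return pool.map(square, values)


if __name__ == "__main__":
    print(run_pool(ThreadPool, [1, 2, 3, 4]))
```

Passing multiprocessing.Pool instead of ThreadPool should run the same tasks in worker processes; in that case the call has to stay under the if __name__ == "__main__" guard.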
This is the python script:
from time import time
import multiprocessing as mp
from multiprocessing.pool import ThreadPool
import numpy as np

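def main():
    # 1024 * 1024 * 1024 one-byte elements: a 1 GiB array
    arr = np.ones((1024, 1024, 1024), dtype=np.uint8)
    # reference value computed in the main thread
    expected_sum = np.sum(arr)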

    with ThreadPool(1) as threadpool:
        start = time()
        assert (
            threadpool.apply(np.sum, (arr,)) == expected_sum
        )
        print("Thread pool:", time() - start)

    with mp.get_context("spawn").Pool(1) as process_pool:
        start = time()
        assert (
            process_pool.apply(np.sum, (arr,))
            == expected_sum
        )
        print("Process pool:", time() - start)

if __name__ == "__main__":
    main()
This is the result:
python thread_process_pool_001.py
Thread pool: 1.6689703464508057
Process pool: 11.644825458526611
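A plausible explanation for the gap is data transfer: a thread pool shares memory with the caller, while arguments handed to a process pool are pickled and sent to the worker process. The sketch below only measures that serialization cost. It uses a plain bytes buffer as a stand-in for the NumPy array, and pickle_roundtrip is a hypothetical helper, not part of the test script.

```python
import pickle
from time import perf_counter


def pickle_roundtrip(payload):
    """Pickle and unpickle payload, returning (copy, elapsed_seconds)."""
    start = perf_counter()
    data = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
    copy = pickle.loads(data)
    return copy, perf_counter() - start


if __name__ == "__main__":
    # 64 MiB stand-in buffer (an assumption; the script above uses a 1 GiB array)
    payload = bytes(64 * 1024 * 1024)
    copy, seconds = pickle_roundtrip(payload)
    print("Same content:", copy == payload)
    print("Pickle round trip:", seconds)
```

The measured time depends on the machine, and it covers only serialization, not starting the worker process or moving the data through a pipe.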

Tuesday, February 25, 2020

Python 3.7.6 : The new concepts of execution in python 3 - part 001.

The main goal of this tutorial series is to learn how to work with Python source code using the new concepts of execution in Python 3.
When two or more events are concurrent it means that they are happening at the same time.
Concurrent programming is not equivalent to parallel execution.
In computing, concurrency is the execution of pieces of work or tasks by a computer at the same time.
Concurrency is the property that more than one operation can be in progress at the same time.
When multiple computations or operations are carried out at the same time, or in parallel, with the goal of speeding up the computation, this is called parallelism.
Parallelism is the property that operations are actually being run simultaneously, for example by using multiprocessing.
Multiprocessing, on the other hand, involves utilizing two or more processor units on a computer to achieve parallelism.
Multithreading is a property that refers to the ability of a CPU to execute multiple threads concurrently.
Python’s concurrency methods include threading, multiprocessing, and asyncio.
The difference between threading and multiprocessing is this: the threading module uses threads, the multiprocessing module uses processes.
The threading module provides an API to create and manage threads.
With multiprocessing, Python creates new processes using an API similar to the threading module.
The asyncio library is used to write concurrent code with the async/await syntax.
The keyword async indicates that a function is a coroutine: coroutines choose when to pause and let others run, so they multitask cooperatively.
Compared with threads, async and await have three fundamental advantages:
  • cooperative multitasking - you can reasonably have millions of concurrent tasks;
  • using await makes the scheduling points visible;
  • tasks can support cancellation.
The trade-off is that a task that doesn’t yield can accidentally block all other tasks from running.
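These points about async and await can be sketched in a few lines. This is only an illustrative example for Python 3.7 or newer: the worker and demo coroutines and the log list are hypothetical names, and asyncio.sleep stands in for real work.

```python
import asyncio


async def worker(name, delay, log):
    """Record when the task starts and finishes."""
    log.append(name + " start")
    # await marks a visible scheduling point: other tasks may run here
    await asyncio.sleep(delay)
    log.append(name + " done")


async def demo():
    log = []
    # schedule a slow task; it starts at the next scheduling point
    slow = asyncio.create_task(worker("slow", 10, log))
    # run a fast coroutine while the slow task is waiting
    await worker("fast", 0.01, log)
    # tasks support cancellation
    slow.cancel()
    try:
        await slow
    except asyncio.CancelledError:
        log.append("slow cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Replacing asyncio.sleep with a blocking call such as time.sleep would show the trade-off: no other task can run until the blocking call returns.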
The following source code shows how to handle code execution in Python 3 using the threading and multiprocessing packages.
The timeit package is used to benchmark the code written in the code_to_test variable.
Let's test the multi-threading execution with python:
[mythcat@desk ~]$ python3 
Python 3.7.6 (default, Jan 30 2020, 09:44:41) 
[GCC 9.2.1 20190827 (Red Hat 9.2.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import timeit
>>> code_to_test = """
... import threading
... 
... text = "Hello World"
... 
... def print_text(text):
...     for char in text:
...             print (char)
... 
... # multi-threading execution
... def multi_threads():
...     thread_1 = threading.Thread(target=print_text, args=(text,))
...     thread_2 = threading.Thread(target=print_text, args=(text,))
...     thread_1.start()
...     thread_2.start()
...     thread_1.join()
...     thread_2.join()
... multi_threads()
... """
>>> 
>>> elapsed_time = timeit.timeit(code_to_test, number=1)
H
e
l
l
o
 
W
o
r
l
d
H
e
l
l
o
 
W
o
r
l
d
>>> print(elapsed_time)
0.010613240000566293
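When creating threads, it matters whether the target is the function itself or the result of calling it. Writing target=f(x) calls f immediately in the current thread and hands its return value (usually None) to Thread, so the new thread has nothing to run. The sketch below shows the difference; record_thread and the two run_* helpers are hypothetical names used only for this illustration.

```python
import threading


def record_thread(names):
    """Store the name of the thread that executes this function."""
    names.append(threading.current_thread().name)


def run_with_callable():
    """Pass the function and its arguments: it runs in the new thread."""
    names = []
    thread = threading.Thread(target=record_thread, args=(names,), name="worker-1")
    thread.start()
    thread.join()
    return names


def run_with_call_result():
    """Pass the call result: the function runs here, in the calling thread."""
    names = []
    # record_thread(names) is executed right now and returns None
    thread = threading.Thread(target=record_thread(names), name="worker-1")
    thread.start()  # the thread has no target, so it does nothing
    thread.join()
    return names


if __name__ == "__main__":
    print(run_with_callable())
    print(run_with_call_result())
```

The same rule applies to multiprocessing.Process, which takes target and args in the same way.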
Let's test the serial execution with python:
[mythcat@desk ~]$ python3 
Python 3.7.6 (default, Jan 30 2020, 09:44:41) 
[GCC 9.2.1 20190827 (Red Hat 9.2.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import timeit
>>> code_to_test = """
... import threading
... 
... text = "Hello World"
... 
... def print_text(text):
...     for char in text:
...             print (char)
... # serially execution
... def serially():
...     print_text(text)
...     print_text(text)
... serially()
... """
>>> elapsed_time = timeit.timeit(code_to_test, number=1)
H
e
l
l
o
 
W
o
r
l
d
H
e
l
l
o
 
W
o
r
l
d
>>> print(elapsed_time)
0.011771811000471644
Let's test the multiprocessing execution with python:
[mythcat@desk ~]$ python3 
Python 3.7.6 (default, Jan 30 2020, 09:44:41) 
[GCC 9.2.1 20190827 (Red Hat 9.2.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import timeit
>>> code_to_test = """
... import multiprocessing
... 
... text = "Hello World"
... 
... def print_text(text):
...     for char in text:
...             print (char)
... 
... # multiprocessing execution
... def multiprocessing_test():
...      process_1 = multiprocessing.Process(target=print_text, args=(text,))
...      process_2 = multiprocessing.Process(target=print_text, args=(text,))
...      process_1.start()
...      process_2.start()
...      process_1.join()
...      process_2.join()
... multiprocessing_test()
... """
>>> elapsed_time = timeit.timeit(code_to_test, number=1)
H
e
l
l
o
 
W
o
r
l
d
H
e
l
l
o
 
W
o
r
l
d
>>> print(elapsed_time)
0.3649730779998208
Since asyncio is a little complex, I will write about this in the next tutorial.

Wednesday, March 20, 2019

Using multiprocessing - a simple introduction.

The multiprocessing module was added to Python in version 2.6 and is available in all newer Python versions.
It was originally defined in PEP 371 by Jesse Noller and Richard Oudkerk.
The multiprocessing package also includes some APIs that are not in the threading module at all.
Python introduced the multiprocessing module to let us write parallel code.
You can use three basic classes: Process, Queue, and Lock, which will help you build a parallel program.
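As a rough illustration of how these three classes fit together, here is a minimal sketch. The worker and run_workers functions are hypothetical: each process squares one number, the Lock keeps the printed lines from mixing, and the Queue carries the results back to the parent.

```python
import multiprocessing


def worker(number, queue, lock):
    """Square a number, report it under the lock, and queue the result."""
    result = number * number
    with lock:  # only one process prints at a time
        print("worker computed", result)
    queue.put((number, result))


def run_workers(numbers):
    """Start one process per number and collect the results as a dict."""
    queue = multiprocessing.Queue()
    lock = multiprocessing.Lock()
    processes = [
        multiprocessing.Process(target=worker, args=(n, queue, lock))
        for n in numbers
    ]
    for process in processes:
        process.start()
    # read one result per process before joining
    results = dict(queue.get() for _ in processes)
    for process in processes:
        process.join()
    return results


if __name__ == "__main__":
    print(run_workers([2, 3, 4]))
```

The results are collected into a dictionary because the processes may finish in any order.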
For more details, you can check the documentation of this python module.
Today, I will present two examples that show how to work with this Python module.
The most basic approach is probably to use the Process class.
The Process class is very similar to the threading module’s Thread class.
It is also necessary to use the Manager, because it is responsible for coordinating shared information.
The source code is simple to understand and is commented so you can follow the intermediate steps.
Let's start with the first example, which runs a function that adds two numbers in two processes.
import multiprocessing

def sum(a,b):
 t=a+b
 print ("sum is:",t)

if __name__ == '__main__':
 # create the processes with the target function and the args of the target function
 processing_001 = multiprocessing.Process(target=sum, args=(7,6))
 processing_002 = multiprocessing.Process(target=sum, args=(1,1))
 # start processing_001
 processing_001.start()
 # start processing_002
 processing_002.start()
 # both processes are now running
 # wait with join until processing_001 is complete
 processing_001.join()
 # wait with join until processing_002 is complete
 processing_002.join()
 # result is C:\Python364>python.exe multiprocessing_001.py
 # sum is: 13
 # sum is: 2
The second example models an engineer who checks how fast two robots complete their tasks.
Three processes are created: one for each robot and one for the engineer.
All these processes share the same dictionary resource, which is created with the Manager.
For each task performed by a robot (a start/join pair of a process), the robot draws a random execution speed (function random.random) that is then stored in the shared dictionary (result) under the robot name.
Let's see the source code and the result:
import multiprocessing 
from multiprocessing import Process, Manager
import os
import random

def robot1(result):
 print("robot1 has ID process: {}".format(os.getpid()))
 speed_robot1=random.random()
 result["robot1"] = speed_robot1
 print ("worker speed tasks:",speed_robot1)
 #print (result)
 
def robot2(result):
 print("robot2 has ID process: {}".format(os.getpid()))
 speed_robot2=random.random()
 result["robot2"] = speed_robot2
 print ("worker speed tasks:",speed_robot2)
 #print (result)
 
def engineer(result):
 print("engineer has ID process: {}".format(os.getpid()))
 for key, value in result.items():
  print("Result: {}".format((key, value)))
 # you can add code to sort the values and print 
 # for key, value in sorted(result.items()):
 # print("Result: {}".format((key, value)))
 
if __name__ == '__main__':
 # show the main process
 print("the main ID process: {}".format(os.getpid()))
 # create a shared dictionary resource
 manager = multiprocessing.Manager()
 result = manager.dict()
 # start the process for robot1
 ro1 = multiprocessing.Process(target=robot1, args=(result,))
 ro1.start()
 # start the process for robot2
 ro2 = multiprocessing.Process(target=robot2, args=(result,))
 ro2.start()
 # wait for the robot1 process to finish
 ro1.join()
 # wait for the robot2 process to finish
 ro2.join()
 # create the engineer process
 en = multiprocessing.Process(target=engineer, args=(result,))
 # start the process for the engineer
 en.start()
 # wait for the engineer's process to finish
 en.join()
The result is:
C:\Python364>python.exe multiprocessing_002.py
the main ID process: 7816
robot1 has ID process: 6476
worker speed tasks: 0.48279764083908094
robot2 has ID process: 7560
worker speed tasks: 0.44408591959008503
engineer has ID process: 7000
Result: ('robot1', 0.48279764083908094)
Result: ('robot2', 0.44408591959008503)
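The engineer could also rank the robots instead of printing them in insertion order. The sketch below is one possible way to do it, assuming that a higher value means a faster robot; rank_robots is a hypothetical helper, and a plain dict stands in for the manager dictionary.

```python
def rank_robots(result):
    """Return (name, speed) pairs sorted from highest to lowest speed."""
    return sorted(result.items(), key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    # plain dict standing in for manager.dict(); values taken from the run above
    speeds = {"robot1": 0.48279764083908094, "robot2": 0.44408591959008503}
    for name, speed in rank_robots(speeds):
        print("Result: {}".format((name, speed)))
```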